Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix networking memory leaks and file transfer issues #1578

Open
wants to merge 8 commits into
base: nightly
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import eu.cloudnetservice.driver.network.protocol.Packet;
import eu.cloudnetservice.driver.network.protocol.PacketListenerRegistry;
import io.netty5.channel.Channel;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.Promise;
import io.netty5.util.concurrent.PromiseCombiner;
import java.nio.channels.ClosedChannelException;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The default netty based implementation of a network channel.
Expand All @@ -34,6 +38,7 @@
*/
public final class NettyNetworkChannel extends DefaultNetworkChannel implements NetworkChannel {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyNetworkChannel.class);
private final Channel channel;

/**
Expand Down Expand Up @@ -82,21 +87,35 @@ public void sendPacket(@NonNull Packet... packets) {
executor.execute(() -> this.sendPacket(packets));
}
}

/**
 * Writes and flushes the given packet into the wrapped netty channel. A listener is attached to the resulting future
 * which logs the cause of a failed write, unless the write failed because the channel was already closed.
 *
 * @param packet the packet to write into the channel.
 * @return the future returned by the channel for the write and flush operation.
 */
private Future<Void> writeAndFlush(@NonNull Packet packet) {
  var writeFuture = this.channel.writeAndFlush(packet);
  writeFuture.addListener(completedFuture -> {
    // nothing to report if the packet was written successfully
    if (completedFuture.isSuccess()) {
      return;
    }

    // a ClosedChannelException sometimes happens when the node stops, there is no need to log it
    var failureCause = completedFuture.cause();
    if (failureCause instanceof ClosedChannelException) {
      return;
    }

    LOGGER.error("Failed to send packet", failureCause);
  });
  return writeFuture;
}

/**
* {@inheritDoc}
*/
@Override
public void sendPacket(@NonNull Packet packet) {
this.channel.writeAndFlush(packet);
this.writeAndFlush(packet);
}

/**
* {@inheritDoc}
*/
@Override
public void sendPacketSync(@NonNull Packet packet) {
var future = this.channel.writeAndFlush(packet);
var future = this.writeAndFlush(packet);
if (!future.executor().inEventLoop()) {
// only await the future if we're not currently in the event loop
// as this would deadlock the write operations triggered previously
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ protected void doHandlePacket(@NonNull BasePacket packet) {
if (task != null) {
// complete the waiting task
task.complete(packet);

// we can't add a packet content released check here, because the
// query handler may not be registered to the future.
// Basically, the query response can come before the handler has been registered

// don't post a query response packet to another handler at all
// the packet might be inbound - we might be expected to respond
Expand All @@ -107,6 +111,9 @@ protected void doHandlePacket(@NonNull BasePacket packet) {
// check if any handler can handle the incoming packet
if (this.channel.handler().handlePacketReceive(this.channel, packet)
&& this.channel.packetRegistry().handlePacket(this.channel, packet)) {
if (packet.content().accessible()) {
LOGGER.error("Listeners did not release packet content: {}", packet);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.netty5.buffer.BufferAllocator;
import io.netty5.buffer.BufferUtil;
import io.netty5.buffer.DefaultBufferAllocators;
import io.netty5.buffer.MemoryManager;
import io.netty5.buffer.bytebuffer.ByteBufferMemoryManager;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelFactory;
import io.netty5.channel.EventLoopGroup;
Expand Down Expand Up @@ -80,7 +82,8 @@ public final class NettyUtil {
if ("netty-default".equals(preferredBufferAllocator) || NettyNioBufferReleasingAllocator.notAbleToFreeBuffers()) {
SELECTED_BUFFER_ALLOCATOR = DefaultBufferAllocators.offHeapAllocator();
} else {
SELECTED_BUFFER_ALLOCATOR = new NettyNioBufferReleasingAllocator();
System.setProperty("io.netty5.buffer.MemoryManager", ByteBufferMemoryManager.class.getName());
SELECTED_BUFFER_ALLOCATOR = new NettyNioBufferReleasingAllocator(((ByteBufferMemoryManager) MemoryManager.instance()));
}

// select the transport type to use for netty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,9 @@ public void close() {

return result;
}

/**
 * Builds a short textual representation of this instance, consisting of the simple class name followed by the number
 * of readable bytes in the wrapped buffer.
 *
 * @return the textual representation of this instance.
 */
@Override
public String toString() {
  var typeName = this.getClass().getSimpleName();
  var readableByteCount = this.buffer.readableBytes();
  return String.format("%s[readableBytes=%d]", typeName, readableByteCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public final class NettyNioBufferReleasingAllocator implements BufferAllocator,
/**
* Constructs a new buffer allocator instance that is backed by the given byte buffer memory manager.
*
* @param manager the byte buffer memory manager that backs this allocator.
*/
public NettyNioBufferReleasingAllocator() {
this.manager = new ByteBufferMemoryManager();
public NettyNioBufferReleasingAllocator(@NonNull ByteBufferMemoryManager manager) {
this.manager = manager;
}

/**
* Get if this allocator can free direct buffers. If this method returns false, this allocator shouldn't be used as it
* Get if this allocator is unable to free direct buffers. If this method returns true, this allocator shouldn't be used
* as it does nothing. Use the default netty allocator instead.
*
* @return true if this allocator is not able to free direct nio buffers, false otherwise.
Expand Down Expand Up @@ -170,7 +170,7 @@ public void drop(@NonNull Buffer obj) {
try {
DIRECT_BUFFER_CLEANER.invokeExact(recoverableMemory);
} catch (Throwable exception) {
LOGGER.debug("Unable to free direct ByteBuf using Unsafe.invokeCleaner: {}", exception.getMessage());
LOGGER.error("Unable to free direct ByteBuf using Unsafe.invokeCleaner: {}", exception.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.ByteToMessageDecoder;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class VarInt32FrameDecoder extends ByteToMessageDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(VarInt32FrameDecoder.class);

/**
* {@inheritDoc}
*/
Expand All @@ -42,12 +46,16 @@ protected void decode(@NonNull ChannelHandlerContext ctx, @NonNull Buffer in) {
return;
}

// skip empty packets silently
if (length <= 0) {
// check if there are bytes to skip
if (in.readableBytes() > 0) {
in.skipReadableBytes(in.readableBytes());
}
if (length == 0) {
// empty packet length should not be possible. Someone didn't follow the protocol. (Should we maybe even disconnect?)
LOGGER.error("Skipped incoming packet with length 0");
return;
} else if (length < 0) {
// negative packet length is not a good omen... Someone didn't follow the protocol. (Should we maybe even disconnect?)
LOGGER.error("Incoming packet had negative length {} - readableBytes: {}", length, in.readableBytes());
// try to only skip 1 byte. That way we should at some point arrive at a valid state again
// should never happen to begin with
in.readerOffset(readerIndex + 1);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import io.netty5.util.concurrent.Promise;
import io.netty5.util.concurrent.PromiseCombiner;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class VarInt32FramePrepender extends ChannelHandlerAdapter {

public static final VarInt32FramePrepender INSTANCE = new VarInt32FramePrepender();
private static final Logger LOGGER = LoggerFactory.getLogger(VarInt32FramePrepender.class);

/**
* {@inheritDoc}
Expand All @@ -37,6 +40,13 @@ public final class VarInt32FramePrepender extends ChannelHandlerAdapter {
if (msg instanceof Buffer packetDataBuffer) {
// first write the buffer that contains the length of the following buffer
var length = packetDataBuffer.readableBytes();
if (length == 0) {
// empty packet should not happen. This indicates a bug in the NettyPacketEncoder,
// which should rather not submit a Buffer than submit an empty one.
// Let's also log a stack trace, may make things easier to debug if they break.
LOGGER.error("Skip packet with length 0", new Exception("Thread dump"));
return ctx.newFailedFuture(new IllegalArgumentException("Send buffer with readableBytes=0"));
}
var encodedLengthFieldLength = NettyUtil.varIntBytes(length);
var lengthBuffer = ctx.bufferAllocator().allocate(encodedLengthFieldLength);
NettyUtil.writeVarInt(lengthBuffer, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,33 @@ record RPCResultMapper<T>(
@Override
public @UnknownNullability T apply(@UnknownNullability Packet response) {
var responseData = response.content();
var status = responseData.readByte();
return switch (status) {
case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType);
case RPCInvocationResult.STATUS_ERROR -> {
RPCExceptionUtil.rethrowHandlingException(responseData);
yield null; // never reached, but must be there for the compiler to be happy
}
case RPCInvocationResult.STATUS_BAD_REQUEST -> {
var detailMessage = responseData.readString();
var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage);
throw new RPCExecutionException(exceptionMessage);
}
case RPCInvocationResult.STATUS_SERVER_ERROR -> {
var detailMessage = responseData.readString();
var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage);
throw new RPCExecutionException(exceptionMessage);
}
default -> {
var exceptionMessage = String.format("Server responded with unknown status code: %d", status);
throw new RPCExecutionException(exceptionMessage);
}
};
try {
var status = responseData.readByte();
return switch (status) {
case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType);
case RPCInvocationResult.STATUS_ERROR -> {
RPCExceptionUtil.rethrowHandlingException(responseData);
yield null; // never reached, but must be there for the compiler to be happy
}
case RPCInvocationResult.STATUS_BAD_REQUEST -> {
var detailMessage = responseData.readString();
var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage);
throw new RPCExecutionException(exceptionMessage);
}
case RPCInvocationResult.STATUS_SERVER_ERROR -> {
var detailMessage = responseData.readString();
var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage);
throw new RPCExecutionException(exceptionMessage);
}
default -> {
var exceptionMessage = String.format("Server responded with unknown status code: %d", status);
throw new RPCExecutionException(exceptionMessage);
}
};
} finally {
// specifically release the buffer here to prevent memory leaks, especially if we didn't consume
// the whole buffer content (for example due to an exception during handling)
responseData.forceRelease();
}
}
}
9 changes: 9 additions & 0 deletions launcher/java22/src/main/resources/launcher.cnl
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ var cloudnet.node.memory 256
# These transports are platform optimized, generally generate less garbage and improve performance over the non-native
# (but fallback when native is disabled) nio transport
#var cloudnet.no-native false

#
# Extra properties. Only touch if you know what you are doing!
#

# Greatly reduces the maximum size (default: unlimited) of the buffers that the jvm caches per thread. Each netty
# thread has its own cache which is capable of storing hundreds to thousands of buffers, so with many netty threads
# the cached buffers add up and can cause OutOfMemoryErrors.
#var jdk.nio.maxCachedBufferSize 512
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import eu.cloudnetservice.driver.impl.network.NetworkConstants;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.network.buffer.DataBufFactory;
import eu.cloudnetservice.driver.network.chunk.TransferStatus;
import eu.cloudnetservice.driver.network.chunk.event.FileQueryRequestEvent;
import jakarta.inject.Inject;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A listener for channel messages that request a query transfer of a file.
Expand All @@ -33,6 +36,7 @@
*/
public final class FileQueryChannelMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(FileQueryChannelMessageListener.class);
private final EventManager eventManager;

/**
Expand Down Expand Up @@ -93,7 +97,14 @@ public void handleFileQueryRequest(@NonNull ChannelMessageReceiveEvent event) {
.transferChannel("query:dummy")
.sessionUniqueId(chunkedSessionId)
.build()
.transferChunkedData();
.transferChunkedData()
.whenComplete((transferStatus, throwable) -> {
if (throwable != null) {
LOGGER.error("Failed to transfer data", throwable);
} else if (transferStatus != TransferStatus.SUCCESS) {
LOGGER.error("Transfer finished on TransferStatus {}", transferStatus);
}
});
var responseData = constructRequestResponse(true);
event.binaryResponse(responseData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,16 @@ public void sendChannelMessage(@NonNull ChannelMessage message, boolean allowClu

channel.sendQueryAsync(new ChannelMessagePacket(message, false)).whenComplete((packet, th) -> {
// check if we got an actual result from the request
if (th == null && packet.readable()) {
// add all resulting messages we got
result.addAll(packet.content().readObject(COL_MSG));
try {
if (th == null && packet.readable()) {
// add all resulting messages we got
result.addAll(packet.content().readObject(COL_MSG));
}
} finally {
if (packet != null) {
// make sure to release the packet buffer
packet.content().forceRelease();
}
}

// count down - one channel responded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@ public void sendChannelMessage(@NonNull ChannelMessage channelMessage) {
) {
return this.networkClient.firstChannel().sendQueryAsync(new ChannelMessagePacket(message, true))
.thenApply(Packet::content)
.thenApply(data -> Objects.requireNonNullElse(data.readObject(MESSAGES), List.of()));
.thenApply(data -> {
try {
return Objects.requireNonNullElse(data.readObject(MESSAGES), List.of());
} finally {
// make sure the packet content is released
data.forceRelease();
}
});
}

@Override
Expand Down