diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkChannel.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkChannel.java index 8561676ab..b9492cee7 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkChannel.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkChannel.java @@ -23,9 +23,13 @@ import eu.cloudnetservice.driver.network.protocol.Packet; import eu.cloudnetservice.driver.network.protocol.PacketListenerRegistry; import io.netty5.channel.Channel; +import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.Promise; import io.netty5.util.concurrent.PromiseCombiner; +import java.nio.channels.ClosedChannelException; import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The default netty based implementation of a network channel. @@ -34,6 +38,7 @@ */ public final class NettyNetworkChannel extends DefaultNetworkChannel implements NetworkChannel { + private static final Logger LOGGER = LoggerFactory.getLogger(NettyNetworkChannel.class); private final Channel channel; /** @@ -82,13 +87,27 @@ public void sendPacket(@NonNull Packet... packets) { executor.execute(() -> this.sendPacket(packets)); } } + + private Future writeAndFlush(@NonNull Packet packet) { + var future = this.channel.writeAndFlush(packet); + future.addListener(f -> { + if (!f.isSuccess()) { + var cause = f.cause(); + // ignore ClosedChannelException, when node stops this sometimes happens + if (!(cause instanceof ClosedChannelException)) { + LOGGER.error("Failed to send packet", cause); + } + } + }); + return future; + } /** * {@inheritDoc} */ @Override public void sendPacket(@NonNull Packet packet) { - this.channel.writeAndFlush(packet); + this.writeAndFlush(packet); } /** @@ -96,7 +115,7 @@ public void sendPacket(@NonNull Packet packet) { */ @Override public void sendPacketSync(@NonNull Packet packet) { - var future = this.channel.writeAndFlush(packet); + var future = this.writeAndFlush(packet); if (!future.executor().inEventLoop()) { // only await the future if we're not currently in the event loop // as this would deadlock the write operations triggered previously diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java index cda71edd4..8d8a5535d 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java @@ -97,6 +97,10 @@ protected void doHandlePacket(@NonNull BasePacket packet) { if (task != null) { // complete the waiting task task.complete(packet); + + // we can't add a packet content released check here, because the + // query handler may not be registered to the future. + // Basically, the query response can come before the handler has been registered // don't post a query response packet to another handler at all // the packet might be inbound - we might be expected to respond @@ -107,6 +111,9 @@ protected void doHandlePacket(@NonNull BasePacket packet) { // check if any handler can handle the incoming packet if (this.channel.handler().handlePacketReceive(this.channel, packet) && this.channel.packetRegistry().handlePacket(this.channel, packet)) { + if (packet.content().accessible()) { + LOGGER.error("Listeners did not release packet content: {}", packet); + } return; } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java index 105f03375..56700b83f 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java @@ -25,6 +25,8 @@ import io.netty5.buffer.BufferAllocator; import io.netty5.buffer.BufferUtil; import io.netty5.buffer.DefaultBufferAllocators; +import io.netty5.buffer.MemoryManager; +import io.netty5.buffer.bytebuffer.ByteBufferMemoryManager; import io.netty5.channel.Channel; import io.netty5.channel.ChannelFactory; import io.netty5.channel.EventLoopGroup; @@ -80,7 +82,8 @@ public final class NettyUtil { if ("netty-default".equals(preferredBufferAllocator) || NettyNioBufferReleasingAllocator.notAbleToFreeBuffers()) { SELECTED_BUFFER_ALLOCATOR = DefaultBufferAllocators.offHeapAllocator(); } else { - SELECTED_BUFFER_ALLOCATOR = new NettyNioBufferReleasingAllocator(); + System.setProperty("io.netty5.buffer.MemoryManager", ByteBufferMemoryManager.class.getName()); + SELECTED_BUFFER_ALLOCATOR = new NettyNioBufferReleasingAllocator(((ByteBufferMemoryManager) MemoryManager.instance())); } // select the transport type to use for netty diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java index c3167eb27..e355729d2 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java @@ -337,4 +337,9 @@ public void close() { return result; } + + @Override + public String toString() { + return "%s[readableBytes=%d]".formatted(this.getClass().getSimpleName(), this.buffer.readableBytes()); + } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyNioBufferReleasingAllocator.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyNioBufferReleasingAllocator.java index 73523e03a..17c13fdad 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyNioBufferReleasingAllocator.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyNioBufferReleasingAllocator.java @@ -48,12 +48,12 @@ public final class NettyNioBufferReleasingAllocator implements BufferAllocator, /** * Constructs a new buffer allocator instance. All instances are backed by a byte buffer memory manager. */ - public NettyNioBufferReleasingAllocator() { - this.manager = new ByteBufferMemoryManager(); + public NettyNioBufferReleasingAllocator(@NonNull ByteBufferMemoryManager manager) { + this.manager = manager; } /** - * Get if this allocator can free direct buffers. If this method returns false, this allocator shouldn't be used as it + * Get if this allocator can free direct buffers. If this method returns true, this allocator shouldn't be used as it * does nothing. Use the default netty allocator instead. * * @return if this allocator is able to free direct nio buffers. @@ -170,7 +170,7 @@ public void drop(@NonNull Buffer obj) { try { DIRECT_BUFFER_CLEANER.invokeExact(recoverableMemory); } catch (Throwable exception) { - LOGGER.debug("Unable to free direct ByteBuf using Unsafe.invokeCleaner: {}", exception.getMessage()); + LOGGER.error("Unable to free direct ByteBuf using Unsafe.invokeCleaner: {}", exception.getMessage()); } } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FrameDecoder.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FrameDecoder.java index ded596ca4..1b67c85b1 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FrameDecoder.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FrameDecoder.java @@ -21,9 +21,13 @@ import io.netty5.channel.ChannelHandlerContext; import io.netty5.handler.codec.ByteToMessageDecoder; import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class VarInt32FrameDecoder extends ByteToMessageDecoder { + private static final Logger LOGGER = LoggerFactory.getLogger(VarInt32FrameDecoder.class); + /** * {@inheritDoc} */ @@ -42,12 +46,16 @@ protected void decode(@NonNull ChannelHandlerContext ctx, @NonNull Buffer in) { return; } - // skip empty packets silently - if (length <= 0) { - // check if there are bytes to skip - if (in.readableBytes() > 0) { - in.skipReadableBytes(in.readableBytes()); - } + if (length == 0) { + // empty packet length should not be possible. Someone didn't follow the protocol. (Should we maybe even disconnect?) + LOGGER.error("Skipped incoming packet with length 0"); + return; + } else if (length < 0) { + // negative packet length is not a good omen... Someone didn't follow the protocol. (Should we maybe even disconnect?) + LOGGER.error("Incoming packet had negative length {} - readableBytes: {}", length, in.readableBytes()); + // try to only skip 1 byte. That way we should at some point arrive at a valid state again + // should never happen to begin with + in.readerOffset(readerIndex + 1); return; } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FramePrepender.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FramePrepender.java index dc8cec8e0..156f95d74 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FramePrepender.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/VarInt32FramePrepender.java @@ -24,10 +24,13 @@ import io.netty5.util.concurrent.Promise; import io.netty5.util.concurrent.PromiseCombiner; import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class VarInt32FramePrepender extends ChannelHandlerAdapter { public static final VarInt32FramePrepender INSTANCE = new VarInt32FramePrepender(); + private static final Logger LOGGER = LoggerFactory.getLogger(VarInt32FramePrepender.class); /** * {@inheritDoc} @@ -37,6 +40,13 @@ public final class VarInt32FramePrepender extends ChannelHandlerAdapter { if (msg instanceof Buffer packetDataBuffer) { // first write the buffer that contains the length of the following buffer var length = packetDataBuffer.readableBytes(); + if (length == 0) { + // empty packet should not happen. This indicates a bug in the NettyPacketEncoder, + // which should rather not submit a Buffer than submit an empty one. + // Let's also log a stack trace, may make things easier to debug if they break. + LOGGER.error("Skip packet with length 0", new Exception("Thread dump")); + return ctx.newFailedFuture(new IllegalArgumentException("Send buffer with readableBytes=0")); + } var encodedLengthFieldLength = NettyUtil.varIntBytes(length); var lengthBuffer = ctx.bufferAllocator().allocate(encodedLengthFieldLength); NettyUtil.writeVarInt(lengthBuffer, length); diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java index 4d8d5185f..5e35d22e7 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java @@ -44,27 +44,33 @@ record RPCResultMapper( @Override public @UnknownNullability T apply(@UnknownNullability Packet response) { var responseData = response.content(); - var status = responseData.readByte(); - return switch (status) { - case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType); - case RPCInvocationResult.STATUS_ERROR -> { - RPCExceptionUtil.rethrowHandlingException(responseData); - yield null; // never reached, but must be there for the compiler to be happy - } - case RPCInvocationResult.STATUS_BAD_REQUEST -> { - var detailMessage = responseData.readString(); - var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage); - throw new RPCExecutionException(exceptionMessage); - } - case RPCInvocationResult.STATUS_SERVER_ERROR -> { - var detailMessage = responseData.readString(); - var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage); - throw new RPCExecutionException(exceptionMessage); - } - default -> { - var exceptionMessage = String.format("Server responded with unknown status code: %d", status); - throw new RPCExecutionException(exceptionMessage); - } - }; + try { + var status = responseData.readByte(); + return switch (status) { + case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType); + case RPCInvocationResult.STATUS_ERROR -> { + RPCExceptionUtil.rethrowHandlingException(responseData); + yield null; // never reached, but must be there for the compiler to be happy + } + case RPCInvocationResult.STATUS_BAD_REQUEST -> { + var detailMessage = responseData.readString(); + var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage); + throw new RPCExecutionException(exceptionMessage); + } + case RPCInvocationResult.STATUS_SERVER_ERROR -> { + var detailMessage = responseData.readString(); + var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage); + throw new RPCExecutionException(exceptionMessage); + } + default -> { + var exceptionMessage = String.format("Server responded with unknown status code: %d", status); + throw new RPCExecutionException(exceptionMessage); + } + }; + } finally { + // specifically release the buffer here to prevent memory leaks, especially if we didn't consume + // the whole buffer content (for example due to an exception during handling) + responseData.forceRelease(); + } } } diff --git a/launcher/java22/src/main/resources/launcher.cnl b/launcher/java22/src/main/resources/launcher.cnl index c9a54df61..3b86f6e15 100644 --- a/launcher/java22/src/main/resources/launcher.cnl +++ b/launcher/java22/src/main/resources/launcher.cnl @@ -95,3 +95,12 @@ var cloudnet.node.memory 256 # These transports are platform optimized, generally generate less garbage and improve performance over the non-native # (but fallback when native is disabled) nio transport #var cloudnet.no-native false + +# +# Extra properties. Only touch if you know what you are doing! +# + +# Greatly reduces the maximum size (default=infinite) of buffers cached by the jvm. These cached buffers can cause +# OutOfMemoryErrors, and with many netty threads existing, each thread with its own cache, capable of storing hundreds +# to thousands of buffers each, this adds up. +#var jdk.nio.maxCachedBufferSize 512 diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/chunk/FileQueryChannelMessageListener.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/chunk/FileQueryChannelMessageListener.java index a6cc3c6cc..65f574dd7 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/chunk/FileQueryChannelMessageListener.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/chunk/FileQueryChannelMessageListener.java @@ -22,9 +22,12 @@ import eu.cloudnetservice.driver.impl.network.NetworkConstants; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.network.buffer.DataBufFactory; +import eu.cloudnetservice.driver.network.chunk.TransferStatus; import eu.cloudnetservice.driver.network.chunk.event.FileQueryRequestEvent; import jakarta.inject.Inject; import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A listener for channel messages that request a query transfer of a file. @@ -33,6 +36,7 @@ */ public final class FileQueryChannelMessageListener { + private static final Logger LOGGER = LoggerFactory.getLogger(FileQueryChannelMessageListener.class); private final EventManager eventManager; /** @@ -93,7 +97,14 @@ public void handleFileQueryRequest(@NonNull ChannelMessageReceiveEvent event) { .transferChannel("query:dummy") .sessionUniqueId(chunkedSessionId) .build() - .transferChunkedData(); + .transferChunkedData() + .whenComplete((transferStatus, throwable) -> { + if (throwable != null) { + LOGGER.error("Failed to transfer data", throwable); + } else if (transferStatus != TransferStatus.SUCCESS) { + LOGGER.error("Transfer finished on TransferStatus {}", transferStatus); + } + }); var responseData = constructRequestResponse(true); event.binaryResponse(responseData); } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java index 2c1f37be5..017515f74 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java @@ -143,9 +143,16 @@ public void sendChannelMessage(@NonNull ChannelMessage message, boolean allowClu channel.sendQueryAsync(new ChannelMessagePacket(message, false)).whenComplete((packet, th) -> { // check if we got an actual result from the request - if (th == null && packet.readable()) { - // add all resulting messages we got - result.addAll(packet.content().readObject(COL_MSG)); + try { + if (th == null && packet.readable()) { + // add all resulting messages we got + result.addAll(packet.content().readObject(COL_MSG)); + } + } finally { + if (packet != null) { + // make sure to release the packet buffer + packet.content().forceRelease(); + } } // count down - one channel responded diff --git a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java index f115b4b37..602d0b73d 100644 --- a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java +++ b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java @@ -78,7 +78,14 @@ public void sendChannelMessage(@NonNull ChannelMessage channelMessage) { ) { return this.networkClient.firstChannel().sendQueryAsync(new ChannelMessagePacket(message, true)) .thenApply(Packet::content) - .thenApply(data -> Objects.requireNonNullElse(data.readObject(MESSAGES), List.of())); + .thenApply(data -> { + try { + return Objects.requireNonNullElse(data.readObject(MESSAGES), List.of()); + } finally { + // make sure the packet content is released + data.forceRelease(); + } + }); } @Override