From 8afcbef6c9259299addd6a7efa4c3c35e250f6f1 Mon Sep 17 00:00:00 2001 From: Lorenz Wrobel <43410952+DasBabyPixel@users.noreply.github.com> Date: Thu, 3 Oct 2024 01:11:26 +0200 Subject: [PATCH] non blocking output reader --- .../defaults/log/ProcessServiceLogCache.java | 74 +++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java index ec1cc1b2e..849ee52d6 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java @@ -17,21 +17,23 @@ package eu.cloudnetservice.node.impl.service.defaults.log; import com.google.common.base.Preconditions; -import eu.cloudnetservice.common.util.StringUtil; import eu.cloudnetservice.driver.service.ServiceId; import eu.cloudnetservice.node.config.Configuration; +import eu.cloudnetservice.utils.base.StringUtil; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import lombok.NonNull; +import org.jetbrains.annotations.Nullable; public class ProcessServiceLogCache extends AbstractServiceLogCache { private final ProcessServiceLogReadScheduler scheduler; private volatile ProcessHandle targetProcess; - private BufferedReader outStreamReader; - private BufferedReader errStreamReader; + private NonBlockingReader outStreamReader; + private NonBlockingReader errStreamReader; public ProcessServiceLogCache( @NonNull Configuration configuration, @@ -45,8 +47,8 @@ public ProcessServiceLogCache( public void start(@NonNull Process process) { Preconditions.checkState(this.targetProcess == null); this.targetProcess = process.toHandle(); - this.outStreamReader = process.inputReader(StandardCharsets.UTF_8); - this.errStreamReader = process.errorReader(StandardCharsets.UTF_8); + this.outStreamReader = new NonBlockingReader(process.inputReader(StandardCharsets.UTF_8)); + this.errStreamReader = new NonBlockingReader(process.errorReader(StandardCharsets.UTF_8)); this.scheduler.schedule(this); } @@ -55,6 +57,12 @@ public void stop() { var outReader = this.outStreamReader; var errReader = this.errStreamReader; if (outReader != null && errReader != null) { + outReader.shutdown = true; + errReader.shutdown = true; + // Processes killed by the termination timeout could have remaining content in the output streams. + // Read all remaining content and then close the streams + this.readLinesFromStream(outReader, false); + this.readLinesFromStream(errReader, true); outReader.close(); errReader.close(); this.outStreamReader = null; @@ -112,8 +120,8 @@ public boolean readProcessOutputContent() { } } - private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStream) throws IOException { - while (stream.ready()) { + private void readLinesFromStream(@NonNull NonBlockingReader stream, boolean errStream) throws IOException { + while (true) { var line = stream.readLine(); if (line == null) { break; @@ -122,4 +130,56 @@ private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStre this.handleItem(line, errStream); } } + + private static class NonBlockingReader { + + private final BufferedReader reader; + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + private boolean shutdown = false; + private boolean lastCR = false; + + public NonBlockingReader(BufferedReader reader) { + this.reader = reader; + } + + /** + * Tries to read a line, if no line is available returns null + */ + private synchronized @Nullable String readLine() throws IOException { + // Strategy: Collect everything into the buffer until we hit a newline + // Then return the line excluding the (CR and) BR character(s) + while (this.reader.ready()) { + var read = this.reader.read(); + if (this.lastCR) { + // Make sure to skip CR when creating the line + if (read != '\n') { + this.buffer.write('\r'); + } + this.lastCR = false; + } else if (read == '\r') { + this.lastCR = true; + // Don't append \r to buffer + continue; + } + + if (read == '\n') { + // finished newline + var line = this.buffer.toString(StandardCharsets.UTF_8); + this.buffer.reset(); + return line; + } + this.buffer.write(read); + } + if (this.shutdown && this.buffer.size() > 0) { + var line = this.buffer.toString(StandardCharsets.UTF_8); + this.buffer.reset(); + return line; + } + return null; + } + + private void close() throws IOException { + this.reader.close(); + } + } }