Skip to content

Commit

Permalink
non blocking output reader
Browse files Browse the repository at this point in the history
  • Loading branch information
DasBabyPixel committed Jan 20, 2025
1 parent ba5acda commit 547d34a
Showing 1 changed file with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@
package eu.cloudnetservice.node.impl.service.defaults.log;

import com.google.common.base.Preconditions;
import eu.cloudnetservice.common.util.StringUtil;
import eu.cloudnetservice.driver.service.ServiceId;
import eu.cloudnetservice.node.config.Configuration;
import eu.cloudnetservice.utils.base.StringUtil;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public class ProcessServiceLogCache extends AbstractServiceLogCache {

private final ProcessServiceLogReadScheduler scheduler;

private volatile ProcessHandle targetProcess;
private BufferedReader outStreamReader;
private BufferedReader errStreamReader;
private NonBlockingReader outStreamReader;
private NonBlockingReader errStreamReader;

public ProcessServiceLogCache(
@NonNull Configuration configuration,
Expand All @@ -45,8 +47,8 @@ public ProcessServiceLogCache(
public void start(@NonNull Process process) {
Preconditions.checkState(this.targetProcess == null);
this.targetProcess = process.toHandle();
this.outStreamReader = process.inputReader(StandardCharsets.UTF_8);
this.errStreamReader = process.errorReader(StandardCharsets.UTF_8);
this.outStreamReader = new NonBlockingReader(process.inputReader(StandardCharsets.UTF_8));
this.errStreamReader = new NonBlockingReader(process.errorReader(StandardCharsets.UTF_8));
this.scheduler.schedule(this);
}

Expand All @@ -55,6 +57,12 @@ public void stop() {
var outReader = this.outStreamReader;
var errReader = this.errStreamReader;
if (outReader != null && errReader != null) {
outReader.shutdown = true;
errReader.shutdown = true;
// Processes killed by the termination timeout could have remaining content in the output streams.
// Read all remaining content and then close the streams
this.readLinesFromStream(outReader, false);
this.readLinesFromStream(errReader, true);
outReader.close();
errReader.close();
this.outStreamReader = null;
Expand Down Expand Up @@ -112,8 +120,8 @@ public boolean readProcessOutputContent() {
}
}

private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStream) throws IOException {
while (stream.ready()) {
private void readLinesFromStream(@NonNull NonBlockingReader stream, boolean errStream) throws IOException {
while (true) {
var line = stream.readLine();
if (line == null) {
break;
Expand All @@ -122,4 +130,56 @@ private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStre
this.handleItem(line, errStream);
}
}

private static class NonBlockingReader {

private final BufferedReader reader;
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private boolean shutdown = false;
private boolean lastCR = false;

public NonBlockingReader(BufferedReader reader) {
this.reader = reader;
}

/**
* Tries to read a line, if no line is available returns null
*/
private synchronized @Nullable String readLine() throws IOException {
// Strategy: Collect everything into the buffer until we hit a newline
// Then return the line excluding the (CR and) BR character(s)
while (this.reader.ready()) {
var read = this.reader.read();
if (this.lastCR) {
// Make sure to skip CR when creating the line
if (read != '\n') {
this.buffer.write('\r');
}
this.lastCR = false;
} else if (read == '\r') {
this.lastCR = true;
// Don't append \r to buffer
continue;
}

if (read == '\n') {
// finished newline
var line = this.buffer.toString(StandardCharsets.UTF_8);
this.buffer.reset();
return line;
}
this.buffer.write(read);
}
if (this.shutdown && this.buffer.size() > 0) {
var line = this.buffer.toString(StandardCharsets.UTF_8);
this.buffer.reset();
return line;
}
return null;
}

private void close() throws IOException {
this.reader.close();
}
}
}

0 comments on commit 547d34a

Please sign in to comment.