From ba5acdadcae5b2856c58f9b07d0ead2bc760c1fa Mon Sep 17 00:00:00 2001 From: Lorenz Wrobel <43410952+DasBabyPixel@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:17:23 +0100 Subject: [PATCH 1/3] reapply all reverted changes from https://github.com/CloudNetService/CloudNet/commit/f150948effb632cb2172230e2dff9b8582cffaff --- .../docker/impl/DockerizedService.java | 11 +- .../impl/DockerizedServiceLogCache.java | 16 +-- .../node/service/ServiceConsoleLogCache.java | 12 +- .../service/defaults/AbstractService.java | 34 ++++- .../defaults/DefaultCloudServiceManager.java | 18 +-- .../impl/service/defaults/JVMService.java | 99 ++++++++------ .../factory/JVMLocalCloudServiceFactory.java | 9 +- .../defaults/log/AbstractServiceLogCache.java | 37 +++-- .../defaults/log/ProcessServiceLogCache.java | 126 ++++++++++++------ .../log/ProcessServiceLogReadScheduler.java | 91 +++++++++++++ 10 files changed, 312 insertions(+), 141 deletions(-) create mode 100644 node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogReadScheduler.java diff --git a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java index 3775b3de9d..7bf28eea9a 100644 --- a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java +++ b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java @@ -89,7 +89,6 @@ public class DockerizedService extends JVMService { protected final DockerClient dockerClient; protected final DockerConfiguration configuration; - protected final DockerizedServiceLogCache logCache; protected volatile String containerId; @@ -109,6 +108,7 @@ protected DockerizedService( @NonNull DockerClient dockerClient, @NonNull DockerConfiguration dockerConfiguration ) { + var logCache = new DockerizedServiceLogCache(nodeConfig, configuration.serviceId()); super( i18n, tickLoop, @@ -116,14 +116,11 @@ protected DockerizedService( configuration, manager, eventManager, - versionProvider, + logCache, versionProvider, serviceConfigurationPreparer); this.dockerClient = dockerClient; this.configuration = dockerConfiguration; - - super.logCache = this.logCache = new DockerizedServiceLogCache(nodeConfig, this); - this.initLogHandler(); } @Override @@ -379,8 +376,8 @@ protected boolean needsImagePull(@NonNull DockerImage image) { public final class ServiceLogCacheAdapter extends ResultCallback.Adapter { @Override - public void onNext(Frame object) { - DockerizedService.this.logCache.handle(object); + public void onNext(@NonNull Frame object) { + ((DockerizedServiceLogCache) DockerizedService.this.logCache).handle(object); } } } diff --git a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedServiceLogCache.java b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedServiceLogCache.java index dfc19ca0a2..c0487751da 100644 --- a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedServiceLogCache.java +++ b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedServiceLogCache.java @@ -17,22 +17,16 @@ package eu.cloudnetservice.modules.docker.impl; import com.github.dockerjava.api.model.Frame; +import eu.cloudnetservice.driver.service.ServiceId; import eu.cloudnetservice.node.config.Configuration; import eu.cloudnetservice.node.impl.service.defaults.log.AbstractServiceLogCache; -import eu.cloudnetservice.node.service.CloudService; -import eu.cloudnetservice.node.service.ServiceConsoleLogCache; import java.nio.charset.StandardCharsets; import lombok.NonNull; public class DockerizedServiceLogCache extends AbstractServiceLogCache { - public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull CloudService service) { - super(configuration, service); - } - - @Override - public @NonNull ServiceConsoleLogCache update() { - return this; + public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull ServiceId associatedServiceId) { + super(configuration, associatedServiceId); } public void handle(@NonNull Frame frame) { @@ -49,9 +43,7 @@ protected void handleItem(@NonNull String content, boolean comesFromErrorStream) if (content.contains("\n") || content.contains("\r")) { for (var input : content.split("\r")) { for (var text : input.split("\n")) { - if (!text.trim().isEmpty()) { - super.handleItem(text, comesFromErrorStream); - } + super.handleItem(text, comesFromErrorStream); } } } diff --git a/node/api/src/main/java/eu/cloudnetservice/node/service/ServiceConsoleLogCache.java b/node/api/src/main/java/eu/cloudnetservice/node/service/ServiceConsoleLogCache.java index 5938af186c..86a32cb580 100644 --- a/node/api/src/main/java/eu/cloudnetservice/node/service/ServiceConsoleLogCache.java +++ b/node/api/src/main/java/eu/cloudnetservice/node/service/ServiceConsoleLogCache.java @@ -16,6 +16,7 @@ package eu.cloudnetservice.node.service; +import eu.cloudnetservice.driver.service.ServiceId; import java.util.Collection; import java.util.Queue; import lombok.NonNull; @@ -23,11 +24,11 @@ public interface ServiceConsoleLogCache { - @NonNull Queue cachedLogMessages(); - - @NonNull ServiceConsoleLogCache update(); + @NonNull + Queue cachedLogMessages(); - @NonNull CloudService service(); + @NonNull + ServiceId associatedServiceId(); int logCacheSize(); @@ -42,5 +43,6 @@ public interface ServiceConsoleLogCache { void removeHandler(@NonNull ServiceConsoleLineHandler handler); @NonNull - @UnmodifiableView Collection handlers(); + @UnmodifiableView + Collection handlers(); } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java index 3cd35e5c20..fb284febe9 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java @@ -24,6 +24,7 @@ import eu.cloudnetservice.driver.channel.ChannelMessageTarget; import eu.cloudnetservice.driver.document.Document; import eu.cloudnetservice.driver.event.EventManager; +import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent; import eu.cloudnetservice.driver.impl.network.NetworkConstants; import eu.cloudnetservice.driver.language.I18n; import eu.cloudnetservice.driver.network.HostAndPort; @@ -105,6 +106,7 @@ public abstract class AbstractService implements InternalCloudService { protected final DefaultTickLoop mainThread; protected final EventManager eventManager; protected final Configuration configuration; + protected final ServiceConsoleLogCache logCache; protected final InternalCloudServiceManager cloudServiceManager; protected final ServiceConfiguration serviceConfiguration; protected final ServiceVersionProvider serviceVersionProvider; @@ -121,8 +123,6 @@ public abstract class AbstractService implements InternalCloudService { protected final Collection installedInclusions = ConcurrentHashMap.newKeySet(); protected final Collection installedDeployments = ConcurrentHashMap.newKeySet(); - protected ServiceConsoleLogCache logCache; - protected volatile NetworkChannel networkChannel; protected volatile long connectionTimestamp = -1; @@ -136,10 +136,12 @@ protected AbstractService( @NonNull ServiceConfiguration configuration, @NonNull InternalCloudServiceManager manager, @NonNull EventManager eventManager, + @NonNull ServiceConsoleLogCache logCache, @NonNull ServiceVersionProvider versionProvider, @NonNull ServiceConfigurationPreparer serviceConfigurationPreparer ) { this.i18n = i18n; + this.logCache = logCache; this.mainThread = tickLoop; this.configuration = nodeConfig; this.eventManager = eventManager; @@ -162,6 +164,7 @@ protected AbstractService( ServiceLifeCycle.PREPARED, configuration.propertyHolder().immutableCopy()); this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED, false); + this.initStandardServiceLogHandler(); // register the service locally for now manager.registerUnacceptedService(this); @@ -884,6 +887,33 @@ protected void downloadInclusionFile(@NonNull ServiceRemoteInclusion inclusion, this.serviceId().nodeUniqueId()}; } + protected void initStandardServiceLogHandler() { + this.logCache.addHandler((_, line, stderr) -> { + for (var logTarget : this.logTargets) { + if (logTarget._1().equals(ChannelMessageSender.self().toTarget())) { + // the current target is the node this service is running on, print it directly here + this.eventManager.callEvent(logTarget._2(), new CloudServiceLogEntryEvent( + this.currentServiceInfo, + line, + stderr ? CloudServiceLogEntryEvent.StreamType.STDERR : CloudServiceLogEntryEvent.StreamType.STDOUT)); + } else { + // the listener is listening remotely, send the line to the network component + ChannelMessage.builder() + .target(logTarget._1()) + .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) + .message("screen_new_line") + .buffer(DataBuf.empty() + .writeObject(this.currentServiceInfo) + .writeString(logTarget._2()) + .writeString(line) + .writeBoolean(stderr)) + .build() + .send(); + } + } + }); + } + protected abstract void startProcess(); protected abstract void stopProcess(); diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/DefaultCloudServiceManager.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/DefaultCloudServiceManager.java index 678ac85c1f..b21dff0f83 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/DefaultCloudServiceManager.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/DefaultCloudServiceManager.java @@ -171,20 +171,14 @@ public DefaultCloudServiceManager( }) .currentGetter(group -> this.serviceProviderByName(group.name()).serviceInfo()) .build()); - // schedule the updating of the local service log cache + + // schedule the service watchdog to run once per second mainThread.scheduleTask(() -> { for (var service : this.localCloudServices()) { - // we only need to look at running services - if (service.lifeCycle() == ServiceLifeCycle.RUNNING) { - // detect dead services and stop them - if (service.alive()) { - service.serviceConsoleLogCache().update(); - LOGGER.trace("Updated service log cache of {}", service.serviceId().name()); - } else { - eventManager.callEvent(new CloudServicePreForceStopEvent(service)); - service.stop(); - LOGGER.trace("Stopped dead service {}", service.serviceId().name()); - } + if (service.lifeCycle() == ServiceLifeCycle.RUNNING && !service.alive()) { + eventManager.callEvent(new CloudServicePreForceStopEvent(service)); + service.stop(); + LOGGER.debug("Stopped dead service {}", service.serviceId().name()); } } return null; diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/JVMService.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/JVMService.java index e91c9b2cb7..bff07ab65f 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/JVMService.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/JVMService.java @@ -18,13 +18,8 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; -import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageSender; import eu.cloudnetservice.driver.event.EventManager; -import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent; -import eu.cloudnetservice.driver.impl.network.NetworkConstants; import eu.cloudnetservice.driver.language.I18n; -import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.service.ServiceConfiguration; import eu.cloudnetservice.driver.service.ServiceEnvironment; import eu.cloudnetservice.driver.service.ServiceEnvironmentType; @@ -33,9 +28,11 @@ import eu.cloudnetservice.node.event.service.CloudServicePreProcessStartEvent; import eu.cloudnetservice.node.impl.service.InternalCloudServiceManager; import eu.cloudnetservice.node.impl.service.defaults.log.ProcessServiceLogCache; +import eu.cloudnetservice.node.impl.service.defaults.log.ProcessServiceLogReadScheduler; import eu.cloudnetservice.node.impl.tick.DefaultTickLoop; import eu.cloudnetservice.node.impl.version.ServiceVersionProvider; import eu.cloudnetservice.node.service.ServiceConfigurationPreparer; +import eu.cloudnetservice.node.service.ServiceConsoleLogCache; import eu.cloudnetservice.utils.base.StringUtil; import eu.cloudnetservice.utils.base.io.FileUtil; import io.vavr.CheckedFunction1; @@ -85,6 +82,31 @@ public JVMService( @NonNull InternalCloudServiceManager manager, @NonNull EventManager eventManager, @NonNull ServiceVersionProvider versionProvider, + @NonNull ServiceConfigurationPreparer serviceConfigurationPreparer, + @NonNull ProcessServiceLogReadScheduler processLogReadScheduler + ) { + var logCache = new ProcessServiceLogCache(nodeConfig, configuration.serviceId(), processLogReadScheduler); + this( + i18n, + tickLoop, + nodeConfig, + configuration, + manager, + eventManager, + logCache, + versionProvider, + serviceConfigurationPreparer); + } + + protected JVMService( + @NonNull I18n i18n, + @NonNull DefaultTickLoop tickLoop, + @NonNull Configuration nodeConfig, + @NonNull ServiceConfiguration configuration, + @NonNull InternalCloudServiceManager manager, + @NonNull EventManager eventManager, + @NonNull ServiceConsoleLogCache logCache, + @NonNull ServiceVersionProvider versionProvider, @NonNull ServiceConfigurationPreparer serviceConfigurationPreparer ) { super( @@ -94,10 +116,9 @@ public JVMService( configuration, manager, eventManager, + logCache, versionProvider, serviceConfigurationPreparer); - super.logCache = new ProcessServiceLogCache(() -> this.process, nodeConfig, this); - this.initLogHandler(); } @Override @@ -178,23 +199,33 @@ protected void startProcess() { @Override protected void stopProcess() { - if (this.process != null) { - // try to send a shutdown command + var process = this.process; + if (process != null) { + // try to send a shutdown command (still needs the process instance to be present) this.runCommand("end"); this.runCommand("stop"); + this.process = null; + // try to wait for the process to terminate normally, setting the + // terminated flag to true if the process exited in the given time frame + var terminated = false; try { - // wait until the process termination seconds exceeded - if (this.process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) { - this.process.exitValue(); // validation that the process terminated - this.process = null; // reset as there is no fall-through - return; + if (process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) { + process.exitValue(); // validation that the process terminated + terminated = true; } } catch (IllegalThreadStateException | InterruptedException ignored) { // force shutdown the process } - // force destroy the process now - not much we can do here more than that - this.process.toHandle().destroyForcibly(); - this.process = null; + + // force-destroy the process in case it didn't terminate normally + if (!terminated) { + process.toHandle().destroyForcibly(); + } + + // stop the reading process when the process exited + if (this.logCache instanceof ProcessServiceLogCache processServiceLogCache) { + processServiceLogCache.stop(); + } } } @@ -240,6 +271,13 @@ protected void doStartProcess( // start the process and fire the post start event this.process = builder.start(); this.eventManager.callEvent(new CloudServicePostProcessStartEvent(this)); + + // start the log reading unless some user code changed the log cache type + // in that case it's up to the user to start the reading process + if (super.logCache instanceof ProcessServiceLogCache processServiceLogCache) { + processServiceLogCache.start(this.process); + LOGGER.debug("Started {} log cache for service {}", super.logCache.getClass(), this.serviceId()); + } } catch (IOException exception) { LOGGER.error( "Unable to start process in {} with command line {}", @@ -249,33 +287,6 @@ protected void doStartProcess( } } - protected void initLogHandler() { - super.logCache.addHandler(($, line, stderr) -> { - for (var logTarget : super.logTargets) { - if (logTarget._1().equals(ChannelMessageSender.self().toTarget())) { - // the current target is the node this service is running on, print it directly here - this.eventManager.callEvent(logTarget._2(), new CloudServiceLogEntryEvent( - this.currentServiceInfo, - line, - stderr ? CloudServiceLogEntryEvent.StreamType.STDERR : CloudServiceLogEntryEvent.StreamType.STDOUT)); - } else { - // the listener is listening remotely, send the line to the network component - ChannelMessage.builder() - .target(logTarget._1()) - .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) - .message("screen_new_line") - .buffer(DataBuf.empty() - .writeObject(this.currentServiceInfo) - .writeString(logTarget._2()) - .writeString(line) - .writeBoolean(stderr)) - .build() - .send(); - } - } - }); - } - protected @Nullable Tuple2 prepareWrapperFile() { // check if the wrapper file is there - unpack it if not if (Files.notExists(WRAPPER_TEMP_FILE)) { diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/factory/JVMLocalCloudServiceFactory.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/factory/JVMLocalCloudServiceFactory.java index 46d4012155..bae41d84aa 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/factory/JVMLocalCloudServiceFactory.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/factory/JVMLocalCloudServiceFactory.java @@ -23,6 +23,7 @@ import eu.cloudnetservice.node.config.Configuration; import eu.cloudnetservice.node.impl.service.InternalCloudServiceManager; import eu.cloudnetservice.node.impl.service.defaults.JVMService; +import eu.cloudnetservice.node.impl.service.defaults.log.ProcessServiceLogReadScheduler; import eu.cloudnetservice.node.impl.tick.DefaultTickLoop; import eu.cloudnetservice.node.impl.version.ServiceVersionProvider; import eu.cloudnetservice.node.service.CloudService; @@ -38,6 +39,7 @@ public class JVMLocalCloudServiceFactory extends BaseLocalCloudServiceFactory { protected final DefaultTickLoop mainThread; protected final EventManager eventManager; protected final CloudServiceManager cloudServiceManager; + protected final ProcessServiceLogReadScheduler processLogReadScheduler; @Inject public JVMLocalCloudServiceFactory( @@ -46,13 +48,15 @@ public JVMLocalCloudServiceFactory( @NonNull Configuration nodeConfig, @NonNull CloudServiceManager cloudServiceManager, @NonNull EventManager eventManager, - @NonNull ServiceVersionProvider versionProvider + @NonNull ServiceVersionProvider versionProvider, + @NonNull ProcessServiceLogReadScheduler processLogReadScheduler ) { super(nodeConfig, versionProvider); this.i18n = i18n; this.mainThread = tickLoop; this.eventManager = eventManager; this.cloudServiceManager = cloudServiceManager; + this.processLogReadScheduler = processLogReadScheduler; } @Override @@ -73,7 +77,8 @@ public JVMLocalCloudServiceFactory( (InternalCloudServiceManager) manager, this.eventManager, this.versionProvider, - preparer); + preparer, + this.processLogReadScheduler); } @Override diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/AbstractServiceLogCache.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/AbstractServiceLogCache.java index 6e71f57f70..fed89f6127 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/AbstractServiceLogCache.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/AbstractServiceLogCache.java @@ -17,8 +17,8 @@ package eu.cloudnetservice.node.impl.service.defaults.log; import com.google.common.base.Preconditions; +import eu.cloudnetservice.driver.service.ServiceId; import eu.cloudnetservice.node.config.Configuration; -import eu.cloudnetservice.node.service.CloudService; import eu.cloudnetservice.node.service.ServiceConsoleLineHandler; import eu.cloudnetservice.node.service.ServiceConsoleLogCache; import java.util.Collection; @@ -36,7 +36,7 @@ public abstract class AbstractServiceLogCache implements ServiceConsoleLogCache protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractServiceLogCache.class); - protected final CloudService service; + protected final ServiceId associatedServiceId; protected final Queue cachedLogMessages = new ConcurrentLinkedQueue<>(); protected final Set handlers = ConcurrentHashMap.newKeySet(); @@ -44,15 +44,15 @@ public abstract class AbstractServiceLogCache implements ServiceConsoleLogCache protected volatile int logCacheSize; protected volatile boolean alwaysPrintErrorStreamToConsole; - public AbstractServiceLogCache(@NonNull Configuration configuration, @NonNull CloudService service) { - this.service = service; + public AbstractServiceLogCache(@NonNull Configuration configuration, @NonNull ServiceId associatedServiceId) { + this.associatedServiceId = associatedServiceId; this.logCacheSize = configuration.maxServiceConsoleLogCacheSize(); this.alwaysPrintErrorStreamToConsole = configuration.printErrorStreamLinesFromServices(); } @Override - public @NonNull CloudService service() { - return this.service; + public @NonNull ServiceId associatedServiceId() { + return this.associatedServiceId; } @Override @@ -97,17 +97,26 @@ public void removeHandler(@NonNull ServiceConsoleLineHandler handler) { } protected void handleItem(@NonNull String entry, boolean comesFromErrorStream) { - // drain the cache - while (this.cachedLogMessages.size() > this.logCacheSize) { - this.cachedLogMessages.poll(); + // empty log lines could be used for some kind of formatting, but are not really + // not useful in any way usually, therefore we don't cache them at all + if (entry.isBlank()) { + return; } - // print the line to the console if enabled + + // insert the log line into the cache, unless the cache is disabled + // if needed we also remove elements from the cache to stay in the provided size bounds + if (this.logCacheSize > 0) { + while (this.cachedLogMessages.size() > this.logCacheSize) { + this.cachedLogMessages.poll(); + } + + this.cachedLogMessages.add(entry); + } + if (this.alwaysPrintErrorStreamToConsole && comesFromErrorStream) { - LOGGER.warn("[{}/WARN]: {}", this.service.serviceId().name(), entry); + LOGGER.warn("[{}/WARN]: {}", this.associatedServiceId.name(), entry); } - // add the line - this.cachedLogMessages.add(entry); - // call all handlers + if (!this.handlers.isEmpty()) { for (var handler : this.handlers) { handler.handleLine(this, entry, comesFromErrorStream); diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java index a3907c0854..ec1cc1b2e3 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java @@ -16,70 +16,110 @@ package eu.cloudnetservice.node.impl.service.defaults.log; +import com.google.common.base.Preconditions; +import eu.cloudnetservice.common.util.StringUtil; +import eu.cloudnetservice.driver.service.ServiceId; import eu.cloudnetservice.node.config.Configuration; -import eu.cloudnetservice.node.service.CloudService; -import eu.cloudnetservice.node.service.ServiceConsoleLogCache; +import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.function.Supplier; import lombok.NonNull; public class ProcessServiceLogCache extends AbstractServiceLogCache { - protected final Supplier processSupplier; + private final ProcessServiceLogReadScheduler scheduler; - protected final byte[] buffer = new byte[2048]; - protected final StringBuffer stringBuffer = new StringBuffer(); + private volatile ProcessHandle targetProcess; + private BufferedReader outStreamReader; + private BufferedReader errStreamReader; public ProcessServiceLogCache( - @NonNull Supplier processSupplier, @NonNull Configuration configuration, - @NonNull CloudService service + @NonNull ServiceId associatedServiceId, + @NonNull ProcessServiceLogReadScheduler scheduler ) { - super(configuration, service); - this.processSupplier = processSupplier; + super(configuration, associatedServiceId); + this.scheduler = scheduler; } - @Override - public @NonNull ServiceConsoleLogCache update() { - // check if we can currently update - var process = this.processSupplier.get(); - if (process != null) { - try { - this.readStream(process.getInputStream(), false); - this.readStream(process.getErrorStream(), true); - } catch (IOException exception) { - LOGGER.error("Exception updating content of console for service {}", - this.service.serviceId().name(), - exception); - // reset the string buffer - this.stringBuffer.setLength(0); + public void start(@NonNull Process process) { + Preconditions.checkState(this.targetProcess == null); + this.targetProcess = process.toHandle(); + this.outStreamReader = process.inputReader(StandardCharsets.UTF_8); + this.errStreamReader = process.errorReader(StandardCharsets.UTF_8); + this.scheduler.schedule(this); + } + + public void stop() { + try { + var outReader = this.outStreamReader; + var errReader = this.errStreamReader; + if (outReader != null && errReader != null) { + outReader.close(); + errReader.close(); + this.outStreamReader = null; + this.errStreamReader = null; } + + // no longer targeting a process, always reset the target process + // in case something went wrong elsewhere to allow re-using this + // log cache in that case anyway + this.targetProcess = null; + } catch (IOException exception) { + LOGGER.error("Failed to close process streams of service {}", this.associatedServiceId.name(), exception); } - // for chaining - return this; } - protected void readStream(@NonNull InputStream stream, boolean isErrorStream) throws IOException { - int len; - while (stream.available() > 0 && (len = stream.read(this.buffer, 0, this.buffer.length)) != -1) { - this.stringBuffer.append(new String(this.buffer, 0, len, StandardCharsets.UTF_8)); - } + public boolean readProcessOutputContent() { + try { + var outReader = this.outStreamReader; + var errReader = this.errStreamReader; + if (outReader == null || errReader == null) { + return false; + } + + // try to read all lines from both stream if content is available + // these calls do not block in case the readers have no content + // available yet + this.readLinesFromStream(outReader, false); + this.readLinesFromStream(errReader, true); - // check if we got a result we can work with - var content = this.stringBuffer.toString(); - if (content.contains("\n") || content.contains("\r")) { - for (var input : content.split("\r")) { - for (var text : input.split("\n")) { - if (!text.trim().isEmpty()) { - this.handleItem(text, isErrorStream); - } - } + // check if the target process terminated, we can stop reading + // the data streams in that case + // the data that was buffered is now removed from the reader and + // no now data will become available if the process is dead + var targetProcess = this.targetProcess; + if (targetProcess == null || !targetProcess.isAlive()) { + this.stop(); // call stop to ensure that the termination is properly handled (prevent state mismatch) + return false; + } + + return true; + } catch (IOException exception) { + // stream close and read can happen concurrently, so in case the stream + // closed we don't want to log the exception but rather signal that the + // service was stopped. "stream closed" is the message for both the reader + // being closed and the file descriptor being no longer available (process terminated) + var message = StringUtil.toLower(exception.getMessage()); + if (message != null && message.equals("stream closed")) { + this.stop(); // call stop to ensure that the termination is properly handled (prevent state mismatch) + LOGGER.debug("Encountered closed out/err stream for service {}, stopping", associatedServiceId); + return false; + } else { + LOGGER.error("Unable to read out/err stream of service {}", this.associatedServiceId, exception); + return true; // couldn't read this time, but maybe we can read next time? } } + } - // reset the string buffer - this.stringBuffer.setLength(0); + private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStream) throws IOException { + while (stream.ready()) { + var line = stream.readLine(); + if (line == null) { + break; + } + + this.handleItem(line, errStream); + } } } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogReadScheduler.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogReadScheduler.java new file mode 100644 index 0000000000..bbfd097544 --- /dev/null +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogReadScheduler.java @@ -0,0 +1,91 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.node.impl.service.defaults.log; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import jakarta.inject.Singleton; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.NonNull; + +@Singleton +public final class ProcessServiceLogReadScheduler { + + private static final int LOG_READ_DELAY_MS = Integer.getInteger("cloudnet.process-log-read-delay", 25); + private static final int READ_WORKER_MAXIMUM = Integer.getInteger("cloudnet.process-log-worker-maximum", 25); + private static final int READ_ACTIONS_PER_WORKER = Integer.getInteger("cloudnet.process-log-actions-per-worker", 5); + + private final AtomicInteger runningReaderActions; + private final ScheduledThreadPoolExecutor executor; + + public ProcessServiceLogReadScheduler() { + var threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.NORM_PRIORITY) + .setNameFormat("process-log-reader-%d") + .build(); + this.executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy()); + this.runningReaderActions = new AtomicInteger(0); + } + + public void schedule(@NonNull ProcessServiceLogCache logCache) { + var runningReaderActions = this.runningReaderActions.getAndIncrement(); + if (runningReaderActions != 0 && runningReaderActions % READ_ACTIONS_PER_WORKER == 0) { + var expectedWorkerCount = (runningReaderActions / READ_ACTIONS_PER_WORKER) + 1; + this.adjustWorkerCount(expectedWorkerCount); + } + + var readTask = new ProcessServiceLogReadTask(logCache, this); + this.executor.scheduleWithFixedDelay(readTask, 0, LOG_READ_DELAY_MS, TimeUnit.MILLISECONDS); + } + + private void notifyLogCacheReadEnd() { + var runningReaderActions = this.runningReaderActions.decrementAndGet(); + if (runningReaderActions != 0 && runningReaderActions % READ_ACTIONS_PER_WORKER == 0) { + var expectedWorkerCount = runningReaderActions / READ_ACTIONS_PER_WORKER; + this.adjustWorkerCount(expectedWorkerCount); + } + } + + private void adjustWorkerCount(int expectedWorkerCount) { + var newCorePoolSize = Math.min(expectedWorkerCount, READ_WORKER_MAXIMUM); + if (this.executor.getCorePoolSize() != newCorePoolSize) { + this.executor.setCorePoolSize(expectedWorkerCount); + } + } + + private record ProcessServiceLogReadTask( + @NonNull ProcessServiceLogCache logCache, + @NonNull ProcessServiceLogReadScheduler scheduler + ) implements Runnable { + + private static final RuntimeException CANCEL_EXCEPTION = new RuntimeException("cancelled, reached stream EOF"); + + @Override + public void run() { + // read the content from the stream, in case the stream closed notify the + // scheduler about this and stop scheduling the next by throwing an exception + var streamsStillOpen = this.logCache.readProcessOutputContent(); + if (!streamsStillOpen) { + this.scheduler.notifyLogCacheReadEnd(); + throw CANCEL_EXCEPTION; + } + } + } +} From 547d34a422947126c4fee8ec24cf811c68281d87 Mon Sep 17 00:00:00 2001 From: Lorenz Wrobel <43410952+DasBabyPixel@users.noreply.github.com> Date: Thu, 3 Oct 2024 01:11:26 +0200 Subject: [PATCH 2/3] non blocking output reader --- .../defaults/log/ProcessServiceLogCache.java | 74 +++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java index ec1cc1b2e3..849ee52d62 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/log/ProcessServiceLogCache.java @@ -17,21 +17,23 @@ package eu.cloudnetservice.node.impl.service.defaults.log; import com.google.common.base.Preconditions; -import eu.cloudnetservice.common.util.StringUtil; import eu.cloudnetservice.driver.service.ServiceId; import eu.cloudnetservice.node.config.Configuration; +import eu.cloudnetservice.utils.base.StringUtil; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import lombok.NonNull; +import org.jetbrains.annotations.Nullable; public class ProcessServiceLogCache extends AbstractServiceLogCache { private final ProcessServiceLogReadScheduler scheduler; private volatile ProcessHandle targetProcess; - private BufferedReader outStreamReader; - private BufferedReader errStreamReader; + private NonBlockingReader outStreamReader; + private NonBlockingReader errStreamReader; public ProcessServiceLogCache( @NonNull Configuration configuration, @@ -45,8 +47,8 @@ public ProcessServiceLogCache( public void start(@NonNull Process process) { Preconditions.checkState(this.targetProcess == null); this.targetProcess = process.toHandle(); - this.outStreamReader = process.inputReader(StandardCharsets.UTF_8); - this.errStreamReader = process.errorReader(StandardCharsets.UTF_8); + this.outStreamReader = new NonBlockingReader(process.inputReader(StandardCharsets.UTF_8)); + this.errStreamReader = new NonBlockingReader(process.errorReader(StandardCharsets.UTF_8)); this.scheduler.schedule(this); } @@ -55,6 +57,12 @@ public void stop() { var outReader = this.outStreamReader; var errReader = this.errStreamReader; if (outReader != null && errReader != null) { + outReader.shutdown = true; + errReader.shutdown = true; + // Processes killed by the termination timeout could have remaining content in the output streams. + // Read all remaining content and then close the streams + this.readLinesFromStream(outReader, false); + this.readLinesFromStream(errReader, true); outReader.close(); errReader.close(); this.outStreamReader = null; @@ -112,8 +120,8 @@ public boolean readProcessOutputContent() { } } - private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStream) throws IOException { - while (stream.ready()) { + private void readLinesFromStream(@NonNull NonBlockingReader stream, boolean errStream) throws IOException { + while (true) { var line = stream.readLine(); if (line == null) { break; @@ -122,4 +130,56 @@ private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStre this.handleItem(line, errStream); } } + + private static class NonBlockingReader { + + private final BufferedReader reader; + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + private boolean shutdown = false; + private boolean lastCR = false; + + public NonBlockingReader(BufferedReader reader) { + this.reader = reader; + } + + /** + * Tries to read a line, if no line is available returns null + */ + private synchronized @Nullable String readLine() throws IOException { + // Strategy: Collect everything into the buffer until we hit a newline + // Then return the line excluding the (CR and) BR character(s) + while (this.reader.ready()) { + var read = this.reader.read(); + if (this.lastCR) { + // Make sure to skip CR when creating the line + if (read != '\n') { + this.buffer.write('\r'); + } + this.lastCR = false; + } else if (read == '\r') { + this.lastCR = true; + // Don't append \r to buffer + continue; + } + + if (read == '\n') { + // finished newline + var line = this.buffer.toString(StandardCharsets.UTF_8); + this.buffer.reset(); + return line; + } + this.buffer.write(read); + } + if (this.shutdown && this.buffer.size() > 0) { + var line = this.buffer.toString(StandardCharsets.UTF_8); + this.buffer.reset(); + return line; + } + return null; + } + + private void close() throws IOException { + this.reader.close(); + } + } } From ae9e900f688c1ddd7762616550910bb9020b2873 Mon Sep 17 00:00:00 2001 From: Lorenz Wrobel <43410952+DasBabyPixel@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:26:30 +0100 Subject: [PATCH 3/3] fix formatting --- .../cloudnetservice/modules/docker/impl/DockerizedService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java index 7bf28eea9a..87e9494445 100644 --- a/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java +++ b/modules/dockerized-services/impl/src/main/java/eu/cloudnetservice/modules/docker/impl/DockerizedService.java @@ -116,7 +116,8 @@ protected DockerizedService( configuration, manager, eventManager, - logCache, versionProvider, + logCache, + versionProvider, serviceConfigurationPreparer); this.dockerClient = dockerClient;