Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non blocking output reader #1521

Draft
wants to merge 3 commits into
base: nightly
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public class DockerizedService extends JVMService {

protected final DockerClient dockerClient;
protected final DockerConfiguration configuration;
protected final DockerizedServiceLogCache logCache;

protected volatile String containerId;

Expand All @@ -109,21 +108,20 @@ protected DockerizedService(
@NonNull DockerClient dockerClient,
@NonNull DockerConfiguration dockerConfiguration
) {
var logCache = new DockerizedServiceLogCache(nodeConfig, configuration.serviceId());
super(
i18n,
tickLoop,
nodeConfig,
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);

this.dockerClient = dockerClient;
this.configuration = dockerConfiguration;

super.logCache = this.logCache = new DockerizedServiceLogCache(nodeConfig, this);
this.initLogHandler();
}

@Override
Expand Down Expand Up @@ -379,8 +377,8 @@ protected boolean needsImagePull(@NonNull DockerImage image) {
public final class ServiceLogCacheAdapter extends ResultCallback.Adapter<Frame> {

@Override
public void onNext(Frame object) {
DockerizedService.this.logCache.handle(object);
public void onNext(@NonNull Frame object) {
((DockerizedServiceLogCache) DockerizedService.this.logCache).handle(object);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
package eu.cloudnetservice.modules.docker.impl;

import com.github.dockerjava.api.model.Frame;
import eu.cloudnetservice.driver.service.ServiceId;
import eu.cloudnetservice.node.config.Configuration;
import eu.cloudnetservice.node.impl.service.defaults.log.AbstractServiceLogCache;
import eu.cloudnetservice.node.service.CloudService;
import eu.cloudnetservice.node.service.ServiceConsoleLogCache;
import java.nio.charset.StandardCharsets;
import lombok.NonNull;

public class DockerizedServiceLogCache extends AbstractServiceLogCache {

public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull CloudService service) {
super(configuration, service);
}

@Override
public @NonNull ServiceConsoleLogCache update() {
return this;
public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull ServiceId associatedServiceId) {
super(configuration, associatedServiceId);
}

public void handle(@NonNull Frame frame) {
Expand All @@ -49,9 +43,7 @@ protected void handleItem(@NonNull String content, boolean comesFromErrorStream)
if (content.contains("\n") || content.contains("\r")) {
for (var input : content.split("\r")) {
for (var text : input.split("\n")) {
if (!text.trim().isEmpty()) {
super.handleItem(text, comesFromErrorStream);
}
super.handleItem(text, comesFromErrorStream);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package eu.cloudnetservice.node.service;

import eu.cloudnetservice.driver.service.ServiceId;
import java.util.Collection;
import java.util.Queue;
import lombok.NonNull;
import org.jetbrains.annotations.UnmodifiableView;

public interface ServiceConsoleLogCache {

@NonNull Queue<String> cachedLogMessages();

@NonNull ServiceConsoleLogCache update();
@NonNull
Queue<String> cachedLogMessages();

@NonNull CloudService service();
@NonNull
ServiceId associatedServiceId();

int logCacheSize();

Expand All @@ -42,5 +43,6 @@ public interface ServiceConsoleLogCache {
void removeHandler(@NonNull ServiceConsoleLineHandler handler);

@NonNull
@UnmodifiableView Collection<ServiceConsoleLineHandler> handlers();
@UnmodifiableView
Collection<ServiceConsoleLineHandler> handlers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import eu.cloudnetservice.driver.channel.ChannelMessageTarget;
import eu.cloudnetservice.driver.document.Document;
import eu.cloudnetservice.driver.event.EventManager;
import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent;
import eu.cloudnetservice.driver.impl.network.NetworkConstants;
import eu.cloudnetservice.driver.language.I18n;
import eu.cloudnetservice.driver.network.HostAndPort;
Expand Down Expand Up @@ -105,6 +106,7 @@ public abstract class AbstractService implements InternalCloudService {
protected final DefaultTickLoop mainThread;
protected final EventManager eventManager;
protected final Configuration configuration;
protected final ServiceConsoleLogCache logCache;
protected final InternalCloudServiceManager cloudServiceManager;
protected final ServiceConfiguration serviceConfiguration;
protected final ServiceVersionProvider serviceVersionProvider;
Expand All @@ -121,8 +123,6 @@ public abstract class AbstractService implements InternalCloudService {
protected final Collection<ServiceRemoteInclusion> installedInclusions = ConcurrentHashMap.newKeySet();
protected final Collection<ServiceDeployment> installedDeployments = ConcurrentHashMap.newKeySet();

protected ServiceConsoleLogCache logCache;

protected volatile NetworkChannel networkChannel;
protected volatile long connectionTimestamp = -1;

Expand All @@ -136,10 +136,12 @@ protected AbstractService(
@NonNull ServiceConfiguration configuration,
@NonNull InternalCloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceConsoleLogCache logCache,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
) {
this.i18n = i18n;
this.logCache = logCache;
this.mainThread = tickLoop;
this.configuration = nodeConfig;
this.eventManager = eventManager;
Expand All @@ -162,6 +164,7 @@ protected AbstractService(
ServiceLifeCycle.PREPARED,
configuration.propertyHolder().immutableCopy());
this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED, false);
this.initStandardServiceLogHandler();

// register the service locally for now
manager.registerUnacceptedService(this);
Expand Down Expand Up @@ -884,6 +887,33 @@ protected void downloadInclusionFile(@NonNull ServiceRemoteInclusion inclusion,
this.serviceId().nodeUniqueId()};
}

protected void initStandardServiceLogHandler() {
this.logCache.addHandler((_, line, stderr) -> {
for (var logTarget : this.logTargets) {
if (logTarget._1().equals(ChannelMessageSender.self().toTarget())) {
// the current target is the node this service is running on, print it directly here
this.eventManager.callEvent(logTarget._2(), new CloudServiceLogEntryEvent(
this.currentServiceInfo,
line,
stderr ? CloudServiceLogEntryEvent.StreamType.STDERR : CloudServiceLogEntryEvent.StreamType.STDOUT));
} else {
// the listener is listening remotely, send the line to the network component
ChannelMessage.builder()
.target(logTarget._1())
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
.message("screen_new_line")
.buffer(DataBuf.empty()
.writeObject(this.currentServiceInfo)
.writeString(logTarget._2())
.writeString(line)
.writeBoolean(stderr))
.build()
.send();
}
}
});
}

protected abstract void startProcess();

protected abstract void stopProcess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,14 @@ public DefaultCloudServiceManager(
})
.currentGetter(group -> this.serviceProviderByName(group.name()).serviceInfo())
.build());
// schedule the updating of the local service log cache

// schedule the service watchdog to run once per second
mainThread.scheduleTask(() -> {
for (var service : this.localCloudServices()) {
// we only need to look at running services
if (service.lifeCycle() == ServiceLifeCycle.RUNNING) {
// detect dead services and stop them
if (service.alive()) {
service.serviceConsoleLogCache().update();
LOGGER.trace("Updated service log cache of {}", service.serviceId().name());
} else {
eventManager.callEvent(new CloudServicePreForceStopEvent(service));
service.stop();
LOGGER.trace("Stopped dead service {}", service.serviceId().name());
}
if (service.lifeCycle() == ServiceLifeCycle.RUNNING && !service.alive()) {
eventManager.callEvent(new CloudServicePreForceStopEvent(service));
service.stop();
LOGGER.debug("Stopped dead service {}", service.serviceId().name());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import eu.cloudnetservice.driver.channel.ChannelMessage;
import eu.cloudnetservice.driver.channel.ChannelMessageSender;
import eu.cloudnetservice.driver.event.EventManager;
import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent;
import eu.cloudnetservice.driver.impl.network.NetworkConstants;
import eu.cloudnetservice.driver.language.I18n;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.service.ServiceConfiguration;
import eu.cloudnetservice.driver.service.ServiceEnvironment;
import eu.cloudnetservice.driver.service.ServiceEnvironmentType;
Expand All @@ -33,9 +28,11 @@
import eu.cloudnetservice.node.event.service.CloudServicePreProcessStartEvent;
import eu.cloudnetservice.node.impl.service.InternalCloudServiceManager;
import eu.cloudnetservice.node.impl.service.defaults.log.ProcessServiceLogCache;
import eu.cloudnetservice.node.impl.service.defaults.log.ProcessServiceLogReadScheduler;
import eu.cloudnetservice.node.impl.tick.DefaultTickLoop;
import eu.cloudnetservice.node.impl.version.ServiceVersionProvider;
import eu.cloudnetservice.node.service.ServiceConfigurationPreparer;
import eu.cloudnetservice.node.service.ServiceConsoleLogCache;
import eu.cloudnetservice.utils.base.StringUtil;
import eu.cloudnetservice.utils.base.io.FileUtil;
import io.vavr.CheckedFunction1;
Expand Down Expand Up @@ -85,6 +82,31 @@ public JVMService(
@NonNull InternalCloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer,
@NonNull ProcessServiceLogReadScheduler processLogReadScheduler
) {
var logCache = new ProcessServiceLogCache(nodeConfig, configuration.serviceId(), processLogReadScheduler);
this(
i18n,
tickLoop,
nodeConfig,
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);
}

protected JVMService(
@NonNull I18n i18n,
@NonNull DefaultTickLoop tickLoop,
@NonNull Configuration nodeConfig,
@NonNull ServiceConfiguration configuration,
@NonNull InternalCloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceConsoleLogCache logCache,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
) {
super(
Expand All @@ -94,10 +116,9 @@ public JVMService(
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);
super.logCache = new ProcessServiceLogCache(() -> this.process, nodeConfig, this);
this.initLogHandler();
}

@Override
Expand Down Expand Up @@ -178,23 +199,33 @@ protected void startProcess() {

@Override
protected void stopProcess() {
if (this.process != null) {
// try to send a shutdown command
var process = this.process;
if (process != null) {
// try to send a shutdown command (still needs the process instance to be present)
this.runCommand("end");
this.runCommand("stop");
this.process = null;

// try to wait for the process to terminate normally, setting the
// terminated flag to true if the process exited in the given time frame
var terminated = false;
try {
// wait until the process termination seconds exceeded
if (this.process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) {
this.process.exitValue(); // validation that the process terminated
this.process = null; // reset as there is no fall-through
return;
if (process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) {
process.exitValue(); // validation that the process terminated
terminated = true;
}
} catch (IllegalThreadStateException | InterruptedException ignored) { // force shutdown the process
}
// force destroy the process now - not much we can do here more than that
this.process.toHandle().destroyForcibly();
this.process = null;

// force-destroy the process in case it didn't terminate normally
if (!terminated) {
process.toHandle().destroyForcibly();
}

// stop the reading process when the process exited
if (this.logCache instanceof ProcessServiceLogCache processServiceLogCache) {
processServiceLogCache.stop();
}
}
}

Expand Down Expand Up @@ -240,6 +271,13 @@ protected void doStartProcess(
// start the process and fire the post start event
this.process = builder.start();
this.eventManager.callEvent(new CloudServicePostProcessStartEvent(this));

// start the log reading unless some user code changed the log cache type
// in that case it's up to the user to start the reading process
if (super.logCache instanceof ProcessServiceLogCache processServiceLogCache) {
processServiceLogCache.start(this.process);
LOGGER.debug("Started {} log cache for service {}", super.logCache.getClass(), this.serviceId());
}
} catch (IOException exception) {
LOGGER.error(
"Unable to start process in {} with command line {}",
Expand All @@ -249,33 +287,6 @@ protected void doStartProcess(
}
}

protected void initLogHandler() {
super.logCache.addHandler(($, line, stderr) -> {
for (var logTarget : super.logTargets) {
if (logTarget._1().equals(ChannelMessageSender.self().toTarget())) {
// the current target is the node this service is running on, print it directly here
this.eventManager.callEvent(logTarget._2(), new CloudServiceLogEntryEvent(
this.currentServiceInfo,
line,
stderr ? CloudServiceLogEntryEvent.StreamType.STDERR : CloudServiceLogEntryEvent.StreamType.STDOUT));
} else {
// the listener is listening remotely, send the line to the network component
ChannelMessage.builder()
.target(logTarget._1())
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
.message("screen_new_line")
.buffer(DataBuf.empty()
.writeObject(this.currentServiceInfo)
.writeString(logTarget._2())
.writeString(line)
.writeBoolean(stderr))
.build()
.send();
}
}
});
}

protected @Nullable Tuple2<Path, Attributes> prepareWrapperFile() {
// check if the wrapper file is there - unpack it if not
if (Files.notExists(WRAPPER_TEMP_FILE)) {
Expand Down
Loading