Refactor ProcessRunner to remove Apache Commons Exec dependency and improve process management

This commit is contained in:
DerDavidBohl
2025-12-10 12:50:04 +01:00
parent f06dbb99e5
commit 44d14fe8e4
2 changed files with 110 additions and 92 deletions

View File

@@ -85,11 +85,6 @@
<artifactId>docker-java-transport-httpclient5</artifactId> <artifactId>docker-java-transport-httpclient5</artifactId>
<version>3.7.0</version> <version>3.7.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -1,10 +1,10 @@
package org.davidbohl.dirigent.utility.process; package org.davidbohl.dirigent.utility.process;
import java.io.ByteArrayOutputStream; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -13,13 +13,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.ShutdownHookProcessDestroyer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
@@ -29,8 +22,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class ProcessRunner { public class ProcessRunner {
// Track active result handlers to ensure cleanup // Track active processes to ensure cleanup
private final Map<DefaultExecuteResultHandler, Long> activeHandlers = new ConcurrentHashMap<>(); private final Map<Process, Long> activeProcesses = new ConcurrentHashMap<>();
// Background thread to clean up any stale processes // Background thread to clean up any stale processes
private final ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> { private final ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
@@ -41,14 +34,14 @@ public class ProcessRunner {
public ProcessRunner() { public ProcessRunner() {
// Start periodic cleanup task to catch any missed processes // Start periodic cleanup task to catch any missed processes
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleHandlers, 30, 30, TimeUnit.SECONDS); cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleProcesses, 10, 10, TimeUnit.SECONDS);
log.info("ProcessRunner initialized with background cleanup thread"); log.info("ProcessRunner initialized with background cleanup thread");
} }
@PreDestroy @PreDestroy
public void shutdown() { public void shutdown() {
log.info("Shutting down ProcessRunner and cleaning up remaining processes"); log.info("Shutting down ProcessRunner and cleaning up {} remaining processes", activeProcesses.size());
cleanupStaleHandlers(); cleanupStaleProcesses();
cleanupExecutor.shutdown(); cleanupExecutor.shutdown();
try { try {
if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -60,20 +53,35 @@ public class ProcessRunner {
} }
} }
private void cleanupStaleHandlers() { private void cleanupStaleProcesses() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
activeHandlers.entrySet().removeIf(entry -> { activeProcesses.entrySet().removeIf(entry -> {
DefaultExecuteResultHandler handler = entry.getKey(); Process process = entry.getKey();
long startTime = entry.getValue(); long startTime = entry.getValue();
// If process has been running for more than 2 minutes, consider it stale // Check if process is still alive
if (handler.hasResult() || (now - startTime) > 120000) { if (!process.isAlive()) {
log.debug("Cleaning up stale process handler (hasResult: {}, age: {}s)", log.debug("Removing dead process from tracking (age: {}s)", (now - startTime) / 1000);
handler.hasResult(), (now - startTime) / 1000); return true;
}
// If process has been running for more than 2 minutes, kill it
if ((now - startTime) > 120000) {
log.warn("Killing stale process (age: {}s)", (now - startTime) / 1000);
process.destroyForcibly();
try {
process.waitFor(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true; return true;
} }
return false; return false;
}); });
if (!activeProcesses.isEmpty()) {
log.debug("Active processes being tracked: {}", activeProcesses.size());
}
} }
public ProcessResult executeCommand(List<String> commandParts, long timeoutMs, Map<String, String> env) public ProcessResult executeCommand(List<String> commandParts, long timeoutMs, Map<String, String> env)
@@ -107,91 +115,106 @@ public class ProcessRunner {
Map<String, String> finalEnv = new HashMap<>(); Map<String, String> finalEnv = new HashMap<>();
finalEnv.putAll(System.getenv()); finalEnv.putAll(System.getenv());
if(env != null && env.size() > 0) if(env != null && !env.isEmpty())
finalEnv.putAll(env); finalEnv.putAll(env);
CommandLine command = new CommandLine(commandParts.get(0)); ProcessBuilder processBuilder = new ProcessBuilder(commandParts);
for (int i = 1; i < commandParts.size(); i++) { processBuilder.directory(workingDirectory);
command.addArgument(commandParts.get(i)); processBuilder.environment().putAll(finalEnv);
} processBuilder.redirectErrorStream(false);
ByteArrayOutputStream stdout = new ByteArrayOutputStream(); log.debug("Running command <{}> in directory {}", String.join(" ", commandParts), workingDirectory);
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr);
ExecuteWatchdog watchdog = ExecuteWatchdog.builder()
.setTimeout(Duration.ofMinutes(1))
.get();
Process process = null;
int exitCode = 1; int exitCode = 1;
String stdoutString = ""; StringBuilder stdout = new StringBuilder();
String stderrString = ""; StringBuilder stderr = new StringBuilder();
Executor executor = DefaultExecutor.builder().get();
executor.setStreamHandler(streamHandler);
executor.setWorkingDirectory(workingDirectory);
executor.setWatchdog(watchdog);
executor.setExitValue(1);
executor.setProcessDestroyer(new ShutdownHookProcessDestroyer());
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
// Track this handler for cleanup
activeHandlers.put(resultHandler, System.currentTimeMillis());
log.debug("Running command <{}>", String.join(" ", commandParts));
try { try {
executor.execute(command, finalEnv, resultHandler); process = processBuilder.start();
try {
resultHandler.waitFor(Duration.ofMinutes(1));
} catch (InterruptedException e) {
log.warn("Process got interrupted", e);
Thread.currentThread().interrupt();
// Ensure process is killed on interrupt
watchdog.destroyProcess();
}
// Only destroy if the process is still running (hasn't completed naturally) // Track this process immediately
if (!resultHandler.hasResult()) { activeProcesses.put(process, System.currentTimeMillis());
log.debug("Process timeout - destroying process");
watchdog.destroyProcess(); // Read stdout in separate thread
Thread stdoutReader = new Thread(() -> {
// Give it a moment to terminate, then force if needed try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
stdout.append(line).append("\n");
}
} catch (IOException e) {
log.debug("Error reading stdout", e);
}
}, "stdout-reader");
stdoutReader.setDaemon(true);
stdoutReader.start();
// Read stderr in separate thread
Thread stderrReader = new Thread(() -> {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
stderr.append(line).append("\n");
}
} catch (IOException e) {
log.debug("Error reading stderr", e);
}
}, "stderr-reader");
stderrReader.setDaemon(true);
stderrReader.start();
// Wait for process with timeout
boolean finished = process.waitFor(1, TimeUnit.MINUTES);
if (!finished) {
log.warn("Process timed out, killing it: {}", String.join(" ", commandParts));
process.destroyForcibly();
process.waitFor(5, TimeUnit.SECONDS);
}
// Wait for reader threads to finish
stdoutReader.join(2000);
stderrReader.join(2000);
exitCode = process.exitValue();
} catch (InterruptedException e) {
log.warn("Process got interrupted: {}", String.join(" ", commandParts), e);
if (process != null && process.isAlive()) {
process.destroyForcibly();
try { try {
resultHandler.waitFor(Duration.ofSeconds(2)); process.waitFor(5, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
Thread.currentThread().interrupt();
exitCode = resultHandler.getExitValue();
} finally { } finally {
// Remove from active tracking // CRITICAL: Always remove from tracking and ensure process is dead
activeHandlers.remove(resultHandler); if (process != null) {
activeProcesses.remove(process);
// Ensure streams are closed
streamHandler.stop(); // Double-check the process is actually dead
if (process.isAlive()) {
try { log.warn("Process still alive in finally block, force killing");
stdout.close(); process.destroyForcibly();
} catch (IOException ignored) { try {
} process.waitFor(5, TimeUnit.SECONDS);
try { } catch (InterruptedException e) {
stderr.close(); Thread.currentThread().interrupt();
} catch (IOException ignored) { }
}
} }
} }
stdoutString = stdout.toString(StandardCharsets.UTF_8); String stdoutString = stdout.toString().trim();
stderrString = stderr.toString(StandardCharsets.UTF_8); String stderrString = stderr.toString().trim();
log.debug("Finished command <{}>\nExit code: <{}>\nstdout: {}\nstderr: {}", log.debug("Finished command <{}>\nExit code: <{}>\nstdout: {}\nstderr: {}",
String.join(" ", commandParts), exitCode, stdoutString, stderrString); String.join(" ", commandParts), exitCode, stdoutString, stderrString);
log.debug("Process killed: {}", watchdog.killedProcess());
return new ProcessResult(exitCode, stdoutString, stderrString); return new ProcessResult(exitCode, stdoutString, stderrString);
} }