diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 1a669477b56bc5..8b7ce76f78b293 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -14,9 +14,9 @@ package com.google.devtools.build.lib.worker; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.flogger.GoogleLogger; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; import com.google.devtools.build.lib.shell.Subprocess; @@ -31,8 +31,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -43,8 +45,14 @@ * into them to send requests. When a worker process returns a {@code WorkResponse}, {@code * WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response. */ -public class WorkerMultiplexer extends Thread { - private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); +public class WorkerMultiplexer { + /** + * A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link + * WorkerProxy} instances add to this queue, while the requestSender subthread removes requests and + * sends them to the worker. This prevents dynamic execution interrupts from corrupting the {@code + * stdin} of the worker process. + */ + private final BlockingQueue<WorkRequest> pendingRequests = new LinkedBlockingQueue<>(); /** * A map of {@code WorkResponse}s received from the worker process. 
They are stored in this map * keyed by the request id until the corresponding {@code WorkerProxy} picks them up. @@ -80,6 +88,12 @@ public class WorkerMultiplexer extends Thread { /** For testing only, allow a way to fake subprocesses. */ private SubprocessFactory subprocessFactory; + /** A separate thread that sends requests. */ + private Thread requestSender; + + /** A separate thread that receives responses. */ + private Thread responseReceiver; + /** * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared * at the end of a command execution. @@ -97,16 +111,15 @@ synchronized void setReporter(@Nullable EventHandler reporter) { } /** Reports a string to the user if reporting is enabled. */ - private synchronized void report(String s) { - EventHandler r = this.reporter; // Protect against race condition with setReporter(). - if (r != null && s != null) { - r.handle(Event.info(s)); + private synchronized void report(@Nullable String s) { + if (this.reporter != null && s != null) { + this.reporter.handle(Event.info(s)); } } /** * Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already - * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread. + * exist. Also starts up the subthreads handling reading and writing requests and responses. */ public synchronized void createProcess(Path workDir) throws IOException { if (this.process == null) { @@ -129,12 +142,25 @@ public synchronized void createProcess(Path workDir) throws IOException { processBuilder.setStderr(logFile.getPathFile()); processBuilder.setEnv(workerKey.getEnv()); this.process = processBuilder.start(); + String id = workerKey.getMnemonic() + "-" + workerKey.hashCode(); + // TODO(larsrc): Consider moving sender/receiver threads into separate classes. 
+ this.requestSender = + new Thread( + () -> { + while (process.isAlive() && sendRequest()) {} + }); + this.requestSender.setName("multiplexer-request-sender-" + id); + this.requestSender.start(); + this.responseReceiver = + new Thread( + () -> { + while (process.isAlive() && readResponse()) {} + }); + this.responseReceiver.setName("multiplexer-response-receiver-" + id); + this.responseReceiver.start(); } else if (!this.process.isAlive()) { throw new IOException("Process is dead"); } - if (!this.isAlive()) { - this.start(); - } } /** @@ -157,7 +183,10 @@ public synchronized void destroyMultiplexer() { wasDestroyed = true; } - /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */ + /** + * Destroys the worker subprocess. This might block forever if the subprocess refuses to die. It + * is safe to call this multiple times. + */ private synchronized void destroyProcess() { boolean wasInterrupted = false; try { @@ -171,6 +200,17 @@ private synchronized void destroyProcess() { } } } finally { + // Stop the subthreads only when the process is dead, or their loops will go on. + if (this.requestSender != null) { + this.requestSender.interrupt(); + } + if (this.responseReceiver != null) { + this.responseReceiver.interrupt(); + } + // Might as well release any waiting workers + for (Semaphore semaphore : responseChecker.values()) { + semaphore.release(); + } // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/ if (wasInterrupted) { Thread.currentThread().interrupt(); // preserve interrupted status @@ -183,17 +223,12 @@ private synchronized void destroyProcess() { * WorkerProxy}, and so is subject to interrupts by dynamic execution. 
*/ public synchronized void putRequest(WorkRequest request) throws IOException { - responseChecker.put(request.getRequestId(), new Semaphore(0)); - try { - request.writeDelimitedTo(process.getOutputStream()); - process.getOutputStream().flush(); - } catch (IOException e) { - // We can't know how much of the request was sent, so we have to assume the worker's input - // now contains garbage. - // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread? - responseChecker.remove(request.getRequestId()); - throw e; + if (!process.isAlive()) { + throw new IOException( + "Attempting to send request " + request.getRequestId() + " to dead process"); } + responseChecker.put(request.getRequestId(), new Semaphore(0)); + pendingRequests.add(request); } /** @@ -203,21 +238,25 @@ public synchronized void putRequest(WorkRequest request) throws IOException { */ public WorkResponse getResponse(Integer requestId) throws InterruptedException { try { + if (!process.isAlive()) { + // If the process has died, all we can do is return what may already have been returned. + return workerProcessResponse.get(requestId); + } + Semaphore waitForResponse = responseChecker.get(requestId); if (waitForResponse == null) { report("Null response semaphore for " + requestId); - // If the multiplexer is interrupted when a {@code WorkerProxy} is trying to send a request, - // the request is not sent, so there is no need to wait for a response. - return null; + // If there is no semaphore for this request, it probably failed to send, so we just return + // what we got, probably nothing. + return workerProcessResponse.get(requestId); } // Wait for the multiplexer to get our response and release this semaphore. The semaphore will // throw {@code InterruptedException} when the multiplexer is terminated. 
waitForResponse.acquire(); - WorkResponse workResponse = workerProcessResponse.get(requestId); - return workResponse; + return workerProcessResponse.get(requestId); } finally { responseChecker.remove(requestId); workerProcessResponse.remove(requestId); @@ -225,36 +264,73 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { } /** - * Waits to read a {@code WorkResponse} from worker process, put that {@code WorkResponse} in - * {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}. + * Sends a single pending request, if there are any. Blocks until a request is available. * - *

<p>This is only called on the WorkerMultiplexer thread and so cannot be interrupted by dynamic - * execution cancellation. + *

<p>This is only called by the {@code requestSender} thread and so cannot be interrupted by + * dynamic execution cancellation, but only by a call to {@link #destroyProcess()}. */ - private void waitResponse() throws InterruptedException, IOException { - recordingStream = new RecordingInputStream(this.process.getInputStream()); - recordingStream.startRecording(4096); - // TODO(larsrc): Turn this into a loop that also sends requests. - // Allow interrupts while waiting for responses, without conflating it with I/O errors. - while (recordingStream.available() == 0) { - if (!this.process.isAlive()) { - throw new IOException( - String.format("Multiplexer process for %s is dead", workerKey.getMnemonic())); + private boolean sendRequest() { + WorkRequest request; + try { + request = pendingRequests.take(); + } catch (InterruptedException e) { + return false; + } + try { + request.writeDelimitedTo(process.getOutputStream()); + process.getOutputStream().flush(); + } catch (IOException e) { + // We can't know how much of the request was sent, so we have to assume the worker's input + // now contains garbage, and this request is lost. + // TODO(b/177637516): Signal that this action failed for presumably transient reasons. + report("Failed to send request " + request.getRequestId()); + Semaphore s = responseChecker.remove(request.getRequestId()); + if (s != null) { + s.release(); } - Thread.sleep(1); + // TODO(b/177637516): Leave process in a moribund state so pending responses can be returned. + destroyProcess(); + return false; } - WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + return true; + } - // A null parsedResponse can only happen if the input stream is closed, in which case we + /** + * Reads a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in {@code + * workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}. + * + *

<p>This is only called on the {@code responseReceiver} thread and so cannot be interrupted by dynamic + * execution cancellation, but only by a call to {@link #destroyProcess()}. + */ + private boolean readResponse() { + recordingStream = new RecordingInputStream(process.getInputStream()); + recordingStream.startRecording(4096); + WorkResponse parsedResponse; + try { + parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + } catch (IOException e) { + if (!(e instanceof InterruptedIOException)) { + report( + String.format( + "Error while reading response from multiplexer process for %s: %s", + workerKey.getMnemonic(), e)); + } + // We can't know how much of the response was read, so we have to assume the worker's output + // now contains garbage, and we can't reliably read any further responses. + destroyProcess(); + return false; + } + // A null parsedResponse can happen if the input stream is closed, in which case we // drop everything. if (parsedResponse == null) { - throw new IOException( + report( String.format( - "Multiplexer process for %s died while reading response", workerKey.getMnemonic())); + "Multiplexer process for %s has closed its output stream", workerKey.getMnemonic())); + destroyProcess(); + return false; } int requestId = parsedResponse.getRequestId(); - workerProcessResponse.put(requestId, parsedResponse); // TODO(b/151767359): When allowing cancellation, just remove responses that have no matching @@ -267,61 +343,7 @@ private void waitResponse() throws InterruptedException, IOException { report(String.format("Multiplexer for %s found no semaphore", workerKey.getMnemonic())); workerProcessResponse.remove(requestId); } - } - - /** The multiplexer thread that listens to the WorkResponse from worker process. */ - @Override - public void run() { - while (this.process.isAlive()) { - try { - waitResponse(); - } catch (IOException e) { - // We got this exception while reading from the worker's stdout. 
We can't trust the - // output any more at that point. - if (this.process.isAlive()) { - destroyProcess(); - } - if (e instanceof InterruptedIOException) { - report( - String.format( - "Multiplexer process for %s was interrupted during I/O, aborting multiplexer", - workerKey.getMnemonic())); - } else { - // TODO(larsrc): Output the recorded message. - report( - String.format( - "Multiplexer for %s got IOException reading a response, aborting multiplexer", - workerKey.getMnemonic())); - logger.atWarning().withCause(e).log( - "Caught IOException while waiting for worker response. " - + "It could be because the worker returned an unparseable response."); - } - } catch (InterruptedException e) { - // This should only happen when the Blaze build has been aborted (failed or cancelled). In - // that case, there may still be some outstanding requests in the worker process, which we - // will let fall on the floor, but we still want to leave the process running for the next - // build. - // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented. - for (Semaphore semaphore : responseChecker.values()) { - semaphore.release(); - } - } - } - synchronized (this) { - releaseAllSemaphores(); - } - } - - /** - * Release all the semaphores and clear the related maps. Must only be called when we are shutting - * down the multiplexer. 
- */ - private void releaseAllSemaphores() { - for (Semaphore semaphore : responseChecker.values()) { - semaphore.release(); - } - responseChecker.clear(); - workerProcessResponse.clear(); + return true; } String getRecordingStreamMessage() { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index aefabf0a4306ee..0a58cd539f7f81 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -80,7 +80,6 @@ public static synchronized void removeInstance(WorkerKey key) throws UserExecExc } instanceInfo.decreaseRefCount(); if (instanceInfo.getRefCount() == 0) { - instanceInfo.getWorkerMultiplexer().interrupt(); instanceInfo.getWorkerMultiplexer().destroyMultiplexer(); multiplexerInstance.remove(key); } diff --git a/src/test/shell/integration/bazel_worker_multiplexer_test.sh b/src/test/shell/integration/bazel_worker_multiplexer_test.sh index e04821619ae3e0..e98c46b33bb7aa 100755 --- a/src/test/shell/integration/bazel_worker_multiplexer_test.sh +++ b/src/test/shell/integration/bazel_worker_multiplexer_test.sh @@ -205,10 +205,13 @@ EOF bazel build :hello_world_1 &> $TEST_log \ || fail "build failed" - bazel build :hello_world_2 &> $TEST_log \ - && fail "expected build to failed" || true + bazel build --worker_verbose :hello_world_2 >> $TEST_log 2>&1 \ + && fail "expected build to fail" || true + + error_msgs=$(egrep -o -- 'Worker process (did not return a|returned an unparseable) WorkResponse' "$TEST_log") - expect_log "Worker process quit or closed its stdin stream when we tried to send a WorkRequest" + [ -n "$error_msgs" ] \ + || fail "expected error message not found" } function test_worker_restarts_when_worker_binary_changes() {