From c4e22b9ace07f5d360c5327a38f9ae4ab24b7109 Mon Sep 17 00:00:00 2001 From: steinman Date: Fri, 4 Dec 2020 15:08:19 -0800 Subject: [PATCH] Migrate ExampleWorker to use WorkRequestHandler. This means that we're using a real worker implementation in our example and integration tests. It also includes a JSON WorkRequestHandler, but it isn't used except in the tests. RELNOTES: None. PiperOrigin-RevId: 345765354 --- .../google/devtools/build/lib/worker/BUILD | 4 + .../worker/JsonWorkerMessageProcessor.java} | 74 +++--- .../build/lib/worker/WorkRequestHandler.java | 2 +- .../java/com/google/devtools/build/lib/BUILD | 1 + .../build/lib/worker/ExampleWorker.java | 211 ++++++++++-------- .../ProtoExampleWorkerProtocolImpl.java | 50 ----- 6 files changed, 164 insertions(+), 178 deletions(-) rename src/{test/java/com/google/devtools/build/lib/worker/JsonExampleWorkerProtocolImpl.java => main/java/com/google/devtools/build/lib/worker/JsonWorkerMessageProcessor.java} (63%) delete mode 100644 src/test/java/com/google/devtools/build/lib/worker/ProtoExampleWorkerProtocolImpl.java diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD index 037ce6c8504384..1080013342f595 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/BUILD +++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD @@ -51,11 +51,15 @@ java_library( java_library( name = "work_request_handlers", srcs = [ + "JsonWorkerMessageProcessor.java", "ProtoWorkerMessageProcessor.java", "WorkRequestHandler.java", ], deps = [ "//src/main/protobuf:worker_protocol_java_proto", + "//third_party:gson", "//third_party:guava", + "//third_party/protobuf:protobuf_java", + "//third_party/protobuf:protobuf_java_util", ], ) diff --git a/src/test/java/com/google/devtools/build/lib/worker/JsonExampleWorkerProtocolImpl.java b/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerMessageProcessor.java similarity index 63% rename from src/test/java/com/google/devtools/build/lib/worker/JsonExampleWorkerProtocolImpl.java rename to src/main/java/com/google/devtools/build/lib/worker/JsonWorkerMessageProcessor.java index bde92cc1ccbbd5..d15ab345c197a2 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/JsonExampleWorkerProtocolImpl.java +++ b/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerMessageProcessor.java @@ -13,8 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.worker; -import static java.nio.charset.StandardCharsets.UTF_8; - +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.worker.WorkerProtocol.Input; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; @@ -23,43 +22,42 @@ import com.google.protobuf.ByteString; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.Printer; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.ArrayList; import java.util.List; -/** Sample implementation of the Worker Protocol using JSON to communicate with Bazel. */ -public class JsonExampleWorkerProtocolImpl implements ExampleWorkerProtocol { - private final Printer jsonPrinter = - JsonFormat.printer().omittingInsignificantWhitespace().includingDefaultValueFields(); +/** Implementation of the Worker Protocol using JSON to communicate with Bazel. */ +public final class JsonWorkerMessageProcessor implements WorkRequestHandler.WorkerMessageProcessor { + /** Reader for reading the WorkResponse. */ private final JsonReader reader; + /** Printer for printing the WorkRequest. */ + private final Printer jsonPrinter; + /** Writer for writing the WorkRequest to the worker. */ private final BufferedWriter jsonWriter; - public JsonExampleWorkerProtocolImpl(InputStream stdin, OutputStream stdout) { - reader = new JsonReader(new BufferedReader(new InputStreamReader(stdin, UTF_8))); + /** Constructs a {@code WorkRequestHandler} that reads and writes JSON. */ + public JsonWorkerMessageProcessor(JsonReader reader, BufferedWriter jsonWriter) { + this.reader = reader; reader.setLenient(true); - jsonWriter = new BufferedWriter(new OutputStreamWriter(stdout, UTF_8)); + this.jsonWriter = jsonWriter; + jsonPrinter = + JsonFormat.printer().omittingInsignificantWhitespace().includingDefaultValueFields(); } - private static ArrayList readArguments(JsonReader reader) throws IOException { + private static ImmutableList readArguments(JsonReader reader) throws IOException { reader.beginArray(); - ArrayList arguments = new ArrayList<>(); + ImmutableList.Builder argumentsBuilder = ImmutableList.builder(); while (reader.hasNext()) { - arguments.add(reader.nextString()); + argumentsBuilder.add(reader.nextString()); } reader.endArray(); - return arguments; + return argumentsBuilder.build(); } - private static ArrayList readInputs(JsonReader reader) throws IOException { + private static ImmutableList readInputs(JsonReader reader) throws IOException { reader.beginArray(); - ArrayList inputs = new ArrayList<>(); + ImmutableList.Builder inputsBuilder = ImmutableList.builder(); while (reader.hasNext()) { String digest = null; String path = null; @@ -81,19 +79,25 @@ private static ArrayList readInputs(JsonReader reader) throws IOException path = reader.nextString(); break; default: - throw new IOException(name + " is an incorrect field in input"); + continue; } } reader.endObject(); - inputs.add( - Input.newBuilder().setDigest(ByteString.copyFromUtf8(digest)).setPath(path).build()); + Input.Builder inputBuilder = Input.newBuilder(); + if (digest != null) { + inputBuilder.setDigest(ByteString.copyFromUtf8(digest)); + } + if (path != null) { + inputBuilder.setPath(path); + } + inputsBuilder.add(inputBuilder.build()); } reader.endArray(); - return inputs; + return inputsBuilder.build(); } @Override - public WorkRequest readRequest() throws IOException { + public WorkRequest readWorkRequest() throws IOException { List arguments = null; List inputs = null; Integer requestId = null; @@ -104,24 +108,24 @@ public WorkRequest readRequest() throws IOException { switch (name) { case "arguments": if (arguments != null) { - throw new IOException("Work request cannot have more than one list of arguments"); + throw new IOException("WorkRequest cannot have more than one 'arguments' field"); } arguments = readArguments(reader); break; case "inputs": if (inputs != null) { - throw new IOException("Work request cannot have more than one list of inputs"); + throw new IOException("WorkRequest cannot have more than one 'inputs' field"); } inputs = readInputs(reader); break; case "requestId": if (requestId != null) { - throw new IOException("Work request cannot have more than one requestId"); + throw new IOException("WorkRequest cannot have more than one requestId"); } requestId = reader.nextInt(); break; default: - throw new IOException(name + " is an incorrect field in work request"); + break; } } reader.endObject(); @@ -143,17 +147,13 @@ public WorkRequest readRequest() throws IOException { } @Override - public void writeResponse(WorkResponse response) throws IOException { + public void writeWorkResponse(WorkResponse response) throws IOException { jsonPrinter.appendTo(response, jsonWriter); jsonWriter.flush(); } @Override - public void close() { - try { - jsonWriter.close(); - } catch (IOException e) { - System.err.printf("Could not close json writer. %s", e); - } + public void close() throws IOException { + jsonWriter.close(); } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java index 3690d61e82598a..7da37742d48a15 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java @@ -56,7 +56,7 @@ public interface WorkerMessageProcessor { /** This worker's stderr. */ private final PrintStream stderr; - private final WorkerMessageProcessor messageProcessor; + final WorkerMessageProcessor messageProcessor; private final CpuTimeBasedGcScheduler gcScheduler; diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index 3c1ae59f02bd1d..ee2490c2d7e53c 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -605,6 +605,7 @@ java_library( ], deps = [ "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", + "//src/main/java/com/google/devtools/build/lib/worker:work_request_handlers", "//src/main/java/com/google/devtools/common/options", "//src/main/protobuf:worker_protocol_java_proto", "//third_party:gson", diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java index 471218c060dbda..9800f19ab3debc 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java @@ -15,18 +15,25 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.base.Ascii; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.actions.ExecutionRequirements.WorkerProtocolFormat; import com.google.devtools.build.lib.worker.ExampleWorkerOptions.ExampleWorkOptions; import com.google.devtools.build.lib.worker.WorkerProtocol.Input; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; -import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import com.google.devtools.common.options.OptionsParser; +import com.google.gson.stream.JsonReader; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.PrintStream; +import java.io.PrintWriter; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -35,11 +42,12 @@ import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; /** An example implementation of a worker process that is used for integration tests. */ -public class ExampleWorker { +public final class ExampleWorker { static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)"); @@ -52,12 +60,51 @@ public class ExampleWorker { // If true, returns corrupt responses instead of correct protobufs. static boolean poisoned = false; - // Keep state across multiple builds. static final LinkedHashMap inputs = new LinkedHashMap<>(); // Contains the request currently being worked on. private static WorkRequest currentRequest; + // The options passed to this worker on a per-worker-lifetime basis. + static ExampleWorkerOptions workerOptions; + + private static class InterruptableWorkRequestHandler extends WorkRequestHandler { + + InterruptableWorkRequestHandler( + BiFunction, PrintWriter, Integer> callback, + PrintStream stderr, + WorkerMessageProcessor messageProcessor) { + super(callback, stderr, messageProcessor); + } + + @Override + public void processRequests() throws IOException { + while (true) { + WorkRequest request = messageProcessor.readWorkRequest(); + if (request == null) { + break; + } + currentRequest = request; + inputs.clear(); + for (Input input : request.getInputsList()) { + inputs.put(input.getPath(), input.getDigest().toStringUtf8()); + } + if (poisoned && workerOptions.hardPoison) { + throw new IllegalStateException("I'm a very poisoned worker and will just crash."); + } + if (request.getRequestId() != 0) { + Thread t = createResponseThread(request); + t.start(); + } else { + respondToRequest(request); + } + if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) { + System.exit(0); + } + } + } + } + public static void main(String[] args) throws Exception { if (ImmutableSet.copyOf(args).contains("--persistent_worker")) { OptionsParser parser = @@ -66,107 +113,89 @@ public static void main(String[] args) throws Exception { .allowResidue(false) .build(); parser.parse(args); - ExampleWorkerOptions workerOptions = parser.getOptions(ExampleWorkerOptions.class); - Preconditions.checkState(workerOptions.persistentWorker); - runPersistentWorker(workerOptions); + workerOptions = parser.getOptions(ExampleWorkerOptions.class); + WorkerProtocolFormat protocolFormat = workerOptions.workerProtocol; + WorkRequestHandler.WorkerMessageProcessor messageProcessor = null; + switch (protocolFormat) { + case JSON: + messageProcessor = + new JsonWorkerMessageProcessor( + new JsonReader(new BufferedReader(new InputStreamReader(System.in, UTF_8))), + new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))); + break; + case PROTO: + messageProcessor = new ProtoWorkerMessageProcessor(System.in, System.out); + break; + } + Preconditions.checkNotNull(messageProcessor); + WorkRequestHandler workRequestHandler = + new InterruptableWorkRequestHandler(ExampleWorker::doWork, System.err, messageProcessor); + workRequestHandler.processRequests(); + } else { // This is a single invocation of the example that exits after it processed the request. - processRequest(ImmutableList.copyOf(args)); + parseOptionsAndLog(ImmutableList.copyOf(args)); } } - private static void runPersistentWorker(ExampleWorkerOptions workerOptions) throws IOException { + private static int doWork(List args, PrintWriter err) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream originalStdOut = System.out; PrintStream originalStdErr = System.err; - ExampleWorkerProtocol workerProtocol = null; - switch (workerOptions.workerProtocol) { - case JSON: - workerProtocol = new JsonExampleWorkerProtocolImpl(System.in, System.out); - break; - case PROTO: - workerProtocol = new ProtoExampleWorkerProtocolImpl(System.in, System.out); - } - Preconditions.checkNotNull(workerProtocol); - while (true) { - try { - WorkRequest request = workerProtocol.readRequest(); - if (request == null) { - break; - } - - currentRequest = request; - - inputs.clear(); - for (Input input : request.getInputsList()) { - inputs.put(input.getPath(), input.getDigest().toStringUtf8()); - } - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int exitCode = 0; - - try (PrintStream ps = new PrintStream(baos)) { - System.setOut(ps); - System.setErr(ps); - - if (poisoned) { - if (workerOptions.hardPoison) { - throw new IllegalStateException("I'm a very poisoned worker and will just crash."); - } - System.out.println("I'm a poisoned worker and this is not a protobuf."); - System.out.println("Here's a fake stack trace for you:"); - System.out.println(" at com.example.Something(Something.java:83)"); - System.out.println(" at java.lang.Thread.run(Thread.java:745)"); - System.out.print("And now, 8k of random bytes: "); - byte[] b = new byte[8192]; - new Random().nextBytes(b); - System.out.write(b); - } else { - try { - processRequest(request.getArgumentsList()); - } catch (Exception e) { - e.printStackTrace(); - exitCode = 1; - } - } - } finally { - System.setOut(originalStdOut); - System.setErr(originalStdErr); - currentRequest = null; - } - - if (workerOptions.exitDuring > 0 && workUnitCounter > workerOptions.exitDuring) { - return; + try (PrintStream ps = new PrintStream(baos)) { + System.setOut(ps); + System.setErr(ps); + if (poisoned) { + System.out.println("I'm a poisoned worker and this is not a protobuf."); + System.out.println("Here's a fake stack trace for you:"); + System.out.println(" at com.example.Something(Something.java:83)"); + System.out.println(" at java.lang.Thread.run(Thread.java:745)"); + System.out.print("And now, 8k of random bytes: "); + byte[] b = new byte[8192]; + new Random().nextBytes(b); + try { + System.out.write(b); + } catch (IOException e) { + e.printStackTrace(); + return 1; } - - if (poisoned) { - baos.writeTo(System.out); - System.out.flush(); - } else { - WorkResponse response = - WorkResponse.newBuilder() - .setOutput(baos.toString()) - .setExitCode(exitCode) - .setRequestId(request.getRequestId()) - .build(); - workerProtocol.writeResponse(response); + } else { + try { + parseOptionsAndLog(args); + } catch (Exception e) { + e.printStackTrace(); + return 1; } + } + } finally { + System.setOut(originalStdOut); + System.setErr(originalStdErr); + currentRequest = null; + } - if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) { - return; - } + if (workerOptions.exitDuring > 0 && workUnitCounter > workerOptions.exitDuring) { + System.exit(0); + } - if (workerOptions.poisonAfter > 0 && workUnitCounter > workerOptions.poisonAfter) { - poisoned = true; - } - } finally { - // Be a good worker process and consume less memory when idle. - System.gc(); + if (poisoned) { + try { + baos.writeTo(System.out); + System.out.flush(); + System.exit(1); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); } } + if (workerOptions.poisonAfter > 0 && workUnitCounter > workerOptions.poisonAfter) { + poisoned = true; + } + return 0; } - private static void processRequest(List args) throws Exception { + private static void parseOptionsAndLog(List args) throws Exception { ImmutableList.Builder expandedArgs = ImmutableList.builder(); for (String arg : args) { Matcher flagFileMatcher = FLAG_FILE_PATTERN.matcher(arg); @@ -185,7 +214,7 @@ private static void processRequest(List args) throws Exception { List outputs = new ArrayList<>(); if (options.writeUUID) { - outputs.add("UUID " + workerUuid.toString()); + outputs.add("UUID " + workerUuid); } if (options.writeCounter) { @@ -194,7 +223,7 @@ private static void processRequest(List args) throws Exception { String residueStr = Joiner.on(' ').join(parser.getResidue()); if (options.uppercase) { - residueStr = residueStr.toUpperCase(); + residueStr = Ascii.toUpperCase(residueStr); } outputs.add(residueStr); @@ -223,4 +252,6 @@ private static void processRequest(List args) throws Exception { } } } + + private ExampleWorker() {} } diff --git a/src/test/java/com/google/devtools/build/lib/worker/ProtoExampleWorkerProtocolImpl.java b/src/test/java/com/google/devtools/build/lib/worker/ProtoExampleWorkerProtocolImpl.java deleted file mode 100644 index e0ee1cbfaf63cc..00000000000000 --- a/src/test/java/com/google/devtools/build/lib/worker/ProtoExampleWorkerProtocolImpl.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2020 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.worker; - -import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; -import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** Sample implementation of the Worker Protocol using Proto to communiocate with Bazel. */ -public class ProtoExampleWorkerProtocolImpl implements ExampleWorkerProtocol { - private final InputStream stdin; - private final OutputStream stdout; - - public ProtoExampleWorkerProtocolImpl(InputStream stdin, OutputStream stdout) { - this.stdin = stdin; - this.stdout = stdout; - } - - @Override - public WorkRequest readRequest() throws IOException { - try { - return WorkRequest.parseDelimitedFrom(stdin); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - } - - @Override - public void writeResponse(WorkResponse response) throws IOException { - response.writeDelimitedTo(stdout); - stdout.flush(); - } - - @Override - public void close() {} -}