From 186decab01ee247c7453baf19dac778545ec4937 Mon Sep 17 00:00:00 2001 From: larsrc Date: Fri, 9 Apr 2021 02:57:02 -0700 Subject: [PATCH] Interface and flag specification for worker cancellation. See design doc https://docs.google.com/document/d/1-h4gcBV8Jn6DK9G_e23kZQ159jmX__uckhub1Gv9dzc/edit Also fixes the JSON protocol processor to ignore unknown fields, as per spec. RELNOTES: Updates worker protocol with cancellation fields, and adds experimental_worker_cancellation flag to control cancellation. PiperOrigin-RevId: 367600226 --- .../lib/actions/ExecutionRequirements.java | 2 ++ .../devtools/build/lib/actions/Spawns.java | 5 ++++ .../build/lib/worker/JsonWorkerProtocol.java | 6 ++-- .../build/lib/worker/WorkerOptions.java | 8 +++++ src/main/protobuf/worker_protocol.proto | 30 +++++++++++++++---- .../devtools/build/lib/worker/WorkerTest.java | 14 +++++++-- 6 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java index 7c8f959fdc8677..d6b593b1301764 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java @@ -160,6 +160,8 @@ public String parseIfMatches(String tag) throws ValidationException { /** Specify the type of worker protocol the worker uses. */ public static final String REQUIRES_WORKER_PROTOCOL = "requires-worker-protocol"; + public static final String SUPPORTS_WORKER_CANCELLATION = "supports-worker-cancellation"; + /** Denotes what the type of worker protocol the worker uses. */ public enum WorkerProtocolFormat { JSON, diff --git a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java index 06e4b613497bc7..640bde6899938c 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java +++ b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java @@ -92,6 +92,11 @@ public static boolean supportsMultiplexWorkers(Spawn spawn) { .equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS)); } + public static boolean supportsWorkerCancellation(Spawn spawn) { + return "1" + .equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKER_CANCELLATION)); + } + /** * Returns which worker protocol format a Spawn claims a persistent worker uses. Defaults to proto * if the protocol format is not specified. diff --git a/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerProtocol.java b/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerProtocol.java index 8c594515eedd26..2c24073b6241d0 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerProtocol.java +++ b/src/main/java/com/google/devtools/build/lib/worker/JsonWorkerProtocol.java @@ -92,10 +92,12 @@ private WorkResponse parseResponse() throws IOException { requestId = reader.nextInt(); break; default: - throw new IOException(name + " is an incorrect field in work response"); + // As per https://docs.bazel.build/versions/master/creating-workers.html#work-responses, + // unknown fields are ignored. + reader.skipValue(); } } - reader.endObject(); + reader.endObject(); } catch (MalformedJsonException | EOFException | IllegalStateException e) { throw new IOException("Could not parse json work request correctly", e); } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java index dc07ca87e6ca72..dcc21983b55bde 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java @@ -178,4 +178,12 @@ public String getTypeDescription() { "Currently a no-op. Future: If enabled, workers that support the experimental" + " multiplexing feature will use that feature.") public boolean workerMultiplex; + + @Option( + name = "experimental_worker_cancellation", + defaultValue = "false", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.EXECUTION}, + help = "If enabled, Bazel may send cancellation requests to workers that support them.") + public boolean workerCancellation; } diff --git a/src/main/protobuf/worker_protocol.proto b/src/main/protobuf/worker_protocol.proto index c628b7eb7a4169..a5f95453be2d82 100644 --- a/src/main/protobuf/worker_protocol.proto +++ b/src/main/protobuf/worker_protocol.proto @@ -39,9 +39,19 @@ message WorkRequest { // request. repeated Input inputs = 2; - // To support multiplex worker, each WorkRequest must have an unique ID. This - // ID should be attached unchanged to the WorkResponse. + // Each WorkRequest must have either a unique + // request_id or request_id = 0. If request_id is 0, this WorkRequest must be + // processed alone, otherwise the worker may process multiple WorkRequests in + // parallel (multiplexing). As an exception to the above, if the cancel field + // is true, the request_id must be the same as a previously sent WorkRequest. + // The request_id must be attached unchanged to the corresponding + // WorkResponse. int32 request_id = 3; + + // EXPERIMENTAL: When true, this is a cancel request, indicating that a + // previously sent WorkRequest with the same request_id should be cancelled. + // The arguments and inputs fields must be empty and should be ignored. + bool cancel = 4; } // The worker sends this message to Blaze when it finished its work on the @@ -54,9 +64,17 @@ message WorkResponse { // string type here, which gives us UTF-8 encoding. string output = 2; - // To support multiplex worker, each WorkResponse must have an unique ID. - // Since worker processes which support multiplex worker will handle multiple - // WorkRequests in parallel, this ID will be used to determined which - // WorkerProxy does this WorkResponse belong to. + // This field must be set to the same request_id as the WorkRequest it is a + // response to. Since worker processes which support multiplex worker will + // handle multiple WorkRequests in parallel, this ID will be used to + // determined which WorkerProxy does this WorkResponse belong to. int32 request_id = 3; + + // EXPERIMENTAL When true, indicates that this response was sent due to + // receiving a cancel request. The exit_code and output fields should be empty + // and will be ignored. Exactly one WorkResponse must be sent for each + // non-cancelling WorkRequest received by the worker, but if the worker + // received a cancel request, it doesn't matter if it replies with a regular + // WorkResponse or with one where was_cancelled = true. + bool was_cancelled = 4; } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java index a7f22586712e97..e67aa06429b678 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java @@ -199,8 +199,16 @@ public void testGetResponse_json_multipleRequestId_fails() throws IOException { } @Test - public void testGetResponse_json_incorrectFields_fails() throws IOException { - verifyGetResponseFailure( - "{\"testField\":0}", "testField is an incorrect field in work response"); + public void testGetResponse_json_unknownFieldsIgnored() throws IOException, InterruptedException { + TestWorker testWorker = + createTestWorker( + "{\"exitCode\":1,\"output\":\"test output\",\"requestId\":1,\"unknown\":{1:['a']}}" + .getBytes(UTF_8), + JSON); + WorkResponse readResponse = testWorker.getResponse(1); + WorkResponse response = + WorkResponse.newBuilder().setExitCode(1).setOutput("test output").setRequestId(1).build(); + + assertThat(readResponse).isEqualTo(response); } }