diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD index 46a9b5f5caaabe..b9fced4d796ed0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -19,6 +19,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/protobuf:remote_execution_log_java_proto", "//third_party:flogger", + "//third_party:guava", "//third_party:jsr305", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java index c6738942b2d783..8cffbaee9a4caa 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java @@ -74,6 +74,8 @@ public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock) return new ReadHandler(); // } else if (method == ByteStreamGrpc.getWriteMethod()) { return new WriteHandler(); // + } else if (method == ByteStreamGrpc.getQueryWriteStatusMethod()) { + return new QueryWriteStatusHandler(); // } else if (method == CapabilitiesGrpc.getGetCapabilitiesMethod()) { return new GetCapabilitiesHandler(); // } diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java new file mode 100644 index 00000000000000..26ec94f023a2c8 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java @@ -0,0 +1,41 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.logging; + +import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; + +/** LoggingHandler for {@link google.bytestream.QueryWriteStatus} gRPC call. */ +public class QueryWriteStatusHandler + implements LoggingHandler { + private final QueryWriteStatusDetails.Builder builder = QueryWriteStatusDetails.newBuilder(); + + @Override + public void handleReq(QueryWriteStatusRequest message) { + builder.setRequest(message); + } + + @Override + public void handleResp(QueryWriteStatusResponse message) { + builder.setResponse(message); + } + + @Override + public RpcCallDetails getDetails() { + return RpcCallDetails.newBuilder().setQueryWriteStatus(builder).build(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java index 67e9e378e2ff3e..b5062957eaee20 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java @@ -16,24 +16,40 @@ import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.collect.Iterables; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails; +import java.util.ArrayList; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; /** LoggingHandler for {@link google.bytestream.Write} gRPC call. */ public class WriteHandler implements LoggingHandler { private final WriteDetails.Builder builder = WriteDetails.newBuilder(); private final Set resources = new LinkedHashSet<>(); + private final List offsets = new ArrayList<>(); + private final List finishWrites = new ArrayList<>(); + private long bytesSentInSequence = 0; private long numWrites = 0; private long bytesSent = 0; @Override public void handleReq(WriteRequest message) { resources.add(message.getResourceName()); + long writeOffset = message.getWriteOffset(); + if (numWrites == 0 || Iterables.getLast(offsets) + bytesSentInSequence != writeOffset) { + offsets.add(writeOffset); + bytesSentInSequence = 0; + } + int size = message.getData().size(); + if (message.getFinishWrite()) { + finishWrites.add(writeOffset + size); + } numWrites++; - bytesSent += message.getData().size(); + bytesSent += size; + bytesSentInSequence += size; } @Override @@ -44,6 +60,8 @@ public void handleResp(WriteResponse message) { @Override public RpcCallDetails getDetails() { builder.addAllResourceNames(resources); + builder.addAllOffsets(offsets); + builder.addAllFinishWrites(finishWrites); builder.setNumWrites(numWrites); builder.setBytesSent(bytesSent); return RpcCallDetails.newBuilder().setWrite(builder).build(); diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto index 84a07d64c8e230..b3f265aaca959d 100644 --- a/src/main/protobuf/remote_execution_log.proto +++ b/src/main/protobuf/remote_execution_log.proto @@ -16,11 +16,11 @@ syntax = "proto3"; package remote_logging; -import "build/bazel/remote/execution/v2/remote_execution.proto"; -import "google/protobuf/timestamp.proto"; import "google/bytestream/bytestream.proto"; import "google/longrunning/operations.proto"; +import "google/protobuf/timestamp.proto"; import "google/rpc/status.proto"; +import "build/bazel/remote/execution/v2/remote_execution.proto"; option java_package = "com.google.devtools.build.lib.remote.logging"; @@ -83,7 +83,7 @@ message GetActionResultDetails { // Details for a call to // build.bazel.remote.execution.v2.ActionCache.UpdateActionResult. message UpdateActionResultDetails { - // The build.bazel.remote.execution.v2.GetActionResultRequest sent by + // The build.bazel.remote.execution.v2.UpdateActionResultRequest sent by // the call. build.bazel.remote.execution.v2.UpdateActionResultRequest request = 1; @@ -128,10 +128,24 @@ message ReadDetails { message WriteDetails { // The names of resources requested to be written to in this call in the order // they were first requested in. If the ByteStream protocol is followed - // according to specification, this should only contain have a single element, - // which is the resource name specified in the first message of the stream. + // according to specification, this should contain at most two elements: + // The resource name specified in the first message of the stream, and an + // empty string specified in each successive request if num_writes > 1. repeated string resource_names = 1; + // The offsets sent for the initial request and any non-sequential offsets + // specified over the course of the call. If the ByteStream protocol is + // followed according to specification, this should contain a single element + // which is the starting point for the write call. + repeated int64 offsets = 5; + + // The effective final size for each request sent with finish_write true + // specified over the course of the call. If the ByteStream protocol is + // followed according to specification, this should contain a single element + // which is the total size of the written resource, including the initial + // offset. + repeated int64 finish_writes = 6; + // The number of writes performed in this call. int64 num_writes = 2; @@ -142,6 +156,15 @@ message WriteDetails { google.bytestream.WriteResponse response = 4; } +// Details for a call to google.bytestream.QueryWriteStatus. +message QueryWriteStatusDetails { + // The google.bytestream.QueryWriteStatusRequest sent by the call. + google.bytestream.QueryWriteStatusRequest request = 1; + + // The received google.bytestream.QueryWriteStatusResponse. + google.bytestream.QueryWriteStatusResponse response = 2; +} + // Contains details for specific types of calls. message RpcCallDetails { reserved 1 to 4, 11; @@ -152,6 +175,7 @@ message RpcCallDetails { FindMissingBlobsDetails find_missing_blobs = 10; ReadDetails read = 5; WriteDetails write = 6; + QueryWriteStatusDetails query_write_status = 14; GetCapabilitiesDetails get_capabilities = 12; UpdateActionResultDetails update_action_result = 13; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java index 17ee021c73a839..3b1ca9a09ef0ac 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java @@ -49,6 +49,8 @@ import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse; import com.google.bytestream.ByteStreamProto.ReadRequest; import com.google.bytestream.ByteStreamProto.ReadResponse; import com.google.bytestream.ByteStreamProto.WriteRequest; @@ -58,6 +60,7 @@ import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails; @@ -854,6 +857,10 @@ public void onCompleted() { WriteDetails.newBuilder() .addResourceNames("test1") .addResourceNames("test2") + .addOffsets(0) + .addOffsets(0) + .addOffsets(0) + // finish write is empty .setResponse(response) .setBytesSent(9) .setNumWrites(3))) @@ -865,6 +872,81 @@ public void onCompleted() { verify(logStream).write(expectedEntry); } + @Test + public void testWriteCallOffsetAndFinishWriteCompounding() { + WriteRequest request1 = + WriteRequest.newBuilder() + .setResourceName("test1") + .setData(ByteString.copyFromUtf8("abc")) + .setWriteOffset(10) + .build(); + WriteRequest request2 = + WriteRequest.newBuilder() + .setData(ByteString.copyFromUtf8("def")) + .setWriteOffset(request1.getWriteOffset() + request1.getData().size()) + .build(); + WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build(); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) {} + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() { + streamObserver.onNext(response); + streamObserver.onCompleted(); + } + }; + } + }); + ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel); + @SuppressWarnings("unchecked") + StreamObserver responseObserver = Mockito.mock(StreamObserver.class); + + clock.advanceMillis(10000); + // Request three writes, the first identical with the third, but offset correctly and + // finish_writing + StreamObserver requester = stub.write(responseObserver); + requester.onNext(request1); + clock.advanceMillis(100); + requester.onNext(request2); + clock.advanceMillis(200); + requester.onNext( + request1.toBuilder() + .setWriteOffset(request2.getWriteOffset() + request2.getData().size()) + .setFinishWrite(true) + .build()); + clock.advanceMillis(100); + requester.onCompleted(); + + LogEntry expectedEntry = + LogEntry.newBuilder() + .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName()) + .setDetails( + RpcCallDetails.newBuilder() + .setWrite( + WriteDetails.newBuilder() + .addResourceNames("test1") + .addResourceNames("") + .addOffsets(request1.getWriteOffset()) + .addFinishWrites( + 10 + request1.getData().size() * 2 + request2.getData().size()) + .setResponse(response) + .setBytesSent(9) + .setNumWrites(3))) + .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(10)) + .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000)) + .build(); + verify(logStream).write(expectedEntry); + } + @Test public void testWriteCallFail() { WriteRequest request = @@ -881,7 +963,6 @@ public StreamObserver write(StreamObserver streamOb return Mockito.mock(StreamObserver.class); } }); - ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel); @SuppressWarnings("unchecked") StreamObserver responseObserver = Mockito.mock(StreamObserver.class); @@ -894,7 +975,6 @@ public StreamObserver write(StreamObserver streamOb requester.onError(error.asRuntimeException()); Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException()); - LogEntry expectedEntry = LogEntry.newBuilder() .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName()) @@ -907,12 +987,50 @@ public StreamObserver write(StreamObserver streamOb .setWrite( WriteDetails.newBuilder() .addResourceNames("test") + .addOffsets(0) .setNumWrites(1) .setBytesSent(3))) .setStartTime(Timestamp.newBuilder().setSeconds(10000000)) .setEndTime(Timestamp.newBuilder().setSeconds(20000000)) .build(); + verify(logStream).write(expectedEntry); + } + @Test + public void testQueryWriteStatusCallOk() { + QueryWriteStatusRequest request = + QueryWriteStatusRequest.newBuilder().setResourceName("test").build(); + QueryWriteStatusResponse response = + QueryWriteStatusResponse.newBuilder().setCommittedSize(10).build(); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, + StreamObserver responseObserver) { + clock.advanceMillis(22222); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }); + ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(loggedChannel); + + clock.advanceMillis(11111); + stub.queryWriteStatus(request); + + LogEntry expectedEntry = + LogEntry.newBuilder() + .setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName()) + .setDetails( + RpcCallDetails.newBuilder() + .setQueryWriteStatus( + QueryWriteStatusDetails.newBuilder() + .setRequest(request) + .setResponse(response))) + .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000)) + .build(); verify(logStream).write(expectedEntry); } }