From 17a98a47b291b2eef362101a0c8611aa94179bee Mon Sep 17 00:00:00 2001 From: John Cormie Date: Mon, 8 Jul 2024 10:27:07 -0700 Subject: [PATCH 1/7] Establish a default connect timeout. --- .../io/grpc/binder/internal/BinderClientTransportFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java index d28180241e9..1e2b80b2fdb 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java @@ -127,7 +127,7 @@ public static final class Builder implements ClientTransportFactoryBuilder { BindServiceFlags bindServiceFlags = BindServiceFlags.DEFAULTS; InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT; OneWayBinderProxy.Decorator binderDecorator = OneWayBinderProxy.IDENTITY_DECORATOR; - long readyTimeoutMillis = -1; // TODO(jdcormie) Set an non-infinite default in a separate PR. + long readyTimeoutMillis = 60_000; @Override public BinderClientTransportFactory buildClientTransportFactory() { @@ -210,7 +210,7 @@ public Builder setBinderDecorator(OneWayBinderProxy.Decorator binderDecorator) { * fail-fast work * as expected despite certain edge cases that could otherwise stall the transport indefinitely. * - *

Optional. Use a negative value to wait indefinitely. + *

Optional but enabled by default. Use a negative value to wait indefinitely. */ public Builder setReadyTimeoutMillis(long readyTimeoutMillis) { this.readyTimeoutMillis = readyTimeoutMillis; From 8af05c9b42a6aff238fa7cb6b3e5e2f1cfdfc8d7 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Thu, 8 Aug 2024 17:03:55 -0700 Subject: [PATCH 2/7] Add a newAttachHeadersServerInterceptor() util. --- .../main/java/io/grpc/stub/MetadataUtils.java | 41 +++++++++ .../java/io/grpc/stub/MetadataUtilsTest.java | 86 +++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index addf54c0f81..28ac0b274db 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -24,8 +24,12 @@ import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.Status; import java.util.concurrent.atomic.AtomicReference; @@ -143,4 +147,41 @@ public void onClose(Status status, Metadata trailers) { } } } + + /** + * Returns a ServerInterceptor that attaches a given set of headers to every response. + * + * @param extraHeaders the headers to be added to each response. Caller gives up ownership. + */ + public static ServerInterceptor newAttachHeadersServerInterceptor(Metadata extraHeaders) { + return new MetadataAttachingServerInterceptor(extraHeaders); + } + + private static final class MetadataAttachingServerInterceptor implements ServerInterceptor { + + private final Metadata extraHeaders; + + MetadataAttachingServerInterceptor(Metadata extraHeaders) { + this.extraHeaders = extraHeaders; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + return next.startCall(new HeaderAttachingServerCall<>(call), headers); + } + + final class HeaderAttachingServerCall + extends SimpleForwardingServerCall { + HeaderAttachingServerCall(ServerCall delegate) { + super(delegate); + } + + @Override + public void sendHeaders(Metadata headers) { + headers.merge(extraHeaders); + super.sendHeaders(headers); + } + } + } } diff --git a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java new file mode 100644 index 00000000000..b6f259316ae --- /dev/null +++ b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableList; +import io.grpc.CallOptions; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.StringMarshaller; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MetadataUtilsTest { + + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private static Metadata.Key FOO_KEY = + Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER); + + private final MethodDescriptor echoMethod = + MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE) + .setFullMethodName("test/echo") + .setType(MethodDescriptor.MethodType.UNARY) + .build(); + + private final ServerCallHandler echoCallHandler = + ServerCalls.asyncUnaryCall( + (req, respObserver) -> { + respObserver.onNext(req); + respObserver.onCompleted(); + }); + + @Test + public void testAttachHeadersServerInterceptor() throws IOException { + Metadata extraHeaders = new Metadata(); + extraHeaders.put(FOO_KEY, "foo"); + + ImmutableList interceptors = + ImmutableList.of(MetadataUtils.newAttachHeadersServerInterceptor(extraHeaders)); + ServerServiceDefinition serviceDef = + ServerInterceptors.intercept( + ServerServiceDefinition.builder("test").addMethod(echoMethod, echoCallHandler).build(), + interceptors); + + InProcessServerBuilder server = + InProcessServerBuilder.forName("test").directExecutor().addService(serviceDef); + grpcCleanup.register(server.build().start()); + ManagedChannel channel = InProcessChannelBuilder.forName("test").directExecutor().build(); + AtomicReference trailersCapture = new AtomicReference<>(); + AtomicReference headersCapture = new AtomicReference<>(); + ClientInterceptors.intercept( + channel, MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); + + String response = ClientCalls.blockingUnaryCall(channel, method, CallOptions.DEFAULT, "hello"); + assertThat(response).isEqualTo("hello"); + } +} From 888fe050f810edf620d20b3ba9acced478dae05b Mon Sep 17 00:00:00 2001 From: John Cormie Date: Thu, 8 Aug 2024 17:10:09 -0700 Subject: [PATCH 3/7] fix test --- .../java/io/grpc/stub/MetadataUtilsTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java index b6f259316ae..7c6ac1fc9c5 100644 --- a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java +++ b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableList; import io.grpc.CallOptions; -import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -44,7 +43,7 @@ public class MetadataUtilsTest { @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); private static Metadata.Key FOO_KEY = - Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key.of("foo-key", Metadata.ASCII_STRING_MARSHALLER); private final MethodDescriptor echoMethod = MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE) @@ -62,7 +61,7 @@ public class MetadataUtilsTest { @Test public void testAttachHeadersServerInterceptor() throws IOException { Metadata extraHeaders = new Metadata(); - extraHeaders.put(FOO_KEY, "foo"); + extraHeaders.put(FOO_KEY, "foo-value"); ImmutableList interceptors = ImmutableList.of(MetadataUtils.newAttachHeadersServerInterceptor(extraHeaders)); @@ -74,13 +73,19 @@ public void testAttachHeadersServerInterceptor() throws IOException { InProcessServerBuilder server = InProcessServerBuilder.forName("test").directExecutor().addService(serviceDef); grpcCleanup.register(server.build().start()); - ManagedChannel channel = InProcessChannelBuilder.forName("test").directExecutor().build(); + AtomicReference trailersCapture = new AtomicReference<>(); AtomicReference headersCapture = new AtomicReference<>(); - ClientInterceptors.intercept( - channel, MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); + ManagedChannel channel = + InProcessChannelBuilder.forName("test") + .directExecutor() + .intercept(MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)) + .build(); - String response = ClientCalls.blockingUnaryCall(channel, method, CallOptions.DEFAULT, "hello"); + String response = + ClientCalls.blockingUnaryCall(channel, echoMethod, CallOptions.DEFAULT, "hello"); assertThat(response).isEqualTo("hello"); + Metadata headers = headersCapture.get(); + assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); } } From 8f66ca12d1c36d57ce49824d536c6ddebd9a9f07 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Mon, 12 Aug 2024 13:35:30 -0700 Subject: [PATCH 4/7] Tag as experimental --- stub/src/main/java/io/grpc/stub/MetadataUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index 28ac0b274db..424af2d6884 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -22,6 +22,7 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ExperimentalApi; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; @@ -153,6 +154,7 @@ public void onClose(Status status, Metadata trailers) { * * @param extraHeaders the headers to be added to each response. Caller gives up ownership. */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11462") public static ServerInterceptor newAttachHeadersServerInterceptor(Metadata extraHeaders) { return new MetadataAttachingServerInterceptor(extraHeaders); } From 10cedf956b03905634cfac328c28553ecf044339 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Mon, 12 Aug 2024 19:20:25 -0700 Subject: [PATCH 5/7] Attach headers even in the onError() case. --- .../main/java/io/grpc/stub/MetadataUtils.java | 15 +++ .../java/io/grpc/stub/MetadataUtilsTest.java | 115 +++++++++++++++--- 2 files changed, 114 insertions(+), 16 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index 424af2d6884..c9fea7152c9 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -175,6 +175,8 @@ public ServerCall.Listener interceptCall( final class HeaderAttachingServerCall extends SimpleForwardingServerCall { + boolean headersSent; + HeaderAttachingServerCall(ServerCall delegate) { super(delegate); } @@ -182,8 +184,21 @@ final class HeaderAttachingServerCall @Override public void sendHeaders(Metadata headers) { headers.merge(extraHeaders); + headersSent = true; super.sendHeaders(headers); } + + @Override + public void close(Status status, Metadata trailers) { + if (!headersSent) { + // It isn't too late to call sendHeaders(): !headersSent implies that it hasn't been + // called yet (obviously). But it also implies that no messages have been sent, because + // sendMessage() *requires* a preceding call to sendHeaders(). + headersSent = true; + super.sendHeaders(extraHeaders); + } + super.close(status, trailers); + } } } } diff --git a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java index 7c6ac1fc9c5..2e75ba3a035 100644 --- a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java +++ b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java @@ -17,6 +17,9 @@ package io.grpc.stub; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.stub.MetadataUtils.newAttachHeadersServerInterceptor; +import static io.grpc.stub.MetadataUtils.newCaptureMetadataInterceptor; +import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; import io.grpc.CallOptions; @@ -24,14 +27,18 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; +import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; import io.grpc.StringMarshaller; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; import java.io.IOException; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import org.junit.Rule; import org.junit.Test; @@ -42,7 +49,9 @@ public class MetadataUtilsTest { @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - private static Metadata.Key FOO_KEY = + + private static final String SERVER_NAME = "test"; + private static final Metadata.Key FOO_KEY = Metadata.Key.of("foo-key", Metadata.ASCII_STRING_MARSHALLER); private final MethodDescriptor echoMethod = @@ -58,29 +67,31 @@ public class MetadataUtilsTest { respObserver.onCompleted(); }); + MethodDescriptor echoServerStreamingMethod = + MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE) + .setFullMethodName("test/echoStream") + .setType(MethodDescriptor.MethodType.SERVER_STREAMING) + .build(); + + private final AtomicReference trailersCapture = new AtomicReference<>(); + private final AtomicReference headersCapture = new AtomicReference<>(); + @Test - public void testAttachHeadersServerInterceptor() throws IOException { + public void shouldAttachHeadersToResponse() throws IOException { Metadata extraHeaders = new Metadata(); extraHeaders.put(FOO_KEY, "foo-value"); - ImmutableList interceptors = - ImmutableList.of(MetadataUtils.newAttachHeadersServerInterceptor(extraHeaders)); ServerServiceDefinition serviceDef = ServerInterceptors.intercept( ServerServiceDefinition.builder("test").addMethod(echoMethod, echoCallHandler).build(), - interceptors); + ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); - InProcessServerBuilder server = - InProcessServerBuilder.forName("test").directExecutor().addService(serviceDef); - grpcCleanup.register(server.build().start()); - - AtomicReference trailersCapture = new AtomicReference<>(); - AtomicReference headersCapture = new AtomicReference<>(); + grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); ManagedChannel channel = - InProcessChannelBuilder.forName("test") - .directExecutor() - .intercept(MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)) - .build(); + grpcCleanup.register( + newInProcessChannelBuilder() + .intercept(newCaptureMetadataInterceptor(headersCapture, trailersCapture)) + .build()); String response = ClientCalls.blockingUnaryCall(channel, echoMethod, CallOptions.DEFAULT, "hello"); @@ -88,4 +99,76 @@ public void testAttachHeadersServerInterceptor() throws IOException { Metadata headers = headersCapture.get(); assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); } + + @Test + public void shouldAttachHeadersDespiteNoResponse() throws IOException { + Metadata extraHeaders = new Metadata(); + extraHeaders.put(FOO_KEY, "foo-value"); + + ServerServiceDefinition serviceDef = + ServerInterceptors.intercept( + ServerServiceDefinition.builder("test") + .addMethod( + ServerMethodDefinition.create( + echoServerStreamingMethod, + ServerCalls.asyncUnaryCall( + (req, respObserver) -> respObserver.onCompleted()))) + .build(), + ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); + grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); + + ManagedChannel channel = + grpcCleanup.register( + newInProcessChannelBuilder() + .intercept(newCaptureMetadataInterceptor(headersCapture, trailersCapture)) + .build()); + + Iterator response = + ClientCalls.blockingServerStreamingCall( + channel, echoServerStreamingMethod, CallOptions.DEFAULT, "hello"); + assertThat(response.hasNext()).isFalse(); + Metadata headers = headersCapture.get(); + assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); + } + + @Test + public void shouldAttachHeadersToErrorResponse() throws IOException { + Metadata extraHeaders = new Metadata(); + extraHeaders.put(FOO_KEY, "foo-value"); + + ServerServiceDefinition serviceDef = + ServerInterceptors.intercept( + ServerServiceDefinition.builder("test") + .addMethod( + echoMethod, + ServerCalls.asyncUnaryCall( + (req, respObserver) -> + respObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException()))) + .build(), + ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); + grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); + + ManagedChannel channel = + grpcCleanup.register( + newInProcessChannelBuilder() + .intercept(newCaptureMetadataInterceptor(headersCapture, trailersCapture)) + .build()); + try { + ClientCalls.blockingUnaryCall(channel, echoMethod, CallOptions.DEFAULT, "hello"); + fail(); + } catch (StatusRuntimeException e) { + assertThat(e.getStatus()).isNotNull(); + assertThat(e.getStatus().getCode()).isEqualTo(Code.INVALID_ARGUMENT); + } + Metadata headers = headersCapture.get(); + assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); + } + + private static InProcessServerBuilder newInProcessServerBuilder() { + return InProcessServerBuilder.forName(SERVER_NAME).directExecutor(); + } + + private static InProcessChannelBuilder newInProcessChannelBuilder() { + return InProcessChannelBuilder.forName(SERVER_NAME).directExecutor(); + } } From c563cd7d056a77a1e9ddcc870ab4cec206bb2dc9 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Mon, 12 Aug 2024 19:29:53 -0700 Subject: [PATCH 6/7] Don't mutate shared 'extraHeaders' in the no trailers case. --- stub/src/main/java/io/grpc/stub/MetadataUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index c9fea7152c9..ffaade2c7d7 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -194,8 +194,7 @@ public void close(Status status, Metadata trailers) { // It isn't too late to call sendHeaders(): !headersSent implies that it hasn't been // called yet (obviously). But it also implies that no messages have been sent, because // sendMessage() *requires* a preceding call to sendHeaders(). - headersSent = true; - super.sendHeaders(extraHeaders); + sendHeaders(new Metadata()); } super.close(status, trailers); } From a209c8f60de9a0fc618e88f51adf7a1478216be9 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Tue, 13 Aug 2024 19:46:15 -0700 Subject: [PATCH 7/7] Don't spoil retries by forcing headers --- .../main/java/io/grpc/stub/MetadataUtils.java | 37 +++++++++++-------- .../java/io/grpc/stub/MetadataUtilsTest.java | 37 ++++++++++--------- 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index ffaade2c7d7..4208d3ca652 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -150,40 +150,50 @@ public void onClose(Status status, Metadata trailers) { } /** - * Returns a ServerInterceptor that attaches a given set of headers to every response. + * Returns a ServerInterceptor that adds the specified Metadata to every response stream, one way + * or another. * - * @param extraHeaders the headers to be added to each response. Caller gives up ownership. + *

If, absent this interceptor, a stream would have headers, 'extras' will be added to those + * headers. Otherwise, 'extras' will be sent as trailers. This pattern is useful when you have + * some fixed information, server identity say, that should be included no matter how the call + * turns out. The fallback to trailers avoids artificially committing clients to error responses + * that could otherwise be retried (see https://grpc.io/docs/guides/retry/ for more). + * + *

For correct operation, be sure to arrange for this interceptor to run *before* any others + * that might add headers. + * + * @param extras the Metadata to be added to each stream. Caller gives up ownership. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11462") - public static ServerInterceptor newAttachHeadersServerInterceptor(Metadata extraHeaders) { - return new MetadataAttachingServerInterceptor(extraHeaders); + public static ServerInterceptor newAttachMetadataServerInterceptor(Metadata extras) { + return new MetadataAttachingServerInterceptor(extras); } private static final class MetadataAttachingServerInterceptor implements ServerInterceptor { - private final Metadata extraHeaders; + private final Metadata extras; - MetadataAttachingServerInterceptor(Metadata extraHeaders) { - this.extraHeaders = extraHeaders; + MetadataAttachingServerInterceptor(Metadata extras) { + this.extras = extras; } @Override public ServerCall.Listener interceptCall( ServerCall call, Metadata headers, ServerCallHandler next) { - return next.startCall(new HeaderAttachingServerCall<>(call), headers); + return next.startCall(new MetadataAttachingServerCall<>(call), headers); } - final class HeaderAttachingServerCall + final class MetadataAttachingServerCall extends SimpleForwardingServerCall { boolean headersSent; - HeaderAttachingServerCall(ServerCall delegate) { + MetadataAttachingServerCall(ServerCall delegate) { super(delegate); } @Override public void sendHeaders(Metadata headers) { - headers.merge(extraHeaders); + headers.merge(extras); headersSent = true; super.sendHeaders(headers); } @@ -191,10 +201,7 @@ public void sendHeaders(Metadata headers) { @Override public void close(Status status, Metadata trailers) { if (!headersSent) { - // It isn't too late to call sendHeaders(): !headersSent implies that it hasn't been - // called yet (obviously). But it also implies that no messages have been sent, because - // sendMessage() *requires* a preceding call to sendHeaders(). - sendHeaders(new Metadata()); + trailers.merge(extras); } super.close(status, trailers); } diff --git a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java index 2e75ba3a035..f9890ac0433 100644 --- a/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java +++ b/stub/src/test/java/io/grpc/stub/MetadataUtilsTest.java @@ -17,7 +17,7 @@ package io.grpc.stub; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.stub.MetadataUtils.newAttachHeadersServerInterceptor; +import static io.grpc.stub.MetadataUtils.newAttachMetadataServerInterceptor; import static io.grpc.stub.MetadataUtils.newCaptureMetadataInterceptor; import static org.junit.Assert.fail; @@ -78,13 +78,13 @@ public class MetadataUtilsTest { @Test public void shouldAttachHeadersToResponse() throws IOException { - Metadata extraHeaders = new Metadata(); - extraHeaders.put(FOO_KEY, "foo-value"); + Metadata extras = new Metadata(); + extras.put(FOO_KEY, "foo-value"); ServerServiceDefinition serviceDef = ServerInterceptors.intercept( ServerServiceDefinition.builder("test").addMethod(echoMethod, echoCallHandler).build(), - ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); + ImmutableList.of(newAttachMetadataServerInterceptor(extras))); grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); ManagedChannel channel = @@ -96,14 +96,15 @@ public void shouldAttachHeadersToResponse() throws IOException { String response = ClientCalls.blockingUnaryCall(channel, echoMethod, CallOptions.DEFAULT, "hello"); assertThat(response).isEqualTo("hello"); - Metadata headers = headersCapture.get(); - assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); + assertThat(trailersCapture.get() == null || !trailersCapture.get().containsKey(FOO_KEY)) + .isTrue(); + assertThat(headersCapture.get().get(FOO_KEY)).isEqualTo("foo-value"); } @Test - public void shouldAttachHeadersDespiteNoResponse() throws IOException { - Metadata extraHeaders = new Metadata(); - extraHeaders.put(FOO_KEY, "foo-value"); + public void shouldAttachTrailersWhenNoResponse() throws IOException { + Metadata extras = new Metadata(); + extras.put(FOO_KEY, "foo-value"); ServerServiceDefinition serviceDef = ServerInterceptors.intercept( @@ -114,7 +115,7 @@ public void shouldAttachHeadersDespiteNoResponse() throws IOException { ServerCalls.asyncUnaryCall( (req, respObserver) -> respObserver.onCompleted()))) .build(), - ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); + ImmutableList.of(newAttachMetadataServerInterceptor(extras))); grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); ManagedChannel channel = @@ -127,14 +128,14 @@ public void shouldAttachHeadersDespiteNoResponse() throws IOException { ClientCalls.blockingServerStreamingCall( channel, echoServerStreamingMethod, CallOptions.DEFAULT, "hello"); assertThat(response.hasNext()).isFalse(); - Metadata headers = headersCapture.get(); - assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); + assertThat(headersCapture.get() == null || !headersCapture.get().containsKey(FOO_KEY)).isTrue(); + assertThat(trailersCapture.get().get(FOO_KEY)).isEqualTo("foo-value"); } @Test - public void shouldAttachHeadersToErrorResponse() throws IOException { - Metadata extraHeaders = new Metadata(); - extraHeaders.put(FOO_KEY, "foo-value"); + public void shouldAttachTrailersToErrorResponse() throws IOException { + Metadata extras = new Metadata(); + extras.put(FOO_KEY, "foo-value"); ServerServiceDefinition serviceDef = ServerInterceptors.intercept( @@ -145,7 +146,7 @@ public void shouldAttachHeadersToErrorResponse() throws IOException { (req, respObserver) -> respObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException()))) .build(), - ImmutableList.of(newAttachHeadersServerInterceptor(extraHeaders))); + ImmutableList.of(newAttachMetadataServerInterceptor(extras))); grpcCleanup.register(newInProcessServerBuilder().addService(serviceDef).build().start()); ManagedChannel channel = @@ -160,8 +161,8 @@ public void shouldAttachHeadersToErrorResponse() throws IOException { assertThat(e.getStatus()).isNotNull(); assertThat(e.getStatus().getCode()).isEqualTo(Code.INVALID_ARGUMENT); } - Metadata headers = headersCapture.get(); - assertThat(headers.get(FOO_KEY)).isEqualTo("foo-value"); + assertThat(headersCapture.get() == null || !headersCapture.get().containsKey(FOO_KEY)).isTrue(); + assertThat(trailersCapture.get().get(FOO_KEY)).isEqualTo("foo-value"); } private static InProcessServerBuilder newInProcessServerBuilder() {