diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 301ecd66b5..c6591d588a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -86,6 +86,7 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; @@ -366,10 +367,16 @@ public UnaryCallable createReadRowCallable(RowAdapter .build(), rowAdapter); - UnaryCallable readRowCallable = - new ReadRowsUserCallable<>(readRowsCallable, requestContext).first(); + ReadRowsUserCallable readRowCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + + ServerStreamingCallable traced = + new TracedServerStreamingCallable<>( + readRowCallable, clientContext.getTracerFactory(), getSpanName("ReadRow")); + + ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(traced); - return createUserFacingUnaryCallable("ReadRow", readRowCallable); + return firstRow.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java index c2584e0c93..2ef26605b4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java @@ -16,7 +16,12 @@ package com.google.cloud.bigtable.data.v2.stub.readrows; import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StateCheckingResponseObserver; +import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.bigtable.data.v2.models.Query; @@ -24,15 +29,51 @@ * Enhancement for `readRowsCallable().first()` to gracefully limit the row count instead of * cancelling the RPC */ -class ReadRowsFirstCallable extends UnaryCallable { - private final UnaryCallable inner; +@InternalApi +public class ReadRowsFirstCallable extends UnaryCallable { - ReadRowsFirstCallable(UnaryCallable inner) { + private final ServerStreamingCallable inner; + + public ReadRowsFirstCallable(ServerStreamingCallable inner) { this.inner = inner; } @Override public ApiFuture futureCall(Query query, ApiCallContext context) { - return inner.futureCall(query.limit(1), context); + ReadRowsFirstResponseObserver observer = new ReadRowsFirstResponseObserver<>(); + this.inner.call(query.limit(1), observer, context); + return observer.getFuture(); + } + + private class ReadRowsFirstResponseObserver extends StateCheckingResponseObserver { + private StreamController innerController; + private RowT firstRow; + private SettableApiFuture settableFuture = SettableApiFuture.create(); + + @Override + protected void onStartImpl(StreamController streamController) { + this.innerController = streamController; + } + + @Override + protected void onResponseImpl(RowT response) { + if (firstRow == null) { + this.firstRow = response; + } + } + + @Override + protected void onErrorImpl(Throwable throwable) { + settableFuture.setException(throwable); + } + + @Override + protected void onCompleteImpl() { + settableFuture.set(firstRow); + } + + protected ApiFuture getFuture() { + return settableFuture; + } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java index 94ced791c5..3f1db6d0d8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java @@ -19,7 +19,6 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.ReadRowsRequest; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.Query; @@ -34,14 +33,11 @@ public class ReadRowsUserCallable extends ServerStreamingCallable { private final ServerStreamingCallable inner; private final RequestContext requestContext; - private final ReadRowsFirstCallable firstCallable; public ReadRowsUserCallable( ServerStreamingCallable inner, RequestContext requestContext) { this.inner = inner; this.requestContext = requestContext; - - this.firstCallable = new ReadRowsFirstCallable<>(super.first()); } @Override @@ -49,12 +45,4 @@ public void call(Query request, ResponseObserver responseObserver, ApiCall ReadRowsRequest innerRequest = request.toProto(requestContext); inner.call(innerRequest, responseObserver, context); } - - // Optimization: since the server supports row limits, override the first callable. - // This way unnecessary data doesn't need to be buffered and the number of CANCELLED request - // statuses is minimized - @Override - public UnaryCallable first() { - return firstCallable; - } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java index edc20e6081..07cf3478c1 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java @@ -15,8 +15,13 @@ */ package com.google.cloud.bigtable.data.v2.stub.readrows; -import com.google.api.core.SettableApiFuture; -import com.google.api.gax.rpc.UnaryCallable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; @@ -38,26 +43,23 @@ public class ReadRowsFirstCallableTest { private static final RequestContext REQUEST_CONTEXT = RequestContext.create("fake-project", "fake-instance", "fake-profile"); - private UnaryCallable innerCallable; + + private ServerStreamingCallable innerCallable; private ArgumentCaptor innerQuery; - private SettableApiFuture innerResult; @SuppressWarnings("unchecked") @Before public void setUp() { - innerCallable = Mockito.mock(UnaryCallable.class); + innerCallable = Mockito.mock(ServerStreamingCallable.class); innerQuery = ArgumentCaptor.forClass(Query.class); - innerResult = SettableApiFuture.create(); - Mockito.when(innerCallable.futureCall(innerQuery.capture(), Mockito.any())) - .thenReturn(innerResult); } @Test public void testLimitAdded() { ReadRowsFirstCallable callable = new ReadRowsFirstCallable<>(innerCallable); - innerResult.set(null); - callable.call(Query.create("fake-table")); - + callable.futureCall(Query.create("fake-table"), GrpcCallContext.createDefault()); + verify(innerCallable) + .call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class)); Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT)) .isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT)); } @@ -65,9 +67,9 @@ public void testLimitAdded() { @Test public void testLimitChanged() { ReadRowsFirstCallable callable = new ReadRowsFirstCallable<>(innerCallable); - innerResult.set(null); - callable.call(Query.create("fake-table").limit(1_000)); - + callable.futureCall(Query.create("fake-table").limit(10), GrpcCallContext.createDefault()); + verify(innerCallable) + .call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class)); Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT)) .isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT)); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallableTest.java index d1e03df2c6..b518a55415 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallableTest.java @@ -41,17 +41,4 @@ public void testRequestConverted() { Truth.assertThat(innerCallable.getActualRequest()).isEqualTo(query.toProto(REQUEST_CONTEXT)); } - - @Test - public void testFirstIsLimited() { - ServerStreamingStashCallable innerCallable = - new ServerStreamingStashCallable<>(); - ReadRowsUserCallable callable = new ReadRowsUserCallable<>(innerCallable, REQUEST_CONTEXT); - Query query = Query.create("fake-table"); - - callable.first().call(query); - - Truth.assertThat(innerCallable.getActualRequest()) - .isEqualTo(query.limit(1).toProto(REQUEST_CONTEXT)); - } }