Skip to content

Commit

Permalink
fix: rename HeaderTracer callables to BigtableTracer callables (#1276)
Browse files Browse the repository at this point in the history
* feat: add built in metrics measure and views

* remove status from application latency

* Rename methods and add comments

* update based on comments

* feat: update tracers to use built in metrics

* update on comments

* fix: rename HeaderTracer callables to BigtableTracer callables

* deflake test

* fix broken test
  • Loading branch information
mutianf authored Jun 29, 2022
1 parent df77560 commit e0bd6c9
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 43 deletions.
10 changes: 10 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,14 @@
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable</className>
<method>*</method>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable</className>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
Expand Down Expand Up @@ -377,7 +377,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Add bigtable tracer for tracking bigtable specific metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
Expand Down Expand Up @@ -428,13 +428,13 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(watched);
ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand Down Expand Up @@ -473,11 +473,11 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
UnaryCallable<Query, List<RowT>> tracedBatcher =
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcher);
UnaryCallable<Query, List<RowT>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -519,11 +519,11 @@ public Map<String, String> extract(
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withStatsHeaders =
new StatsHeadersUnaryCallable<>(spoolable);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -558,11 +558,11 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -605,10 +605,10 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -746,11 +746,11 @@ public Map<String, String> extract(
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -787,11 +787,12 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new StatsHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@
import javax.annotation.Nonnull;

/**
* This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
* returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
* were injected in the header/trailer and publish them to OpenCensus. If {@link
* GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
* reached GFE, and it'll increment the gfe_header_missing_counter in this case.
* This callable will
*
* <p>If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
* This is for the case where direct path is enabled, all the requests won't go through GFE and
* therefore won't have the server-timing header.
* <p>-inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <p>-Call {@link BigtableTracer#onRequest()} to record the request events in a stream.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class HeaderTracerStreamingCallable<RequestT, ResponseT>
public class BigtableTracerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;

public HeaderTracerStreamingCallable(
public BigtableTracerStreamingCallable(
@Nonnull ServerStreamingCallable<RequestT, ResponseT> callable) {
this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,16 @@
* GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
* reached GFE, and it'll increment the gfe_header_missing_counter in this case.
*
* <p>If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
* This is for the case where direct path is enabled, all the requests won't go through GFE and
* therefore won't have the server-timing header.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class HeaderTracerUnaryCallable<RequestT, ResponseT>
public class BigtableTracerUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {

private final UnaryCallable<RequestT, ResponseT> innerCallable;

public HeaderTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable) {
public BigtableTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class HeaderTracerCallableTest {
public class BigtableTracerCallableTest {
private Server server;
private Server serverNoHeader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,10 @@ public void testRetryCount() {
stub.mutateRowCallable()
.call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));

verify(statsRecorderWrapper).putRetryCount(retryCount.capture());
// onOperationComplete() is called in TracerFinisher which will be called after the mutateRow
// call is returned. So there's a race between when the call returns and when the putRetryCount
// is called in onOperationCompletion().
verify(statsRecorderWrapper, timeout(20)).putRetryCount(retryCount.capture());

assertThat(retryCount.getValue()).isEqualTo(fakeService.getAttemptCounter().get() - 1);
}
Expand All @@ -328,7 +331,7 @@ public void testMutateRowAttempts() {
// calls releaseWaiters(). onOperationComplete() is called in TracerFinisher which will be
// called after the mutateRow call is returned. So there's a race between when the call returns
// and when the record() is called in onOperationCompletion().
verify(statsRecorderWrapper, timeout(10).times(fakeService.getAttemptCounter().get() + 1))
verify(statsRecorderWrapper, timeout(20).times(fakeService.getAttemptCounter().get() + 1))
.record(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly(UNDEFINED, UNDEFINED, UNDEFINED, UNDEFINED);
assertThat(cluster.getAllValues()).containsExactly(UNDEFINED, UNDEFINED, UNDEFINED, UNDEFINED);
Expand Down

0 comments on commit e0bd6c9

Please sign in to comment.