Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Surface the server-timing metric #535

Merged
merged 15 commits into from
Dec 17, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.stub.BigtableInterceptorProvider;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -127,6 +128,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)) // wait this long before considering the connection dead
.setInterceptorProvider(BigtableInterceptorProvider.createDefault())
mutianf marked this conversation as resolved.
Show resolved Hide resolved
.build());

LOGGER.info("Connecting to the Bigtable emulator at " + hostname + ":" + port);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.ClientHeaderInterceptor;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import java.util.List;

@BetaApi
public class BigtableInterceptorProvider implements GrpcInterceptorProvider {
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private static final ClientHeaderInterceptor headerInterceptor = new ClientHeaderInterceptor();
private List<ClientInterceptor> clientInterceptors;

private BigtableInterceptorProvider(List<ClientInterceptor> interceptors) {
this.clientInterceptors = interceptors;
}

public static BigtableInterceptorProvider createDefault() {
return new BigtableInterceptorProvider(ImmutableList.<ClientInterceptor>of(headerInterceptor));
}

public static BigtableInterceptorProvider create(List<ClientInterceptor> interceptors) {
return new BigtableInterceptorProvider(interceptors);
}

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand Down Expand Up @@ -66,6 +68,7 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
Expand All @@ -82,6 +85,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand All @@ -91,6 +95,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
Expand All @@ -109,6 +114,8 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";

private static final Logger LOGGER = Logger.getLogger(EnhancedBigtableStub.class.getName());

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;
private final RequestContext requestContext;
Expand Down Expand Up @@ -203,7 +210,25 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add user configured tracer
settings.getTracerFactory())));

builder.setHeaderTracer(
builder
.getHeaderTracer()
.toBuilder()
.setStats(stats)
.setTagger(tagger)
.setStatsAttributes(
ImmutableMap.<TagKey, TagValue>builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this map instance shared with the attrs above?

.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build())
.build());
return builder.build();
}

Expand Down Expand Up @@ -268,13 +293,12 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName spanName = SpanName.of(CLIENT_NAME, "ReadRows");
ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));
readRowsUserCallable, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer(spanName));
}

/**
Expand Down Expand Up @@ -459,11 +483,11 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

SpanName spanName = SpanName.of(CLIENT_NAME, "MutateRows");
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer(spanName));
}

/**
Expand Down Expand Up @@ -634,11 +658,11 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacingUnaryCallable(
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

SpanName spanName = SpanName.of(CLIENT_NAME, methodName);
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer(spanName));
}
// </editor-fold>

Expand Down Expand Up @@ -686,6 +710,29 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

/**
* Adds a HeaderTracer instance and current span name to CallOptions so we could surface metrics
* in the header with {@link
* com.google.cloud.bigtable.data.v2.stub.metrics.ClientHeaderInterceptor}.
*/
private ApiCallContext getContextWithTracer(SpanName spanName) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
ApiCallContext apiCallContext = clientContext.getDefaultCallContext();
if (!(apiCallContext instanceof GrpcCallContext)) {
LOGGER.warning(
"Failed to add tracer in call context. Expected GrpcCallContext but had "
+ apiCallContext.getClass());
return apiCallContext;
}
GrpcCallContext grpcCallContext = (GrpcCallContext) apiCallContext;
CallOptions options = grpcCallContext.getCallOptions();
options =
options
.withOption(HeaderTracer.HEADER_TRACER_CONTEXT_KEY, settings.getHeaderTracer())
.withOption(HeaderTracer.SPAN_NAME_CONTEXT_KEY, spanName.toString());
mutianf marked this conversation as resolved.
Show resolved Hide resolved
grpcCallContext = grpcCallContext.withCallOptions(options);
return grpcCallContext;
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -157,6 +158,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -196,6 +198,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
headerTracer = builder.headerTracer;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -240,6 +243,12 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Gets the tracer for capturing metrics in the header. */
@BetaApi
public HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand All @@ -248,6 +257,7 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)) // wait this long before considering the connection dead
.setInterceptorProvider(BigtableInterceptorProvider.createDefault())
// TODO(weiranf): Set this to true by default once DirectPath goes to public beta
.setAttemptDirectPath(isDirectPathEnabled());
}
Expand Down Expand Up @@ -501,6 +511,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -524,6 +535,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -637,6 +649,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
headerTracer = settings.headerTracer;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -759,6 +772,19 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Configure the header tracer for surfacing metrics in the header. */
mutianf marked this conversation as resolved.
Show resolved Hide resolved
@BetaApi
public Builder setHeaderTracer(HeaderTracer headerTracer) {
this.headerTracer = headerTracer;
return this;
}

/** Gets the header tracer that'll be used to surface metrics in the header. */
@BetaApi
public HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -838,6 +864,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Loading