Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send attempt and timestamp in headers #935

Merged
merged 14 commits into from
Oct 29, 2021
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
<method>*</method>
</difference>
<!-- InternalApi that was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.ExtraHeadersSeverStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.ExtraHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
Expand Down Expand Up @@ -191,7 +193,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
new BigtableTracerFactory(
ImmutableList.of(
// Add OpenCensus Tracing
new OpencensusTracerFactory(
Expand Down Expand Up @@ -397,11 +399,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withExtraHeaders =
new ExtraHeadersSeverStreamingCallable<>(base);

// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(base);
new ReadRowsConvertExceptionCallable<>(withExtraHeaders);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down Expand Up @@ -468,9 +473,12 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(spoolable);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
Expand Down Expand Up @@ -505,9 +513,12 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
Expand Down Expand Up @@ -646,6 +657,9 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withExtraHeaders =
new ExtraHeadersSeverStreamingCallable<>(base);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
Expand All @@ -656,7 +670,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
base,
withExtraHeaders,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
Expand Down Expand Up @@ -689,9 +703,12 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
Expand Down Expand Up @@ -726,10 +743,14 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.threeten.bp.Duration;

/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
class CompositeTracer extends BaseApiTracer {
/**
* A Bigtable specific {@link ApiTracer} that will be used to plumb additional context through the
* call chains as well as combines multiple user defined {@link ApiTracer}s into a single one. This
* will ensure that operation lifecycle events are plumbed through while maintaining user configured
* functionalities.
*/
class BigtableTracer extends BaseApiTracer {
private final List<ApiTracer> children;
private volatile int attempt = 0;

CompositeTracer(List<ApiTracer> children) {
BigtableTracer(List<ApiTracer> children) {
this.children = ImmutableList.copyOf(children);
}

Expand Down Expand Up @@ -78,6 +85,7 @@ public void connectionSelected(String id) {

@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
for (ApiTracer child : children) {
child.attemptStarted(attemptNumber);
}
Expand Down Expand Up @@ -152,4 +160,13 @@ public void batchRequestSent(long elementCount, long requestSize) {
child.batchRequestSent(elementCount, requestSize);
}
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and recorded in {@link #attemptStarted(int)}. With the getter we can access it from {@link
* ApiCallContext}. Attempt number starts from 0.
*/
public int getAttempt() {
return attempt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import java.util.ArrayList;
import java.util.List;

/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */
/**
* A Bigtable specific {@link ApiTracerFactory} that combines multiple {@link ApiTracerFactory} into
* a single one.
*/
@InternalApi("For internal use only")
public class CompositeTracerFactory extends BaseApiTracerFactory {
public class BigtableTracerFactory extends BaseApiTracerFactory {
private final List<ApiTracerFactory> apiTracerFactories;

public CompositeTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
public BigtableTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories);
}

Expand All @@ -40,6 +43,6 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
for (ApiTracerFactory factory : apiTracerFactories) {
children.add(factory.newTracer(parent, spanName, operationType));
}
return new CompositeTracer(children);
return new BigtableTracer(children);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
@InternalApi("For internal use only")
public final class ExtraHeadersSeverStreamingCallable<RequestT, ResponseT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: missing r in server

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename it to StatsHeader....

extends ServerStreamingCallable<RequestT, ResponseT> {
private final ServerStreamingCallable innerCallable;

public ExtraHeadersSeverStreamingCallable(ServerStreamingCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
RequestT request,
ResponseObserver<ResponseT> responseObserver,
ApiCallContext apiCallContext) {
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createExtraHeaders(apiCallContext));
innerCallable.call(request, responseObserver, newCallContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;

/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
@InternalApi("For internal use only")
public final class ExtraHeadersUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable innerCallable;

public ExtraHeadersUnaryCallable(UnaryCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext apiCallContext) {
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createExtraHeaders(apiCallContext));
return innerCallable.futureCall(request, newCallContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,33 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/** Utilities to help integrating with OpenCensus. */
class Util {
static final Metadata.Key<String> ATTEMPT_HEADER_KEY =
Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> ATTEMPT_EPOCH_KEY =
Metadata.Key.of("bigtable-client-attempt-epoch-usec", Metadata.ASCII_STRING_MARSHALLER);

private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());

/** Convert an exception into a value that can be used as an OpenCensus tag value. */
Expand Down Expand Up @@ -71,4 +84,20 @@ static TagValue extractStatus(Future<?> future) {
}
return extractStatus(error);
}

/**
* Create extra headers with attempt number and client timestamp from api call context. Attempt
* number starts from 0.
*/
static Map<String, List<String>> createExtraHeaders(ApiCallContext apiCallContext) {
ImmutableMap.Builder headers = ImmutableMap.<String, List<String>>builder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please out type params on the variable type

headers.put(
ATTEMPT_EPOCH_KEY.name(),
Arrays.asList(String.valueOf(Instant.EPOCH.until(Instant.now(), ChronoUnit.MICROS))));
if (apiCallContext.getTracer() instanceof BigtableTracer) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
int attemptCount = ((BigtableTracer) apiCallContext.getTracer()).getAttempt();
headers.put(ATTEMPT_HEADER_KEY.name(), Arrays.asList(String.valueOf(attemptCount)));
}
return headers.build();
}
}
Loading