Skip to content

Commit

Permalink
feat: send attempt and timestamp in headers
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Aug 4, 2021
1 parent d7e8317 commit 7c0c253
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand Down Expand Up @@ -119,6 +120,11 @@
*/
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
static final Metadata.Key<String> ATTEMPT_HEADER_KEY =
Metadata.Key.of("attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> TIMESTAMP_HEADER_KEY =
Metadata.Key.of("client-timing", Metadata.ASCII_STRING_MARSHALLER);

private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);

Expand Down Expand Up @@ -397,11 +403,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withExtraHeaders =
new ExtraHeadersSeverStreamingCallable<>(base);

// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(base);
new ReadRowsConvertExceptionCallable<>(withExtraHeaders);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down Expand Up @@ -468,9 +477,12 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(spoolable);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
Expand Down Expand Up @@ -505,9 +517,12 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
Expand Down Expand Up @@ -646,6 +661,9 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withExtraHeaders =
new ExtraHeadersSeverStreamingCallable<>(base);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
Expand All @@ -656,7 +674,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
base,
withExtraHeaders,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
Expand Down Expand Up @@ -689,9 +707,12 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
Expand Down Expand Up @@ -726,10 +747,14 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withExtraHeaders =
new ExtraHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withExtraHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracer;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** A callable that injects client timestamp and attempt count to request headers. */
final class ExtraHeadersSeverStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {
private final ServerStreamingCallable innerCallable;

ExtraHeadersSeverStreamingCallable(ServerStreamingCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
RequestT request,
ResponseObserver<ResponseT> responseObserver,
ApiCallContext apiCallContext) {
int attemptCount = -1;
if (apiCallContext.getTracer() instanceof CompositeTracer) {
attemptCount = ((CompositeTracer) apiCallContext.getTracer()).getAttempt();
}
Map<String, List<String>> headers =
ImmutableMap.of(
EnhancedBigtableStub.ATTEMPT_HEADER_KEY.name(),
Arrays.asList(String.valueOf(attemptCount)),
EnhancedBigtableStub.TIMESTAMP_HEADER_KEY.name(),
Arrays.asList(String.valueOf(System.currentTimeMillis())));
ApiCallContext newCallContext = apiCallContext.withExtraHeaders(headers);
innerCallable.call(request, responseObserver, newCallContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracer;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** A callable that injects client timestamp and attempt count to request headers. */
final class ExtraHeadersUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable innerCallable;

public ExtraHeadersUnaryCallable(UnaryCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext apiCallContext) {
int attemptCount = -1;
if (apiCallContext.getTracer() instanceof CompositeTracer) {
attemptCount = ((CompositeTracer) apiCallContext.getTracer()).getAttempt();
}
Map<String, List<String>> headers =
ImmutableMap.of(
EnhancedBigtableStub.ATTEMPT_HEADER_KEY.name(),
Arrays.asList(String.valueOf(attemptCount)),
EnhancedBigtableStub.TIMESTAMP_HEADER_KEY.name(),
Arrays.asList(String.valueOf(System.currentTimeMillis())));
ApiCallContext newCallContext = apiCallContext.withExtraHeaders(headers);
return innerCallable.futureCall(request, newCallContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.collect.ImmutableList;
Expand All @@ -23,8 +24,10 @@
import org.threeten.bp.Duration;

/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
class CompositeTracer extends BaseApiTracer {
@InternalApi("For internal use only")
public class CompositeTracer extends BaseApiTracer {
private final List<ApiTracer> children;
private int attempt = 0;

CompositeTracer(List<ApiTracer> children) {
this.children = ImmutableList.copyOf(children);
Expand Down Expand Up @@ -81,6 +84,7 @@ public void attemptStarted(int attemptNumber) {
for (ApiTracer child : children) {
child.attemptStarted(attemptNumber);
}
this.attempt = attemptNumber;
}

@Override
Expand Down Expand Up @@ -152,4 +156,8 @@ public void batchRequestSent(long elementCount, long requestSize) {
child.batchRequestSent(elementCount, requestSize);
}
}

public int getAttempt() {
return attempt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public Void call() {

// attemptStared should be called at the very start of the operation. This will initialize
// variables in ApiTracer and avoid exceptions when the tracer marks the attempt as finished
callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());
callContext.getTracer().attemptStarted(externalFuture.getAttemptSettings().getAttemptCount());

Preconditions.checkState(
currentRequest.getEntriesCount() > 0, "Request doesn't have any mutations to send");
Expand Down
Loading

0 comments on commit 7c0c253

Please sign in to comment.