Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Aug 10, 2021
1 parent 29688ee commit a0c6c64
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
Expand All @@ -90,7 +90,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand Down Expand Up @@ -120,11 +119,6 @@
*/
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
static final Metadata.Key<String> ATTEMPT_HEADER_KEY =
Metadata.Key.of("attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> TIMESTAMP_HEADER_KEY =
Metadata.Key.of("client-timing", Metadata.ASCII_STRING_MARSHALLER);

private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);

Expand Down Expand Up @@ -197,7 +191,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
new BigtableTracerFactory(
ImmutableList.of(
// Add OpenCensus Tracing
new OpencensusTracerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracer;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.google.cloud.bigtable.data.v2.stub.metrics.Util;

/** A callable that injects client timestamp and attempt count to request headers. */
/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
final class ExtraHeadersSeverStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {
private final ServerStreamingCallable innerCallable;
Expand All @@ -39,17 +38,8 @@ public void call(
RequestT request,
ResponseObserver<ResponseT> responseObserver,
ApiCallContext apiCallContext) {
int attemptCount = -1;
if (apiCallContext.getTracer() instanceof CompositeTracer) {
attemptCount = ((CompositeTracer) apiCallContext.getTracer()).getAttempt();
}
Map<String, List<String>> headers =
ImmutableMap.of(
EnhancedBigtableStub.ATTEMPT_HEADER_KEY.name(),
Arrays.asList(String.valueOf(attemptCount)),
EnhancedBigtableStub.TIMESTAMP_HEADER_KEY.name(),
Arrays.asList(String.valueOf(System.currentTimeMillis())));
ApiCallContext newCallContext = apiCallContext.withExtraHeaders(headers);
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createExtraHeaders(apiCallContext));
innerCallable.call(request, responseObserver, newCallContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracer;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.google.cloud.bigtable.data.v2.stub.metrics.Util;

/** A callable that injects client timestamp and attempt count to request headers. */
/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
final class ExtraHeadersUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable innerCallable;
Expand All @@ -35,18 +34,10 @@ public ExtraHeadersUnaryCallable(UnaryCallable innerCallable) {
}

@Override
@SuppressWarnings("unchecked")
public ApiFuture futureCall(RequestT request, ApiCallContext apiCallContext) {
int attemptCount = -1;
if (apiCallContext.getTracer() instanceof CompositeTracer) {
attemptCount = ((CompositeTracer) apiCallContext.getTracer()).getAttempt();
}
Map<String, List<String>> headers =
ImmutableMap.of(
EnhancedBigtableStub.ATTEMPT_HEADER_KEY.name(),
Arrays.asList(String.valueOf(attemptCount)),
EnhancedBigtableStub.TIMESTAMP_HEADER_KEY.name(),
Arrays.asList(String.valueOf(System.currentTimeMillis())));
ApiCallContext newCallContext = apiCallContext.withExtraHeaders(headers);
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createExtraHeaders(apiCallContext));
return innerCallable.futureCall(request, newCallContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.threeten.bp.Duration;

/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
/**
* A Bigtable specific {@link ApiTracer} that will be used to plumb additional context through the
* call chains as well as combines multiple user defined {@link ApiTracer}s into a single one. This
* will ensure that operation lifecycle events are plumbed through while maintaining user configured
* functionalities.
*/
@InternalApi("For internal use only")
public class CompositeTracer extends BaseApiTracer {
public class BigtableTracer extends BaseApiTracer {
private final List<ApiTracer> children;
private int attempt = 0;

CompositeTracer(List<ApiTracer> children) {
BigtableTracer(List<ApiTracer> children) {
this.children = ImmutableList.copyOf(children);
}

Expand Down Expand Up @@ -157,6 +163,11 @@ public void batchRequestSent(long elementCount, long requestSize) {
}
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and recorded in {@link #attemptStarted(int)}. With the getter we can access it from {@link
* ApiCallContext}.
*/
public int getAttempt() {
return attempt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import java.util.ArrayList;
import java.util.List;

/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */
/**
* A Bigtable specific {@link ApiTracerFactory} that combines multiple {@link ApiTracerFactory} into
* a single one.
*/
@InternalApi("For internal use only")
public class CompositeTracerFactory extends BaseApiTracerFactory {
public class BigtableTracerFactory extends BaseApiTracerFactory {
private final List<ApiTracerFactory> apiTracerFactories;

public CompositeTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
public BigtableTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories);
}

Expand All @@ -40,6 +43,6 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
for (ApiTracerFactory factory : apiTracerFactories) {
children.add(factory.newTracer(parent, spanName, operationType));
}
return new CompositeTracer(children);
return new BigtableTracer(children);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,33 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/** Utilities to help integrating with OpenCensus. */
class Util {
@InternalApi("For internal use only")
public class Util {
public static final Metadata.Key<String> ATTEMPT_HEADER_KEY =
Metadata.Key.of("attempt", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> TIMESTAMP_HEADER_KEY =
Metadata.Key.of("client-timing", Metadata.ASCII_STRING_MARSHALLER);

private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());

/** Convert an exception into a value that can be used as an OpenCensus tag value. */
Expand Down Expand Up @@ -71,4 +84,22 @@ static TagValue extractStatus(Future<?> future) {
}
return extractStatus(error);
}

/**
* Create extra headers with attempt number and client timestamp from api call context. Attempt
* number starts from 0.
*/
public static Map<String, List<String>> createExtraHeaders(ApiCallContext apiCallContext) {
int attemptCount = -1;
if (apiCallContext.getTracer() instanceof BigtableTracer) {
attemptCount = ((BigtableTracer) apiCallContext.getTracer()).getAttempt();
}
ImmutableMap.Builder headers = ImmutableMap.<String, List<String>>builder();
if (attemptCount != -1) {
headers.put(ATTEMPT_HEADER_KEY.name(), Arrays.asList(String.valueOf(attemptCount)));
}
headers.put(
TIMESTAMP_HEADER_KEY.name(), Arrays.asList(String.valueOf(System.currentTimeMillis())));
return headers.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.Util;
import com.google.common.collect.Queues;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
Expand All @@ -52,8 +53,6 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.opencensus.impl.stats.StatsComponentImpl;
import io.opencensus.stats.StatsComponent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
Expand All @@ -68,7 +67,6 @@ public class ExtraHeadersCallableTest {

private FakeService fakeService = new FakeService();

private final StatsComponent localStats = new StatsComponentImpl();
private EnhancedBigtableStub stub;

private static final String PROJECT_ID = "fake-project";
Expand Down Expand Up @@ -261,11 +259,11 @@ private void verifyHeaders(int expectedAttemptCounts, long startTimestamp) throw

for (int i = 0; i < expectedAttemptCounts; i++) {
Metadata headers = metadataInterceptor.headers.take();
String attemptCount = headers.get(EnhancedBigtableStub.ATTEMPT_HEADER_KEY);
String attemptCount = headers.get(Util.ATTEMPT_HEADER_KEY);
assertThat(attemptCount).isNotNull();
assertThat(Integer.valueOf(attemptCount)).isEqualTo(i);

String clientTimeStr = headers.get(EnhancedBigtableStub.TIMESTAMP_HEADER_KEY);
String clientTimeStr = headers.get(Util.TIMESTAMP_HEADER_KEY);
assertThat(clientTimeStr).isNotNull();
long clientTime = Long.valueOf(clientTimeStr);
assertThat(clientTime).isAtLeast(timestamp);
Expand Down
Loading

0 comments on commit a0c6c64

Please sign in to comment.