Skip to content

Commit

Permalink
OpenCensus Support for Cloud Pub/Sub (#4240)
Browse files Browse the repository at this point in the history
* Adds OpenCensus context propagation to Publisher and Subscriber.

* Updates OpenCensus attribute keys so that they will be propagated by CPS.

* Addresses reviewer comments by fixing build files and using only defined annotations.

* Updates build dependencies and copyright date.

* Fixes typo.

* Removes encoding of OpenCensus tags. Will re-enable once text encoding spec has been finalized (census-instrumentation/opencensus-specs#65).

* Updates encoding of SpanContext to use W3C specified encoding; Also preserves sampling decision from the publisher in the subscriber.

* Adds unit test for OpenCensusUtil.

* Adds unit test for OpenCensusUtil.

* Updates OpenCensus integration to use a generic MessageTransform.

* Removes now-unused private constant.

* Update pom.xml

* Marking setTransform as BetaApi

* Fixes for formatting issues.
  • Loading branch information
dinooliva authored and sduskis committed Mar 20, 2019
1 parent 750a8a2 commit 1df3ea2
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 0 deletions.
6 changes: 6 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
<artifactId>grpc-google-iam-v1</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<version>${opencensus.version}</version>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.MustBeClosed;
import com.google.pubsub.v1.PubsubMessage;
import io.opencensus.common.Scope;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.tags.propagation.TagContextBinarySerializer;
import io.opencensus.trace.Link;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.SpanContextParseException;
import io.opencensus.trace.propagation.TextFormat;
import io.opencensus.trace.propagation.TextFormat.Getter;
import io.opencensus.trace.propagation.TextFormat.Setter;
import io.opencensus.trace.samplers.Samplers;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers
* to subscribers.
*/
public class OpenCensusUtil {
private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());

public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
@VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
private static final String TRACEPARENT_KEY = "traceparent";

private static final Tagger tagger = Tags.getTagger();
private static final TagContextBinarySerializer serializer =
Tags.getTagPropagationComponent().getBinarySerializer();

private static final Tracer tracer = Tracing.getTracer();
private static final TextFormat traceContextTextFormat =
Tracing.getPropagationComponent().getTraceContextFormat();

/**
* Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as
* attributes to the {@link PubsubMessage}.
*/
public static final ApiFunction<PubsubMessage, PubsubMessage> OPEN_CENSUS_MESSAGE_TRANSFORM =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage message) {
PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
return message;
}
if (!encodedSpanContext.isEmpty()) {
builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext);
}
if (!encodedTagContext.isEmpty()) {
builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext);
}
return builder.build();
}
};

private static final Setter<StringBuilder> setter =
new Setter<StringBuilder>() {
@Override
public void put(StringBuilder carrier, String key, String value) {
if (key.equals(TRACEPARENT_KEY)) {
carrier.append(value);
}
}
};

private static final Getter<String> getter =
new Getter<String>() {
@Override
public String get(String carrier, String key) {
return key.equals(TRACEPARENT_KEY) ? carrier : null;
}
};

@VisibleForTesting
static String encodeSpanContext(SpanContext ctxt) {
StringBuilder builder = new StringBuilder();
traceContextTextFormat.inject(ctxt, builder, setter);
return builder.toString();
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static String encodeTagContext(TagContext tags) {
return "";
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static Scope createScopedTagContext(String encodedTags) {
return tagger.withTagContext(tagger.getCurrentTagContext());
}

@VisibleForTesting
@MustBeClosed
static Scope createScopedSpan(String name) {
return tracer
.spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
.setRecordEvents(true)
// Note: we preserve the sampling decision from the publisher.
.setSampler(Samplers.alwaysSample())
.startScopedSpan();
}

private static void addParentLink(String encodedParentSpanContext) {
try {
SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
} catch (SpanContextParseException exn) {
logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
}
}

/**
* Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts and
* puts them in scope.
*/
public static class OpenCensusMessageReceiver implements MessageReceiver {
private final MessageReceiver receiver;

public OpenCensusMessageReceiver(MessageReceiver receiver) {
this.receiver = receiver;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, "");
if (encodedTagContext.isEmpty()) {
addTraceScope(message, consumer);
return;
}
try (Scope statsScope = createScopedTagContext(encodedTagContext)) {
addTraceScope(message, consumer);
}
}

private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) {
String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, "");
if (encodedSpanContext.isEmpty()) {
receiver.receiveMessage(message, consumer);
return;
}
try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) {
addParentLink(encodedSpanContext);
receiver.receiveMessage(message, consumer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class Publisher {
private final List<AutoCloseable> closeables;
private final MessageWaiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand All @@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -192,6 +195,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}

message = messageTransform.apply(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
Expand Down Expand Up @@ -528,6 +532,14 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

ApiFunction<PubsubMessage, PubsubMessage> messageTransform =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage input) {
return input;
}
};

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
Expand Down Expand Up @@ -610,6 +622,17 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}

/**
* Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage}
* before it is sent
*/
@BetaApi
public Builder setTransform(ApiFunction<PubsubMessage, PubsubMessage> messageTransform) {
this.messageTransform =
Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null.");
return this;
}

public Publisher build() throws IOException {
return new Publisher(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.OPEN_CENSUS_MESSAGE_TRANSFORM;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.opencensus.common.Scope;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.trace.Link;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.RunningSpanStore;
import io.opencensus.trace.export.RunningSpanStore.Filter;
import io.opencensus.trace.export.SpanData;
import java.util.Collection;
import java.util.List;
import org.junit.Test;

/** Tests for {@link OpenCensusUtil}. */
public class OpenCensusUtilTest {
private static final Tagger tagger = Tags.getTagger();
private static final Tracer tracer = Tracing.getTracer();
private static final TagKey TEST_TAG_KEY = TagKey.create("TEST_TAG_KEY");
private static final TagValue TEST_TAG_VAL = TagValue.create("TEST_TAG_VAL");
private static final String TEST_PARENT_LINK_NAME = "TEST_PARENT_LINK";

// Verifies that trace contexts propagated as an attribute are set as the parent link in the
// message receiver and that the tag context is not change (for now).
@Test
public void testOpenCensusMessageReceiver() throws Exception {
PubsubMessage message;
SpanContext publisherContext;
try (Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME);
Scope tagScope = createScopeTags()) {
message = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(generatePubsubMessage(500));
publisherContext = tracer.getCurrentSpan().getContext();
}
MessageReceiver receiver =
new OpenCensusUtil.OpenCensusMessageReceiver(
new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext()));
receiver.receiveMessage(message, new NoOpAckReplyConsumer());
}

// Verifies that the current span context is added as an attribute and that (for now) the tag
// context is not added as an attribute.
@Test
public void testOpenCensusMessageTransformer() {
try (Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot");
Scope tagScope = createScopeTags()) {
PubsubMessage originalMessage = generatePubsubMessage(500);
assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));

PubsubMessage attributedMessage = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(originalMessage);
String encodedSpanContext =
OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext());
assertNotEquals("", encodedSpanContext);
assertEquals(
encodedSpanContext, attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));
}
}

private static PubsubMessage generatePubsubMessage(int size) {
byte[] bytes = new byte[size];
for (int i = 0; i < size; i++) {
bytes[i] = (byte) (120 + i % 20);
}
return PubsubMessage.newBuilder().setData(ByteString.copyFrom(bytes)).build();
}

private static Scope createScopeTags() {
return tagger.currentBuilder().put(TEST_TAG_KEY, TEST_TAG_VAL).buildScoped();
}

private static final class NoOpAckReplyConsumer implements AckReplyConsumer {
@Override
public void ack() {}

@Override
public void nack() {}
}

private static final class TestMessageReceiver implements MessageReceiver {
private static final RunningSpanStore runningSpanStore =
Tracing.getExportComponent().getRunningSpanStore();
private static final Filter RECEIVER_FILTER = Filter.create(MESSAGE_RECEIVER_SPAN_NAME, 0);

SpanContext parentLinkedSpan;
TagContext originalTagContext;

private TestMessageReceiver(SpanContext parentLinkedSpan, TagContext originalTagContext) {
this.parentLinkedSpan = parentLinkedSpan;
this.originalTagContext = originalTagContext;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
assertEquals(originalTagContext, tagger.getCurrentTagContext());
Collection<SpanData> spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER);
assertEquals(spanDatas.size(), 1);
for (SpanData spanData : spanDatas) {
List<Link> links = spanData.getLinks().getLinks();
assertEquals(links.size(), 1);
Link link = links.get(0);
assertEquals(Link.Type.PARENT_LINKED_SPAN, link.getType());
assertEquals(parentLinkedSpan.getTraceId(), link.getTraceId());
assertEquals(parentLinkedSpan.getSpanId(), link.getSpanId());
}
consumer.ack();
}
}
}

0 comments on commit 1df3ea2

Please sign in to comment.