Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenCensus Support for Cloud Pub/Sub #4240

Merged
merged 15 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-clients/google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
<artifactId>grpc-google-iam-v1</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to add opencensus-impl here?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not, only the final app should do this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the test dependencies rather than the library dependencies. I use the implementation to test OpenCensusUtil.

<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<version>${opencensus.version}</version>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.MustBeClosed;
import com.google.pubsub.v1.PubsubMessage;
import io.opencensus.common.Scope;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.tags.propagation.TagContextBinarySerializer;
import io.opencensus.trace.Link;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.SpanContextParseException;
import io.opencensus.trace.propagation.TextFormat;
import io.opencensus.trace.propagation.TextFormat.Getter;
import io.opencensus.trace.propagation.TextFormat.Setter;
import io.opencensus.trace.samplers.Samplers;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers
* to subscribers.
*/
public class OpenCensusUtil {
private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());

public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
@VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
private static final String TRACEPARENT_KEY = "traceparent";

private static final Tagger tagger = Tags.getTagger();
private static final TagContextBinarySerializer serializer =
Tags.getTagPropagationComponent().getBinarySerializer();

private static final Tracer tracer = Tracing.getTracer();
private static final TextFormat traceContextTextFormat =
Tracing.getPropagationComponent().getTraceContextFormat();

/**
* Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as
* attributes to the {@link PubsubMessage}.
*/
public static final ApiFunction<PubsubMessage, PubsubMessage> OPEN_CENSUS_MESSAGE_TRANSFORM =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage message) {
PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
return message;
}
if (!encodedSpanContext.isEmpty()) {
builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext);
}
if (!encodedTagContext.isEmpty()) {
builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext);
}
return builder.build();
}
};

private static final Setter<StringBuilder> setter =
new Setter<StringBuilder>() {
@Override
public void put(StringBuilder carrier, String key, String value) {
if (key.equals(TRACEPARENT_KEY)) {
carrier.append(value);
}
}
};

private static final Getter<String> getter =
new Getter<String>() {
@Override
public String get(String carrier, String key) {
return key.equals(TRACEPARENT_KEY) ? carrier : null;
}
};

@VisibleForTesting
static String encodeSpanContext(SpanContext ctxt) {
StringBuilder builder = new StringBuilder();
traceContextTextFormat.inject(ctxt, builder, setter);
return builder.toString();
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static String encodeTagContext(TagContext tags) {
return "";
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static Scope createScopedTagContext(String encodedTags) {
return tagger.withTagContext(tagger.getCurrentTagContext());
}

@VisibleForTesting
@MustBeClosed
static Scope createScopedSpan(String name) {
return tracer
.spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
Copy link

@igorbernstein2 igorbernstein2 Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the current span on the receiver side? Shouldn't it start a new root span? Also, shouldn't recordEvents setting be copied from the publisher's span config?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the receiver side, we create a root span and set the publisher's span as a parent link.

.setRecordEvents(true)
// Note: we preserve the sampling decision from the publisher.
.setSampler(Samplers.alwaysSample())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinooliva, does this need to be fixed before merging?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as per the TODO() - done now.

.startScopedSpan();
}

private static void addParentLink(String encodedParentSpanContext) {
try {
SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
} catch (SpanContextParseException exn) {
logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
}
}

/**
* Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts and
* puts them in scope.
*/
public static class OpenCensusMessageReceiver implements MessageReceiver {
private final MessageReceiver receiver;

public OpenCensusMessageReceiver(MessageReceiver receiver) {
this.receiver = receiver;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need configuration here as well, so that the user can define whether or not they want this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense too.

String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, "");
if (encodedTagContext.isEmpty()) {
addTraceScope(message, consumer);
return;
}
try (Scope statsScope = createScopedTagContext(encodedTagContext)) {
addTraceScope(message, consumer);
}
}

private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) {
String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, "");
if (encodedSpanContext.isEmpty()) {
receiver.receiveMessage(message, consumer);
return;
}
try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) {
addParentLink(encodedSpanContext);
receiver.receiveMessage(message, consumer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class Publisher {
private final List<AutoCloseable> closeables;
private final MessageWaiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand All @@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -192,6 +195,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}

message = messageTransform.apply(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
Expand Down Expand Up @@ -528,6 +532,14 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

ApiFunction<PubsubMessage, PubsubMessage> messageTransform =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage input) {
return input;
}
};

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
Expand Down Expand Up @@ -610,6 +622,17 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}

/**
* Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage}
* before it is sent
*/
@BetaApi
public Builder setTransform(ApiFunction<PubsubMessage, PubsubMessage> messageTransform) {
this.messageTransform =
Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null.");
return this;
}

public Publisher build() throws IOException {
return new Publisher(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.OPEN_CENSUS_MESSAGE_TRANSFORM;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.opencensus.common.Scope;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.trace.Link;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.RunningSpanStore;
import io.opencensus.trace.export.RunningSpanStore.Filter;
import io.opencensus.trace.export.SpanData;
import java.util.Collection;
import java.util.List;
import org.junit.Test;

/** Tests for {@link OpenCensusUtil}. */
public class OpenCensusUtilTest {
private static final Tagger tagger = Tags.getTagger();
private static final Tracer tracer = Tracing.getTracer();
private static final TagKey TEST_TAG_KEY = TagKey.create("TEST_TAG_KEY");
private static final TagValue TEST_TAG_VAL = TagValue.create("TEST_TAG_VAL");
private static final String TEST_PARENT_LINK_NAME = "TEST_PARENT_LINK";

// Verifies that trace contexts propagated as an attribute are set as the parent link in the
// message receiver and that the tag context is not change (for now).
@Test
public void testOpenCensusMessageReceiver() throws Exception {
PubsubMessage message;
SpanContext publisherContext;
try (Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME);
Scope tagScope = createScopeTags()) {
message = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(generatePubsubMessage(500));
publisherContext = tracer.getCurrentSpan().getContext();
}
MessageReceiver receiver =
new OpenCensusUtil.OpenCensusMessageReceiver(
new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext()));
receiver.receiveMessage(message, new NoOpAckReplyConsumer());
}

// Verifies that the current span context is added as an attribute and that (for now) the tag
// context is not added as an attribute.
@Test
public void testOpenCensusMessageTransformer() {
try (Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot");
Scope tagScope = createScopeTags()) {
PubsubMessage originalMessage = generatePubsubMessage(500);
assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));

PubsubMessage attributedMessage = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(originalMessage);
String encodedSpanContext =
OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext());
assertNotEquals("", encodedSpanContext);
assertEquals(
encodedSpanContext, attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));
}
}

private static PubsubMessage generatePubsubMessage(int size) {
byte[] bytes = new byte[size];
for (int i = 0; i < size; i++) {
bytes[i] = (byte) (120 + i % 20);
}
return PubsubMessage.newBuilder().setData(ByteString.copyFrom(bytes)).build();
}

private static Scope createScopeTags() {
return tagger.currentBuilder().put(TEST_TAG_KEY, TEST_TAG_VAL).buildScoped();
}

private static final class NoOpAckReplyConsumer implements AckReplyConsumer {
@Override
public void ack() {}

@Override
public void nack() {}
}

private static final class TestMessageReceiver implements MessageReceiver {
private static final RunningSpanStore runningSpanStore =
Tracing.getExportComponent().getRunningSpanStore();
private static final Filter RECEIVER_FILTER = Filter.create(MESSAGE_RECEIVER_SPAN_NAME, 0);

SpanContext parentLinkedSpan;
TagContext originalTagContext;

private TestMessageReceiver(SpanContext parentLinkedSpan, TagContext originalTagContext) {
this.parentLinkedSpan = parentLinkedSpan;
this.originalTagContext = originalTagContext;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
assertEquals(originalTagContext, tagger.getCurrentTagContext());
Collection<SpanData> spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER);
assertEquals(spanDatas.size(), 1);
for (SpanData spanData : spanDatas) {
List<Link> links = spanData.getLinks().getLinks();
assertEquals(links.size(), 1);
Link link = links.get(0);
assertEquals(Link.Type.PARENT_LINKED_SPAN, link.getType());
assertEquals(parentLinkedSpan.getTraceId(), link.getTraceId());
assertEquals(parentLinkedSpan.getSpanId(), link.getSpanId());
}
consumer.ack();
}
}
}