Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otel): avoid crashes in tracing wrappers for streams #14477

Merged
merged 11 commits into from
Jul 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingRead) {
EXPECT_THAT(finish, StatusIs(StatusCode::kAborted));

auto spans = span_catcher->GetSpans();
// Start() return false, the metadata will not be extracted when
// ending the span, so the span will not contain `grpc.peer` in the end.
EXPECT_THAT(
spans,
Contains(AllOf(
Expand All @@ -332,7 +334,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingRead) {
"google.test.admin.database.v1.GoldenKitchenSink/StreamingRead"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down Expand Up @@ -362,6 +363,8 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingWrite) {
EXPECT_THAT(finish, StatusIs(StatusCode::kAborted));

auto spans = span_catcher->GetSpans();
// Start() return false, the metadata will not be extracted when
// ending the span, so the span will not contain `grpc.peer` in the end.
EXPECT_THAT(
spans,
Contains(AllOf(
Expand All @@ -370,7 +373,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingWrite) {
"google.test.admin.database.v1.GoldenKitchenSink/StreamingWrite"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down Expand Up @@ -400,6 +402,8 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingReadWrite) {
EXPECT_THAT(finish, StatusIs(StatusCode::kAborted));

auto spans = span_catcher->GetSpans();
// Start() return false, the metadata will not be extracted when
// ending the span, so the span will not contain `grpc.peer` in the end.
EXPECT_THAT(
spans,
Contains(AllOf(
Expand All @@ -408,7 +412,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingReadWrite) {
"StreamingReadWrite"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down
8 changes: 7 additions & 1 deletion google/cloud/internal/async_read_write_stream_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AsyncStreamingReadWriteRpcTracing
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -112,14 +113,19 @@ class AsyncStreamingReadWriteRpcTracing
private:
Status End(Status status) {
if (!context_) return status;
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
return EndSpan(*std::move(context_), *std::move(span_), std::move(status));
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingReadWriteRpc<Request, Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int read_count_ = 0;
int write_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
65 changes: 63 additions & 2 deletions google/cloud/internal/async_read_write_stream_tracing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ TEST(AsyncStreamingReadWriteRpcTracing, WritesDone) {
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
TEST(AsyncStreamingReadWriteRpcTracing, FinishWithoutStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
Expand All @@ -272,6 +272,31 @@ TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
TestedStream stream(context(), std::move(mock), span);
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

// The stream is not started, the metadata will not be extracted when
// ending the span, so the span will not contain `grpc.peer` in the end.
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
testing::UnorderedElementsAre(
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
AllOf(SpanNamed("span"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError,
"fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, FinishWithStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();

EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); });
EXPECT_CALL(*mock, Finish)
.WillOnce(Return(make_ready_future(internal::AbortedError("fail"))));

TestedStream stream(context(), std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Expand All @@ -280,7 +305,8 @@ TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>("grpc.peer", _)),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
AllOf(SpanNamed("Finish"), SpanWithParent(span)),
AllOf(SpanNamed("Start"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, GetRequestMetadata) {
Expand Down Expand Up @@ -321,6 +347,41 @@ TEST(AsyncStreamingReadWriteRpcTracing, SpanEndsOnDestruction) {
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadWriteRpcTracing,
UnstartedStreamShouldNotExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();

{
auto mock = std::make_unique<MockStream>();
auto span = MakeSpan("span");
auto context = std::make_shared<grpc::ClientContext>();
TestedStream stream(context, std::move(mock), span);
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadWriteRpcTracing, StartedStreamShouldExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();
{
auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
auto context = std::make_shared<grpc::ClientContext>();
EXPECT_CALL(*mock, Start).WillOnce([context] {
SetServerMetadata(*context, {});
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
return make_ready_future(true);
});

TestedStream stream(context, std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, testing::UnorderedElementsAre(AllOf(SpanNamed("Start")),
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
AllOf(SpanNamed("span"))));
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
8 changes: 7 additions & 1 deletion google/cloud/internal/async_streaming_read_rpc_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class AsyncStreamingReadRpcTracing : public AsyncStreamingReadRpc<Response> {
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -89,13 +90,18 @@ class AsyncStreamingReadRpcTracing : public AsyncStreamingReadRpc<Response> {
private:
Status End(Status status) {
if (!context_) return status;
return EndSpan(*std::move(context_), *std::move(span_), std::move(status));
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingReadRpc<Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int read_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
63 changes: 61 additions & 2 deletions google/cloud/internal/async_streaming_read_rpc_tracing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TEST(AsyncStreamingReadRpcTracing, Read) {
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, Finish) {
TEST(AsyncStreamingReadRpcTracing, FinishWithoutStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
Expand All @@ -161,6 +161,30 @@ TEST(AsyncStreamingReadRpcTracing, Finish) {
TestedStream stream(context(), std::move(mock), span);
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

// The stream is not started, the metadata will not be extracted when
// ending the span, so the span will not contain `grpc.peer` in the end.
auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
testing::UnorderedElementsAre(
AllOf(SpanNamed("span"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError,
"fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, FinishWithStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); });
EXPECT_CALL(*mock, Finish)
.WillOnce(Return(make_ready_future(internal::AbortedError("fail"))));

TestedStream stream(context(), std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Expand All @@ -169,7 +193,8 @@ TEST(AsyncStreamingReadRpcTracing, Finish) {
SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>("grpc.peer", _)),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
AllOf(SpanNamed("Finish"), SpanWithParent(span)),
AllOf(SpanNamed("Start"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, GetRequestMetadata) {
Expand Down Expand Up @@ -199,6 +224,40 @@ TEST(AsyncStreamingReadRpcTracing, SpanEndsOnDestruction) {
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadRpcTracing, UnstartedStreamShouldNotExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();

{
auto mock = std::make_unique<MockStream>();
auto span = MakeSpan("span");
auto context = std::make_shared<grpc::ClientContext>();
TestedStream stream(context, std::move(mock), span);
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadRpcTracing, StartedStreamShouldExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();
{
auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
auto context = std::make_shared<grpc::ClientContext>();
EXPECT_CALL(*mock, Start).WillOnce([context] {
SetServerMetadata(*context, {});
return make_ready_future(true);
});

TestedStream stream(context, std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, testing::UnorderedElementsAre(AllOf(SpanNamed("Start")),
AllOf(SpanNamed("span"))));
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
22 changes: 13 additions & 9 deletions google/cloud/internal/async_streaming_write_rpc_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ class AsyncStreamingWriteRpcTracing
: context_(std::move(context)),
impl_(std::move(impl)),
span_(std::move(span)) {}
~AsyncStreamingWriteRpcTracing() override {
if (context_) {
(void)EndSpan(*std::move(context_), *std::move(span_), Status());
}
}
~AsyncStreamingWriteRpcTracing() override { (void)End(StatusOr<Response>()); }
cuiy0006 marked this conversation as resolved.
Show resolved Hide resolved

void Cancel() override {
span_->AddEvent("gl-cpp.cancel");
Expand All @@ -58,6 +54,7 @@ class AsyncStreamingWriteRpcTracing
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -94,10 +91,7 @@ class AsyncStreamingWriteRpcTracing
return impl_->Finish().then(
[this, fs = std::move(finish_span)](future<StatusOr<Response>> f) {
EndSpan(*fs);
auto response = f.get();
if (!context_) return response;
return EndSpan(*std::move(context_), *std::move(span_),
std::move(response));
return End(f.get());
});
}

Expand All @@ -106,10 +100,20 @@ class AsyncStreamingWriteRpcTracing
}

private:
StatusOr<Response> End(StatusOr<Response> status) {
if (!context_) return status;
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingWriteRpc<Request, Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int write_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
Loading
Loading