Skip to content

Commit

Permalink
feat(storage): single span for ReadObject() (#14435)
Browse files Browse the repository at this point in the history
Keep the span created for `ReadObject()` open until the download
completes or is interrupted. This also groups resumed downloads and
backoff spans under one umbrella.
  • Loading branch information
coryan committed Jul 8, 2024
1 parent 5e0d8b4 commit d43e267
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 2 deletions.
2 changes: 2 additions & 0 deletions google/cloud/storage/google_cloud_cpp_storage.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ google_cloud_cpp_storage_hdrs = [
"internal/signed_url_requests.h",
"internal/storage_connection.h",
"internal/tracing_connection.h",
"internal/tracing_object_read_source.h",
"internal/tuple_filter.h",
"internal/unified_rest_credentials.h",
"internal/well_known_parameters_impl.h",
Expand Down Expand Up @@ -232,6 +233,7 @@ google_cloud_cpp_storage_srcs = [
"internal/signed_url_requests.cc",
"internal/storage_connection.cc",
"internal/tracing_connection.cc",
"internal/tracing_object_read_source.cc",
"internal/unified_rest_credentials.cc",
"internal/win32/hash_function_impl.cc",
"lifecycle_rule.cc",
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/google_cloud_cpp_storage.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ add_library(
internal/storage_connection.h
internal/tracing_connection.cc
internal/tracing_connection.h
internal/tracing_object_read_source.cc
internal/tracing_object_read_source.h
internal/tuple_filter.h
internal/unified_rest_credentials.cc
internal/unified_rest_credentials.h
Expand Down Expand Up @@ -498,6 +500,7 @@ if (BUILD_TESTING)
internal/sign_blob_requests_test.cc
internal/signed_url_requests_test.cc
internal/tracing_connection_test.cc
internal/tracing_object_read_source_test.cc
internal/tuple_filter_test.cc
internal/unified_rest_credentials_test.cc
lifecycle_rule_test.cc
Expand Down
8 changes: 6 additions & 2 deletions google/cloud/storage/internal/tracing_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/internal/tracing_connection.h"
#include "google/cloud/storage/internal/tracing_object_read_source.h"
#include "google/cloud/internal/opentelemetry.h"

namespace google {
Expand Down Expand Up @@ -128,8 +129,11 @@ TracingConnection::ReadObject(
storage::internal::ReadObjectRangeRequest const& request) {
auto span = internal::MakeSpan("storage::Client::ReadObject");
auto scope = opentelemetry::trace::Scope(span);
// TODO(#11393) - add a wrapper for ReadObjectSource.
return internal::EndSpan(*span, impl_->ReadObject(request));
auto reader = impl_->ReadObject(request);
if (!reader) return internal::EndSpan(*span, std::move(reader));
return std::unique_ptr<storage::internal::ObjectReadSource>(
std::make_unique<TracingObjectReadSource>(std::move(span),
*std::move(reader)));
}

StatusOr<storage::internal::ListObjectsResponse> TracingConnection::ListObjects(
Expand Down
42 changes: 42 additions & 0 deletions google/cloud/storage/internal/tracing_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "google/cloud/storage/internal/tracing_connection.h"
#include "google/cloud/storage/testing/canonical_errors.h"
#include "google/cloud/storage/testing/mock_client.h"
#include "google/cloud/internal/opentelemetry.h"
#include "google/cloud/testing_util/opentelemetry_matchers.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
Expand All @@ -28,18 +29,21 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::google::cloud::storage::testing::MockClient;
using ::google::cloud::storage::testing::MockObjectReadSource;
using ::google::cloud::storage::testing::canonical_errors::PermanentError;
using ::google::cloud::testing_util::InstallSpanCatcher;
using ::google::cloud::testing_util::OTelAttribute;
using ::google::cloud::testing_util::SpanHasAttributes;
using ::google::cloud::testing_util::SpanHasInstrumentationScope;
using ::google::cloud::testing_util::SpanIsRoot;
using ::google::cloud::testing_util::SpanKindIsClient;
using ::google::cloud::testing_util::SpanNamed;
using ::google::cloud::testing_util::SpanWithStatus;
using ::google::cloud::testing_util::StatusIs;
using ::google::cloud::testing_util::ThereIsAnActiveSpan;
using ::testing::AllOf;
using ::testing::ElementsAre;
using ::testing::Not;
using ::testing::Return;

TEST(TracingClientTest, Options) {
Expand Down Expand Up @@ -375,6 +379,44 @@ TEST(TracingClientTest, ReadObject) {
"gl-cpp.status_code", code_str)))));
}

TEST(TracingClientTest, ReadObjectPartialSuccess) {
using ::google::cloud::storage::internal::ObjectReadSource;
using ::google::cloud::storage::internal::ReadSourceResult;

auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
EXPECT_CALL(*mock, ReadObject).WillOnce([](auto const&) {
EXPECT_TRUE(ThereIsAnActiveSpan());
internal::EndSpan(*internal::MakeSpan("Read1"));
internal::EndSpan(*internal::MakeSpan("Read2"));
auto source = std::make_unique<MockObjectReadSource>();
EXPECT_CALL(*source, Read)
.WillOnce(Return(ReadSourceResult{}))
.WillOnce(Return(PermanentError()));
return std::unique_ptr<ObjectReadSource>(std::move(source));
});
auto under_test = TracingConnection(mock);
auto actual =
under_test.ReadObject(storage::internal::ReadObjectRangeRequest());
ASSERT_STATUS_OK(actual);
auto reader = *std::move(actual);

auto const code = PermanentError().code();
auto const msg = PermanentError().message();
EXPECT_STATUS_OK(reader->Read(nullptr, 1024));
EXPECT_THAT(reader->Read(nullptr, 1024), StatusIs(code));
EXPECT_THAT(
span_catcher->GetSpans(),
UnorderedElementsAre(
AllOf(SpanNamed("storage::Client::ReadObject"),
SpanHasInstrumentationScope(), SpanKindIsClient(), SpanIsRoot(),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg),
SpanHasAttributes(OTelAttribute<std::string>(
"gl-cpp.status_code", StatusCodeToString(code)))),
AllOf(SpanNamed("Read1"), Not(SpanIsRoot())),
AllOf(SpanNamed("Read2"), Not(SpanIsRoot()))));
}

TEST(TracingClientTest, ListObjects) {
auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
Expand Down
83 changes: 83 additions & 0 deletions google/cloud/storage/internal/tracing_object_read_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#include "google/cloud/storage/internal/tracing_object_read_source.h"
#include "google/cloud/internal/opentelemetry.h"
#include "google/cloud/status.h"
#include <opentelemetry/trace/scope.h>
#include <chrono>
#include <utility>

namespace google {
namespace cloud {
namespace storage_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

TracingObjectReadSource::TracingObjectReadSource(
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span,
std::unique_ptr<storage::internal::ObjectReadSource> child)
: span_(std::move(span)), child_(std::move(child)) {}

TracingObjectReadSource::~TracingObjectReadSource() {
// A download may be abandoned by the application, or kept open after all
// the data is received. Note that if there was an unrecoverable error the
// span is already finished in `TracingObjectReadOSource::Read()`.
internal::EndSpan(*span_);
}

bool TracingObjectReadSource::IsOpen() const {
auto scope = opentelemetry::trace::Scope(span_);
return child_->IsOpen();
}

StatusOr<storage::internal::HttpResponse> TracingObjectReadSource::Close() {
auto scope = opentelemetry::trace::Scope(span_);
span_->AddEvent("gl-cpp.close");
return child_->Close();
}

StatusOr<storage::internal::ReadSourceResult> TracingObjectReadSource::Read(
char* buf, std::size_t n) {
auto scope = opentelemetry::trace::Scope(span_);
auto const start = std::chrono::system_clock::now();
auto response = child_->Read(buf, n);
auto const latency = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now() - start)
.count();

if (!response) {
auto const code = StatusCodeToString(response.status().code());
span_->AddEvent("gl-cpp.read",
opentelemetry::common::SystemTimestamp(start),
{{"read.status.code", code},
{"read.buffer.size", static_cast<std::int64_t>(n)},
{"read.latency.us", static_cast<std::int64_t>(latency)}});
return google::cloud::internal::EndSpan(*span_, std::move(response));
}
span_->AddEvent("gl-cpp.read", opentelemetry::common::SystemTimestamp(start),
{{"read.buffer.size", static_cast<std::int64_t>(n)},
{"read.returned.size",
static_cast<std::int64_t>(response->bytes_received)},
{"read.latency.us", static_cast<std::int64_t>(latency)}});
return response;
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
57 changes: 57 additions & 0 deletions google/cloud/storage/internal/tracing_object_read_source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_OBJECT_READ_SOURCE_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_OBJECT_READ_SOURCE_H

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#include "google/cloud/storage/internal/object_read_source.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/internal/opentelemetry.h"
#include <memory>

namespace google {
namespace cloud {
namespace storage_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
*
*/
class TracingObjectReadSource : public storage::internal::ObjectReadSource {
public:
TracingObjectReadSource(
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span,
std::unique_ptr<storage::internal::ObjectReadSource> child);
~TracingObjectReadSource() override;

bool IsOpen() const override;
StatusOr<storage::internal::HttpResponse> Close() override;
StatusOr<storage::internal::ReadSourceResult> Read(char* buf,
std::size_t n) override;

private:
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
std::unique_ptr<storage::internal::ObjectReadSource> child_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_OBJECT_READ_SOURCE_H
Loading

0 comments on commit d43e267

Please sign in to comment.