diff --git a/api/bazel/repositories.bzl b/api/bazel/repositories.bzl index 983f15967b28..1155b1d0cce2 100644 --- a/api/bazel/repositories.bzl +++ b/api/bazel/repositories.bzl @@ -44,6 +44,10 @@ def api_dependencies(): name = "com_github_apache_skywalking_data_collect_protocol", build_file_content = SKYWALKING_DATA_COLLECT_PROTOCOL_BUILD_CONTENT, ) + external_http_archive( + name = "opentelemetry_proto", + build_file_content = OPENTELEMETRY_LOGS_BUILD_CONTENT, + ) PROMETHEUSMETRICS_BUILD_CONTENT = """ load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library") @@ -132,3 +136,25 @@ go_proto_library( visibility = ["//visibility:public"], ) """ + +OPENTELEMETRY_LOGS_BUILD_CONTENT = """ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_cc//cc:defs.bzl", "cc_proto_library") + +proto_library( + name = "logs", + srcs = [ + "opentelemetry/proto/collector/logs/v1/logs_service.proto", + "opentelemetry/proto/common/v1/common.proto", + "opentelemetry/proto/logs/v1/logs.proto", + "opentelemetry/proto/resource/v1/resource.proto", + ], + visibility = ["//visibility:public"], +) + +cc_proto_library( + name = "logs_cc_proto", + deps = [":logs"], + visibility = ["//visibility:public"], +) +""" diff --git a/api/bazel/repository_locations.bzl b/api/bazel/repository_locations.bzl index d15978e7dd6c..903dd8a85ba8 100644 --- a/api/bazel/repository_locations.bzl +++ b/api/bazel/repository_locations.bzl @@ -107,4 +107,15 @@ REPOSITORY_LOCATIONS_SPEC = dict( release_date = "2020-07-29", use_category = ["api"], ), + opentelemetry_proto = dict( + project_name = "OpenTelemetry Proto", + project_desc = "Language Independent Interface Types For OpenTelemetry", + project_url = "https://github.com/open-telemetry/opentelemetry-proto", + version = "0.6.0", + sha256 = "08f090570e0a112bfae276ba37e9c45bf724b64d902a7a001db33123b840ebd6", + strip_prefix = "opentelemetry-proto-{version}", + urls = ["https://github.com/open-telemetry/opentelemetry-proto/archive/v{version}.tar.gz"], + release_date = "2020-10-29", + use_category = ["api"], + ), ) diff --git a/generated_api_shadow/bazel/repositories.bzl b/generated_api_shadow/bazel/repositories.bzl index 983f15967b28..1155b1d0cce2 100644 --- a/generated_api_shadow/bazel/repositories.bzl +++ b/generated_api_shadow/bazel/repositories.bzl @@ -44,6 +44,10 @@ def api_dependencies(): name = "com_github_apache_skywalking_data_collect_protocol", build_file_content = SKYWALKING_DATA_COLLECT_PROTOCOL_BUILD_CONTENT, ) + external_http_archive( + name = "opentelemetry_proto", + build_file_content = OPENTELEMETRY_LOGS_BUILD_CONTENT, + ) PROMETHEUSMETRICS_BUILD_CONTENT = """ load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library") @@ -132,3 +136,25 @@ go_proto_library( visibility = ["//visibility:public"], ) """ + +OPENTELEMETRY_LOGS_BUILD_CONTENT = """ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_cc//cc:defs.bzl", "cc_proto_library") + +proto_library( + name = "logs", + srcs = [ + "opentelemetry/proto/collector/logs/v1/logs_service.proto", + "opentelemetry/proto/common/v1/common.proto", + "opentelemetry/proto/logs/v1/logs.proto", + "opentelemetry/proto/resource/v1/resource.proto", + ], + visibility = ["//visibility:public"], +) + +cc_proto_library( + name = "logs_cc_proto", + deps = [":logs"], + visibility = ["//visibility:public"], +) +""" diff --git a/generated_api_shadow/bazel/repository_locations.bzl b/generated_api_shadow/bazel/repository_locations.bzl index d15978e7dd6c..903dd8a85ba8 100644 --- a/generated_api_shadow/bazel/repository_locations.bzl +++ b/generated_api_shadow/bazel/repository_locations.bzl @@ -107,4 +107,15 @@ REPOSITORY_LOCATIONS_SPEC = dict( release_date = "2020-07-29", use_category = ["api"], ), + opentelemetry_proto = dict( + project_name = "OpenTelemetry Proto", + project_desc = "Language Independent Interface Types For OpenTelemetry", + project_url = "https://github.com/open-telemetry/opentelemetry-proto", + version = "0.6.0", + sha256 = "08f090570e0a112bfae276ba37e9c45bf724b64d902a7a001db33123b840ebd6", + strip_prefix = "opentelemetry-proto-{version}", + urls = ["https://github.com/open-telemetry/opentelemetry-proto/archive/v{version}.tar.gz"], + release_date = "2020-10-29", + use_category = ["api"], + ), ) diff --git a/source/common/formatter/substitution_formatter.cc b/source/common/formatter/substitution_formatter.cc index 2f4737bf3fb4..be4a0130143e 100644 --- a/source/common/formatter/substitution_formatter.cc +++ b/source/common/formatter/substitution_formatter.cc @@ -49,6 +49,9 @@ const std::regex& getSystemTimeFormatNewlinePattern() { } const std::regex& getNewlinePattern() { CONSTRUCT_ON_FIRST_USE(std::regex, "\n"); } +template struct StructFormatMapVisitor : Ts... { using Ts::operator()...; }; +template StructFormatMapVisitor(Ts...) -> StructFormatMapVisitor; + } // namespace const std::string SubstitutionFormatUtils::DEFAULT_FORMAT = @@ -145,56 +148,20 @@ std::string JsonFormatterImpl::format(const Http::RequestHeaderMap& request_head return absl::StrCat(log_line, "\n"); } -StructFormatter::StructFormatter(const ProtobufWkt::Struct& format_mapping, bool preserve_types, - bool omit_empty_values) - : omit_empty_values_(omit_empty_values), preserve_types_(preserve_types), - empty_value_(omit_empty_values_ ? EMPTY_STRING : DefaultUnspecifiedValueString), - struct_output_format_(toFormatMapValue(format_mapping)) {} - StructFormatter::StructFormatMapWrapper -StructFormatter::toFormatMapValue(const ProtobufWkt::Struct& struct_format) const { +StructFormatter::toFormatMap(const ProtobufWkt::Struct& struct_format) const { auto output = std::make_unique(); for (const auto& pair : struct_format.fields()) { switch (pair.second.kind_case()) { case ProtobufWkt::Value::kStringValue: - output->emplace(pair.first, toFormatStringValue(pair.second.string_value())); - break; - - case ProtobufWkt::Value::kStructValue: - output->emplace(pair.first, toFormatMapValue(pair.second.struct_value())); - break; - - case ProtobufWkt::Value::kListValue: - output->emplace(pair.first, toFormatListValue(pair.second.list_value())); - break; - - default: - throw EnvoyException("Only string values, nested structs and list values are " - "supported in structured access log format."); - } - } - return {std::move(output)}; -}; - -StructFormatter::StructFormatListWrapper -StructFormatter::toFormatListValue(const ProtobufWkt::ListValue& list_value_format) const { - auto output = std::make_unique(); - for (const auto& value : list_value_format.values()) { - switch (value.kind_case()) { - case ProtobufWkt::Value::kStringValue: - output->emplace_back(toFormatStringValue(value.string_value())); + output->emplace(pair.first, SubstitutionFormatParser::parse(pair.second.string_value())); break; - case ProtobufWkt::Value::kStructValue: - output->emplace_back(toFormatMapValue(value.struct_value())); - break; - - case ProtobufWkt::Value::kListValue: - output->emplace_back(toFormatListValue(value.list_value())); + output->emplace(pair.first, toFormatMap(pair.second.struct_value())); break; default: - throw EnvoyException("Only string values, nested structs and list values are " - "supported in structured access log format."); + throw EnvoyException( + "Only string values or nested structs are supported in structured access log format."); } } return {std::move(output)}; @@ -205,85 +172,57 @@ StructFormatter::toFormatStringValue(const std::string& string_format) const { return SubstitutionFormatParser::parse(string_format); }; -ProtobufWkt::Value StructFormatter::providersCallback( - const std::vector& providers, - const Http::RequestHeaderMap& request_headers, const Http::ResponseHeaderMap& response_headers, - const Http::ResponseTrailerMap& response_trailers, const StreamInfo::StreamInfo& stream_info, - absl::string_view local_reply_body) const { - ASSERT(!providers.empty()); - if (providers.size() == 1) { - const auto& provider = providers.front(); - if (preserve_types_) { - return provider->formatValue(request_headers, response_headers, response_trailers, - stream_info, local_reply_body); - } - - if (omit_empty_values_) { - return ValueUtil::optionalStringValue(provider->format( - request_headers, response_headers, response_trailers, stream_info, local_reply_body)); - } - - const auto str = provider->format(request_headers, response_headers, response_trailers, - stream_info, local_reply_body); - return ValueUtil::stringValue(str.value_or(DefaultUnspecifiedValueString)); - } - // Multiple providers forces string output. - std::string str; - for (const auto& provider : providers) { - const auto bit = provider->format(request_headers, response_headers, response_trailers, - stream_info, local_reply_body); - str += bit.value_or(empty_value_); - } - return ValueUtil::stringValue(str); -} - -ProtobufWkt::Value StructFormatter::structFormatMapCallback( - const StructFormatter::StructFormatMapWrapper& format_map, - const StructFormatter::StructFormatMapVisitor& visitor) const { - ProtobufWkt::Struct output; - auto* fields = output.mutable_fields(); - for (const auto& pair : *format_map.value_) { - ProtobufWkt::Value value = absl::visit(visitor, pair.second); - if (omit_empty_values_ && value.kind_case() == ProtobufWkt::Value::kNullValue) { - continue; - } - (*fields)[pair.first] = value; - } - return ValueUtil::structValue(output); -} - -ProtobufWkt::Value StructFormatter::structFormatListCallback( - const StructFormatter::StructFormatListWrapper& format_list, - const StructFormatter::StructFormatMapVisitor& visitor) const { - std::vector output; - for (const auto& val : *format_list.value_) { - ProtobufWkt::Value value = absl::visit(visitor, val); - if (omit_empty_values_ && value.kind_case() == ProtobufWkt::Value::kNullValue) { - continue; - } - output.push_back(value); - } - return ValueUtil::listValue(output); -} - ProtobufWkt::Struct StructFormatter::format(const Http::RequestHeaderMap& request_headers, const Http::ResponseHeaderMap& response_headers, const Http::ResponseTrailerMap& response_trailers, const StreamInfo::StreamInfo& stream_info, absl::string_view local_reply_body) const { - StructFormatMapVisitor visitor{ - [&](const std::vector& providers) { - return providersCallback(providers, request_headers, response_headers, response_trailers, - stream_info, local_reply_body); - }, - [&, this](const StructFormatter::StructFormatMapWrapper& format_map) { - return structFormatMapCallback(format_map, visitor); - }, - [&, this](const StructFormatter::StructFormatListWrapper& format_list) { - return structFormatListCallback(format_list, visitor); - }, - }; - return structFormatMapCallback(struct_output_format_, visitor).struct_value(); + const std::string& empty_value = + omit_empty_values_ ? EMPTY_STRING : DefaultUnspecifiedValueString; + const std::function&)> + providers_callback = [&](const std::vector& providers) { + ASSERT(!providers.empty()); + if (providers.size() == 1) { + const auto& provider = providers.front(); + if (preserve_types_) { + return provider->formatValue(request_headers, response_headers, response_trailers, + stream_info, local_reply_body); + } + + if (omit_empty_values_) { + return ValueUtil::optionalStringValue( + provider->format(request_headers, response_headers, response_trailers, stream_info, + local_reply_body)); + } + + const auto str = provider->format(request_headers, response_headers, response_trailers, + stream_info, local_reply_body); + return ValueUtil::stringValue(str.value_or(DefaultUnspecifiedValueString)); + } + // Multiple providers forces string output. + std::string str; + for (const auto& provider : providers) { + const auto bit = provider->format(request_headers, response_headers, response_trailers, + stream_info, local_reply_body); + str += bit.value_or(empty_value); + } + return ValueUtil::stringValue(str); + }; + const std::function + struct_format_map_callback = [&](const StructFormatter::StructFormatMapWrapper& format) { + ProtobufWkt::Struct output; + auto* fields = output.mutable_fields(); + StructFormatMapVisitor visitor{struct_format_map_callback, providers_callback}; + for (const auto& pair : *format.value_) { + ProtobufWkt::Value value = absl::visit(visitor, pair.second); + if (omit_empty_values_ && value.kind_case() == ProtobufWkt::Value::kNullValue) { + continue; + } + (*fields)[pair.first] = value; + } + return ValueUtil::structValue(output); + }; + return struct_format_map_callback(struct_output_format_).struct_value(); } void SubstitutionFormatParser::parseCommandHeader(const std::string& token, const size_t start, @@ -1141,8 +1080,8 @@ MetadataFormatter::formatMetadataValue(const envoy::config::core::v3::Metadata& return val; } -// TODO(glicht): Consider adding support for route/listener/cluster metadata as suggested by -// @htuch. See: https://github.com/envoyproxy/envoy/issues/3006 +// TODO(glicht): Consider adding support for route/listener/cluster metadata as suggested by @htuch. +// See: https://github.com/envoyproxy/envoy/issues/3006 DynamicMetadataFormatter::DynamicMetadataFormatter(const std::string& filter_namespace, const std::vector& path, absl::optional max_length) diff --git a/source/common/formatter/substitution_formatter.h b/source/common/formatter/substitution_formatter.h index 19d6b68f19fb..4d53c03374f6 100644 --- a/source/common/formatter/substitution_formatter.h +++ b/source/common/formatter/substitution_formatter.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -123,10 +122,6 @@ class FormatterImpl : public Formatter { std::vector providers_; }; -// Helper classes for StructFormatter::StructFormatMapVisitor. -template struct StructFormatMapVisitorHelper : Ts... { using Ts::operator()...; }; -template StructFormatMapVisitorHelper(Ts...) -> StructFormatMapVisitorHelper; - /** * An formatter for structured log formats, which returns a Struct proto that * can be converted easily into multiple formats. @@ -134,7 +129,9 @@ template StructFormatMapVisitorHelper(Ts...) -> StructFormatMapVis class StructFormatter { public: StructFormatter(const ProtobufWkt::Struct& format_mapping, bool preserve_types, - bool omit_empty_values); + bool omit_empty_values) + : omit_empty_values_(omit_empty_values), preserve_types_(preserve_types), + struct_output_format_(toFormatMap(format_mapping)) {} ProtobufWkt::Struct format(const Http::RequestHeaderMap& request_headers, const Http::ResponseHeaderMap& response_headers, @@ -144,55 +141,22 @@ class StructFormatter { private: struct StructFormatMapWrapper; - struct StructFormatListWrapper; - using StructFormatValue = - absl::variant, const StructFormatMapWrapper, - const StructFormatListWrapper>; + using StructFormatMapValue = + absl::variant, const StructFormatMapWrapper>; // Although not required for Struct/JSON, it is nice to have the order of // properties preserved between the format and the log entry, thus std::map. - using StructFormatMap = std::map; + using StructFormatMap = std::map; using StructFormatMapPtr = std::unique_ptr; struct StructFormatMapWrapper { StructFormatMapPtr value_; }; - using StructFormatList = std::list; - using StructFormatListPtr = std::unique_ptr; - struct StructFormatListWrapper { - StructFormatListPtr value_; - }; - - using StructFormatMapVisitor = StructFormatMapVisitorHelper< - const std::function&)>, - const std::function, - const std::function>; - - // Methods for building the format map. - std::vector toFormatStringValue(const std::string& string_format) const; - StructFormatMapWrapper toFormatMapValue(const ProtobufWkt::Struct& struct_format) const; - StructFormatListWrapper toFormatListValue(const ProtobufWkt::ListValue& list_value_format) const; - - // Methods for doing the actual formatting. - ProtobufWkt::Value providersCallback(const std::vector& providers, - const Http::RequestHeaderMap& request_headers, - const Http::ResponseHeaderMap& response_headers, - const Http::ResponseTrailerMap& response_trailers, - const StreamInfo::StreamInfo& stream_info, - absl::string_view local_reply_body) const; - ProtobufWkt::Value - structFormatMapCallback(const StructFormatter::StructFormatMapWrapper& format_map, - const StructFormatMapVisitor& visitor) const; - ProtobufWkt::Value - structFormatListCallback(const StructFormatter::StructFormatListWrapper& format_list, - const StructFormatMapVisitor& visitor) const; + StructFormatMapWrapper toFormatMap(const ProtobufWkt::Struct& struct_format) const; const bool omit_empty_values_; const bool preserve_types_; - const std::string empty_value_; const StructFormatMapWrapper struct_output_format_; -}; // namespace Formatter - -using StructFormatterPtr = std::unique_ptr; +}; class JsonFormatterImpl : public Formatter { public: diff --git a/source/extensions/access_loggers/common/grpc_access_logger.h b/source/extensions/access_loggers/common/grpc_access_logger.h index 8598bfbaeb57..f28b50f786be 100644 --- a/source/extensions/access_loggers/common/grpc_access_logger.h +++ b/source/extensions/access_loggers/common/grpc_access_logger.h @@ -219,6 +219,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger>>>>>> a9cbacc16 (generalizing gRPC access logger base class (GrpcAccessLoggel)) } const std::chrono::milliseconds buffer_flush_interval_msec_; diff --git a/source/extensions/access_loggers/grpc/BUILD b/source/extensions/access_loggers/grpc/BUILD index b958ccba7995..b9709779889f 100644 --- a/source/extensions/access_loggers/grpc/BUILD +++ b/source/extensions/access_loggers/grpc/BUILD @@ -24,6 +24,24 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "grpc_ot_access_log_lib", + srcs = ["grpc_ot_access_log_impl.cc"], + hdrs = ["grpc_ot_access_log_impl.h"], + deps = [ + "//include/envoy/event:dispatcher_interface", + "//include/envoy/grpc:async_client_manager_interface", + "//include/envoy/local_info:local_info_interface", + "//include/envoy/thread_local:thread_local_interface", + "//source/common/config:utility_lib", + "//source/common/grpc:typed_async_client_lib", + "//source/extensions/access_loggers/common:grpc_access_logger", + "@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto", + "@opentelemetry_proto//:logs_cc_proto", + ], +) + + envoy_cc_library( name = "grpc_access_log_lib", srcs = ["grpc_access_log_impl.cc"], diff --git a/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.cc b/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.cc new file mode 100644 index 000000000000..8132733393d1 --- /dev/null +++ b/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.cc @@ -0,0 +1,96 @@ +#include "extensions/access_loggers/grpc/grpc_ot_access_log_impl.h" + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/logs/v1/logs.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" +#include "envoy/grpc/async_client_manager.h" +#include "envoy/local_info/local_info.h" + +#include "common/config/utility.h" +#include "common/grpc/typed_async_client.h" + +const char GRPC_LOG_STATS_PREFIX[] = "access_logs.grpc_ot_access_log."; + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace GrpcCommon { + +GrpcOpenTelemetryAccessLoggerImpl::GrpcOpenTelemetryAccessLoggerImpl( + Grpc::RawAsyncClientPtr&& client, std::string log_name, + std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, + Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope, + envoy::config::core::v3::ApiVersion transport_api_version) + : GrpcAccessLogger( + std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, scope, + GRPC_LOG_STATS_PREFIX, + Grpc::VersionedMethods("opentelemetry.proto.collector.logs.v1.LogsService.Export", + "opentelemetry.proto.collector.logs.v1.LogsService.Export") + .getMethodDescriptorForVersion(transport_api_version), + transport_api_version) { + initMessageRoot(log_name, local_info); +} + +namespace { + +opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key, + const std::string& value) { + opentelemetry::proto::common::v1::KeyValue keyValue; + keyValue.set_key(key); + keyValue.mutable_value()->set_string_value(value); + return keyValue; +} + +} // namespace + +// See comment about the structure of repeated fields in the header file. +// TODO(itamarkam): allow user configurable attributes. +void GrpcOpenTelemetryAccessLoggerImpl::initMessageRoot(const std::string& log_name, + const LocalInfo::LocalInfo& local_info) { + auto* resource_logs = message_.add_resource_logs(); + root_ = resource_logs->add_instrumentation_library_logs(); + auto* resource = resource_logs->mutable_resource(); + *resource->add_attributes() = getStringKeyValue("log_name", log_name); + *resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName()); + *resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName()); + *resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName()); +} + +void GrpcOpenTelemetryAccessLoggerImpl::addEntry( + opentelemetry::proto::logs::v1::LogRecord&& entry) { + root_->mutable_logs()->Add(std::move(entry)); +} + +void GrpcOpenTelemetryAccessLoggerImpl::addEntry( + opentelemetry::proto::logs::v1::ResourceLogs&& entry) { + (void)entry; +} + +bool GrpcOpenTelemetryAccessLoggerImpl::isEmpty() { return root_->logs().empty(); } + +// The message is already initialized in the c'tor, and only the logs are cleared. +void GrpcOpenTelemetryAccessLoggerImpl::initMessage() {} + +void GrpcOpenTelemetryAccessLoggerImpl::clearMessage() { root_->clear_logs(); } + +GrpcOpenTelemetryAccessLoggerCacheImpl::GrpcOpenTelemetryAccessLoggerCacheImpl( + Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope, + ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info) + : GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {} + +GrpcOpenTelemetryAccessLoggerImpl::SharedPtr GrpcOpenTelemetryAccessLoggerCacheImpl::createLogger( + const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, + Grpc::RawAsyncClientPtr&& client, std::chrono::milliseconds buffer_flush_interval_msec, + uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, Stats::Scope& scope) { + return std::make_shared( + std::move(client), config.log_name(), buffer_flush_interval_msec, max_buffer_size_bytes, + dispatcher, local_info_, scope, Config::Utility::getAndCheckTransportVersion(config)); +} + +} // namespace GrpcCommon +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.h b/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.h new file mode 100644 index 000000000000..4255b94fc963 --- /dev/null +++ b/source/extensions/access_loggers/grpc/grpc_ot_access_log_impl.h @@ -0,0 +1,85 @@ +#pragma once + +#include + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/logs/v1/logs.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +#include "envoy/event/dispatcher.h" +#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" +#include "envoy/grpc/async_client_manager.h" +#include "envoy/local_info/local_info.h" +#include "envoy/thread_local/thread_local.h" + +#include "extensions/access_loggers/common/grpc_access_logger.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace GrpcCommon { + +// Note: OpenTelemetry protos are extra flexible and used also in the OT collector for batching and +// so forth. As a result, some fields are repeated, but for our use case we assume the following +// structure: +// ExportLogsServiceRequest -> (single) ResourceLogs -> (single) InstrumentationLibrary -> +// (repeated) LogRecord. +class GrpcOpenTelemetryAccessLoggerImpl + : public Common::GrpcAccessLogger< + opentelemetry::proto::logs::v1::LogRecord, + opentelemetry::proto::logs::v1::ResourceLogs /*TCP*/, + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest, + opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> { +public: + GrpcOpenTelemetryAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name, + std::chrono::milliseconds buffer_flush_interval_msec, + uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, + const LocalInfo::LocalInfo& local_info, Stats::Scope& scope, + envoy::config::core::v3::ApiVersion transport_api_version); + +private: + void initMessageRoot(const std::string& log_name, const LocalInfo::LocalInfo& local_info); + // Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger + void addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) override; + void addEntry(opentelemetry::proto::logs::v1::ResourceLogs&& entry) override; + bool isEmpty() override; + void initMessage() override; + void clearMessage() override; + + opentelemetry::proto::logs::v1::InstrumentationLibraryLogs* root_; +}; + +class GrpcOpenTelemetryAccessLoggerCacheImpl + : public Common::GrpcAccessLoggerCache< + GrpcOpenTelemetryAccessLoggerImpl, + envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig> { +public: + GrpcOpenTelemetryAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, + Stats::Scope& scope, ThreadLocal::SlotAllocator& tls, + const LocalInfo::LocalInfo& local_info); + +private: + // Common::GrpcAccessLoggerCache + GrpcOpenTelemetryAccessLoggerImpl::SharedPtr + createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, + Grpc::RawAsyncClientPtr&& client, + std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, + Event::Dispatcher& dispatcher, Stats::Scope& scope) override; + + const LocalInfo::LocalInfo& local_info_; +}; + +/** + * Aliases for class interfaces for mock definitions. + */ +using GrpcAccessLogger = GrpcOpenTelemetryAccessLoggerImpl::Interface; +using GrpcAccessLoggerSharedPtr = GrpcAccessLogger::SharedPtr; + +using GrpcAccessLoggerCache = GrpcOpenTelemetryAccessLoggerCacheImpl::Interface; +using GrpcAccessLoggerCacheSharedPtr = GrpcAccessLoggerCache::SharedPtr; + +} // namespace GrpcCommon +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/common/formatter/substitution_format_string_test.cc b/test/common/formatter/substitution_format_string_test.cc index 37e83fe3caa9..3b7bd29abad8 100644 --- a/test/common/formatter/substitution_format_string_test.cc +++ b/test/common/formatter/substitution_format_string_test.cc @@ -95,8 +95,7 @@ TEST_F(SubstitutionFormatStringUtilsTest, TestInvalidConfigs) { TestUtility::loadFromYaml(yaml, config_); EXPECT_THROW_WITH_MESSAGE( SubstitutionFormatStringUtils::fromProtoConfig(config_, context_.api()), EnvoyException, - "Only string values, nested structs and list values are supported in structured access log " - "format."); + "Only string values or nested structs are supported in structured access log format."); } } diff --git a/test/common/formatter/substitution_formatter_test.cc b/test/common/formatter/substitution_formatter_test.cc index 73b7ce9ecd99..75e9a2c8abdb 100644 --- a/test/common/formatter/substitution_formatter_test.cc +++ b/test/common/formatter/substitution_formatter_test.cc @@ -1660,7 +1660,7 @@ TEST(SubstitutionFormatterTest, StructFormatterPlainStringTest) { expected_json_map); } -TEST(SubstitutionFormatterTest, StructFormatterTypesTest) { +TEST(SubstitutionFormatterTest, StructFormatterNestedObject) { StreamInfo::MockStreamInfo stream_info; Http::TestRequestHeaderMapImpl request_header; Http::TestResponseHeaderMapImpl response_header; @@ -1674,151 +1674,24 @@ TEST(SubstitutionFormatterTest, StructFormatterTypesTest) { ProtobufWkt::Struct key_mapping; TestUtility::loadFromYaml(R"EOF( - string_type: plain_string_value - struct_type: - plain_string: plain_string_value - protocol: '%PROTOCOL%' - list_type: - - plain_string_value - - '%PROTOCOL%' + level_one: + level_two: + level_three: + plain_string: plain_string_value + protocol: '%PROTOCOL%' )EOF", key_mapping); StructFormatter formatter(key_mapping, false, false); const ProtobufWkt::Struct expected = TestUtility::jsonToStruct(R"EOF({ - "string_type": "plain_string_value", - "struct_type": { - "plain_string": "plain_string_value", - "protocol": "HTTP/1.1" - }, - "list_type": [ - "plain_string_value", - "HTTP/1.1" - ] - })EOF"); - const ProtobufWkt::Struct out_struct = - formatter.format(request_header, response_header, response_trailer, stream_info, body); - EXPECT_TRUE(TestUtility::protoEqual(out_struct, expected)); -} - -// Test that nested values are formatted properly, including inter-type nesting. -TEST(SubstitutionFormatterTest, StructFormatterNestedObjectsTest) { - StreamInfo::MockStreamInfo stream_info; - Http::TestRequestHeaderMapImpl request_header; - Http::TestResponseHeaderMapImpl response_header; - Http::TestResponseTrailerMapImpl response_trailer; - std::string body; - - envoy::config::core::v3::Metadata metadata; - populateMetadataTestData(metadata); - absl::optional protocol = Http::Protocol::Http11; - EXPECT_CALL(stream_info, protocol()).WillRepeatedly(Return(protocol)); - - ProtobufWkt::Struct key_mapping; - // For both struct and list, we test 3 nesting levels of all types (string, struct and list). - TestUtility::loadFromYaml(R"EOF( - struct: - struct_string: plain_string_value - struct_protocol: '%PROTOCOL%' - struct_struct: - struct_struct_string: plain_string_value - struct_struct_protocol: '%PROTOCOL%' - struct_struct_struct: - struct_struct_struct_string: plain_string_value - struct_struct_struct_protocol: '%PROTOCOL%' - struct_struct_list: - - struct_struct_list_string - - '%PROTOCOL%' - struct_list: - - struct_list_string - - '%PROTOCOL%' - # struct_list_struct - - struct_list_struct_string: plain_string_value - struct_list_struct_protocol: '%PROTOCOL%' - # struct_list_list - - - struct_list_list_string - - '%PROTOCOL%' - list: - - list_string - - '%PROTOCOL%' - # list_struct - - list_struct_string: plain_string_value - list_struct_protocol: '%PROTOCOL%' - list_struct_struct: - list_struct_struct_string: plain_string_value - list_struct_struct_protocol: '%PROTOCOL%' - list_struct_list: - - list_struct_list_string - - '%PROTOCOL%' - # list_list - - - list_list_string - - '%PROTOCOL%' - # list_list_struct - - list_list_struct_string: plain_string_value - list_list_struct_protocol: '%PROTOCOL%' - # list_list_list - - - list_list_list_string - - '%PROTOCOL%' - )EOF", - key_mapping); - StructFormatter formatter(key_mapping, false, false); - const ProtobufWkt::Struct expected = TestUtility::jsonToStruct(R"EOF({ - "struct": { - "struct_string": "plain_string_value", - "struct_protocol": "HTTP/1.1", - "struct_struct": { - "struct_struct_string": "plain_string_value", - "struct_struct_protocol": "HTTP/1.1", - "struct_struct_struct": { - "struct_struct_struct_string": "plain_string_value", - "struct_struct_struct_protocol": "HTTP/1.1", - }, - "struct_struct_list": [ - "struct_struct_list_string", - "HTTP/1.1", - ], - }, - "struct_list": [ - "struct_list_string", - "HTTP/1.1", - { - "struct_list_struct_string": "plain_string_value", - "struct_list_struct_protocol": "HTTP/1.1", - }, - [ - "struct_list_list_string", - "HTTP/1.1", - ], - ], - }, - "list": [ - "list_string", - "HTTP/1.1", - { - "list_struct_string": "plain_string_value", - "list_struct_protocol": "HTTP/1.1", - "list_struct_struct": { - "list_struct_struct_string": "plain_string_value", - "list_struct_struct_protocol": "HTTP/1.1", - }, - "list_struct_list": [ - "list_struct_list_string", - "HTTP/1.1", - ] - }, - [ - "list_list_string", - "HTTP/1.1", - { - "list_list_struct_string": "plain_string_value", - "list_list_struct_protocol": "HTTP/1.1", - }, - [ - "list_list_list_string", - "HTTP/1.1", - ], - ], - ], + "level_one": { + "level_two": { + "level_three": { + "plain_string": "plain_string_value", + "protocol": "HTTP/1.1" + } + } + } })EOF"); const ProtobufWkt::Struct out_struct = formatter.format(request_header, response_header, response_trailer, stream_info, body); @@ -1899,14 +1772,10 @@ TEST(SubstitutionFormatterTest, StructFormatterAlternateHeaderTest) { ProtobufWkt::Struct key_mapping; TestUtility::loadFromYaml(R"EOF( - request_present_header_or_request_absent_header: - '%REQ(request_present_header?request_absent_header)%' - request_absent_header_or_request_present_header: - '%REQ(request_absent_header?request_present_header)%' - response_absent_header_or_response_absent_header: - '%RESP(response_absent_header?response_present_header)%' - response_present_header_or_response_absent_header: - '%RESP(response_present_header?response_absent_header)%' + request_present_header_or_request_absent_header: '%REQ(request_present_header?request_absent_header)%' + request_absent_header_or_request_present_header: '%REQ(request_absent_header?request_present_header)%' + response_absent_header_or_response_absent_header: '%RESP(response_absent_header?response_present_header)%' + response_present_header_or_response_absent_header: '%RESP(response_present_header?response_absent_header)%' )EOF", key_mapping); StructFormatter formatter(key_mapping, false, false); @@ -2197,8 +2066,7 @@ TEST(SubstitutionFormatterTest, StructFormatterMultiTokenTest) { ProtobufWkt::Struct key_mapping; TestUtility::loadFromYaml(R"EOF( - multi_token_field: '%PROTOCOL% plainstring %REQ(some_request_header)% - %RESP(some_response_header)%' + multi_token_field: '%PROTOCOL% plainstring %REQ(some_request_header)% %RESP(some_response_header)%' )EOF", key_mapping); for (const bool preserve_types : {false, true}) { diff --git a/test/extensions/access_loggers/common/grpc_access_logger_test.cc b/test/extensions/access_loggers/common/grpc_access_logger_test.cc index fc6a37e871df..d93be366601e 100644 --- a/test/extensions/access_loggers/common/grpc_access_logger_test.cc +++ b/test/extensions/access_loggers/common/grpc_access_logger_test.cc @@ -64,6 +64,8 @@ class MockGrpcAccessLoggerImpl int numInits() const { return num_inits_; } + int numClears() const { return num_clears_; } + private: void mockAddEntry(const std::string& key) { if (!message_.fields().contains(key)) { @@ -94,7 +96,13 @@ class MockGrpcAccessLoggerImpl void initMessage() override { ++num_inits_; } + void clearMessage() override { + message_.Clear(); + num_clears_++; + } + int num_inits_ = 0; + int num_clears_ = 0; }; class GrpcAccessLogTest : public testing::Test { @@ -161,12 +169,15 @@ TEST_F(GrpcAccessLogTest, BasicFlow) { expectFlushedLogEntriesCount(stream, MOCK_HTTP_LOG_FIELD_NAME, 1); logger_->log(mockHttpEntry()); EXPECT_EQ(1, logger_->numInits()); + // Messages should be cleared after each flush. + EXPECT_EQ(1, logger_->numClears()); EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); // Log a TCP entry. expectFlushedLogEntriesCount(stream, MOCK_TCP_LOG_FIELD_NAME, 1); logger_->log(ProtobufWkt::Empty()); + EXPECT_EQ(2, logger_->numClears()); // TCP logging doesn't change the logs_written counter. EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); @@ -183,6 +194,7 @@ TEST_F(GrpcAccessLogTest, BasicFlow) { logger_->log(mockHttpEntry()); // Message should be initialized again. EXPECT_EQ(2, logger_->numInits()); + EXPECT_EQ(3, logger_->numClears()); EXPECT_EQ(0, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_dropped")->value()); EXPECT_EQ(2, @@ -202,6 +214,8 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); EXPECT_CALL(stream, sendMessageRaw_(_, false)).Times(0); logger_->log(mockHttpEntry()); + // No entry was logged so no clear expected. + EXPECT_EQ(0, logger_->numClears()); EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); EXPECT_EQ(0, @@ -212,6 +226,8 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { EXPECT_CALL(stream, sendMessageRaw_(_, _)).Times(0); logger_->log(mockHttpEntry()); EXPECT_EQ(1, logger_->numInits()); + // Still no entry was logged so no clear expected. + EXPECT_EQ(0, logger_->numClears()); EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); EXPECT_EQ(1, @@ -224,12 +240,54 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); EXPECT_CALL(stream, sendMessageRaw_(_, _)); logger_->log(mockHttpEntry()); + // Now both entries were logged separately so we expect 2 clears. + EXPECT_EQ(2, logger_->numClears()); EXPECT_EQ(2, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); EXPECT_EQ(1, TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_dropped")->value()); } +// Test legacy behavior of unbounded access logs. +TEST_F(GrpcAccessLogTest, WatermarksLegacy) { + TestScopedRuntime scoped_runtime; + Runtime::LoaderSingleton::getExisting()->mergeValues( + {{"envoy.reloadable_features.disallow_unbounded_access_logs", "false"}}); + + initLogger(FlushInterval, 1); + + // Start a stream for the first log. + MockAccessLogStream stream; + AccessLogCallbacks* callbacks; + expectStreamStart(stream, &callbacks); + + EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()) + .Times(AnyNumber()) + .WillRepeatedly(Return(true)); + + // Fail to flush, so the log stays buffered up. + EXPECT_CALL(stream, sendMessageRaw_(_, false)).Times(0); + logger_->log(mockHttpEntry()); + // Message should still be initialized. + EXPECT_EQ(1, logger_->numInits()); + // No entry was logged so no clear expected. + EXPECT_EQ(0, logger_->numClears()); + EXPECT_EQ(1, + TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); + EXPECT_EQ(0, + TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_dropped")->value()); + + // As with the above test, try to log more. The log will not be dropped. + EXPECT_CALL(stream, sendMessageRaw_(_, _)).Times(0); + logger_->log(mockHttpEntry()); + // Still no entries were logged so no clears expected. + EXPECT_EQ(0, logger_->numClears()); + EXPECT_EQ(2, + TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_written")->value()); + EXPECT_EQ(0, + TestUtility::findCounter(stats_store_, "mock_access_log_prefix.logs_dropped")->value()); +} + // Test that stream failure is handled correctly. TEST_F(GrpcAccessLogTest, StreamFailure) { initLogger(FlushInterval, 0); @@ -260,6 +318,8 @@ TEST_F(GrpcAccessLogTest, Batching) { logger_->log(mockHttpEntry()); logger_->log(mockHttpEntry()); EXPECT_EQ(1, logger_->numInits()); + // The entries were batched and logged together so we expect a single clear. + EXPECT_EQ(1, logger_->numClears()); // Logging an entry that's bigger than the buffer size should trigger another flush. expectFlushedLogEntriesCount(stream, MOCK_HTTP_LOG_FIELD_NAME, 1); @@ -267,6 +327,7 @@ TEST_F(GrpcAccessLogTest, Batching) { const std::string big_key(max_buffer_size, 'a'); big_entry.mutable_fields()->insert({big_key, ProtobufWkt::Value()}); logger_->log(std::move(big_entry)); + EXPECT_EQ(2, logger_->numClears()); } // Test that log entries are flushed periodically. diff --git a/test/extensions/access_loggers/grpc/BUILD b/test/extensions/access_loggers/grpc/BUILD index 535fb3408600..903a161afe81 100644 --- a/test/extensions/access_loggers/grpc/BUILD +++ b/test/extensions/access_loggers/grpc/BUILD @@ -112,3 +112,22 @@ envoy_extension_cc_test( "@envoy_api//envoy/service/accesslog/v3:pkg_cc_proto", ], ) + + +envoy_extension_cc_test( + name = "grpc_ot_access_log_impl_test", + srcs = ["grpc_ot_access_log_impl_test.cc"], + extension_name = "envoy.access_loggers.http_grpc", + deps = [ + "//source/common/buffer:zero_copy_input_stream_lib", + "//source/extensions/access_loggers/grpc:grpc_ot_access_log_lib", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/stats:stats_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto", + "@opentelemetry_proto//:logs_cc_proto", + ], +) + diff --git a/test/extensions/access_loggers/grpc/grpc_ot_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/grpc_ot_access_log_impl_test.cc new file mode 100644 index 000000000000..be8fb5f12b56 --- /dev/null +++ b/test/extensions/access_loggers/grpc/grpc_ot_access_log_impl_test.cc @@ -0,0 +1,213 @@ +#include + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/logs/v1/logs.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +#include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" + +#include "common/buffer/zero_copy_input_stream_impl.h" + +#include "extensions/access_loggers/grpc/grpc_ot_access_log_impl.h" + +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace GrpcCommon { +namespace { + +constexpr std::chrono::milliseconds FlushInterval(10); +constexpr int BUFFER_SIZE_BYTES = 0; +const std::string ZONE_NAME = "zone_name"; +const std::string CLUSTER_NAME = "cluster_name"; +const std::string NODE_NAME = "node_name"; + +// A helper test class to mock and intercept GrpcOpenTelemetryAccessLoggerImpl streams. +class GrpcOpenTelemetryAccessLoggerImplTestHelper { +public: + using MockAccessLogStream = Grpc::MockAsyncStream; + using AccessLogCallbacks = Grpc::AsyncStreamCallbacks< + opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse>; + + GrpcOpenTelemetryAccessLoggerImplTestHelper(LocalInfo::MockLocalInfo& local_info, + Grpc::MockAsyncClient* async_client) { + EXPECT_CALL(local_info, zoneName()).WillOnce(ReturnRef(ZONE_NAME)); + EXPECT_CALL(local_info, clusterName()).WillOnce(ReturnRef(CLUSTER_NAME)); + EXPECT_CALL(local_info, nodeName()).WillOnce(ReturnRef(NODE_NAME)); + EXPECT_CALL(*async_client, startRaw(_, _, _, _)) + .WillOnce( + Invoke([this](absl::string_view, absl::string_view, Grpc::RawAsyncStreamCallbacks& cbs, + const Http::AsyncClient::StreamOptions&) { + this->callbacks_ = dynamic_cast(&cbs); + return &this->stream_; + })); + } + + void expectStreamMessage(const std::string& expected_message_yaml) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest expected_message; + TestUtility::loadFromYaml(expected_message_yaml, expected_message); + EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + EXPECT_CALL(stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([expected_message](Buffer::InstancePtr& request, bool) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message; + Buffer::ZeroCopyInputStreamImpl request_stream(std::move(request)); + EXPECT_TRUE(message.ParseFromZeroCopyStream(&request_stream)); + EXPECT_EQ(message.DebugString(), expected_message.DebugString()); + })); + } + +private: + MockAccessLogStream stream_; + AccessLogCallbacks* callbacks_; +}; + +class GrpcOpenTelemetryAccessLoggerImplTest : public testing::Test { +public: + GrpcOpenTelemetryAccessLoggerImplTest() + : async_client_(new Grpc::MockAsyncClient), timer_(new Event::MockTimer(&dispatcher_)), + grpc_access_logger_impl_test_helper_(local_info_, async_client_) { + EXPECT_CALL(*timer_, enableTimer(_, _)); + logger_ = std::make_unique( + Grpc::RawAsyncClientPtr{async_client_}, "test_log_name", FlushInterval, BUFFER_SIZE_BYTES, + dispatcher_, local_info_, stats_store_, envoy::config::core::v3::ApiVersion::V3); + } + + Grpc::MockAsyncClient* async_client_; + Stats::IsolatedStoreImpl stats_store_; + LocalInfo::MockLocalInfo local_info_; + Event::MockDispatcher dispatcher_; + Event::MockTimer* timer_; + std::unique_ptr logger_; + GrpcOpenTelemetryAccessLoggerImplTestHelper grpc_access_logger_impl_test_helper_; +}; + +TEST_F(GrpcOpenTelemetryAccessLoggerImplTest, LogHttp) { + grpc_access_logger_impl_test_helper_.expectStreamMessage(R"EOF( + resource_logs: + resource: + attributes: + - key: "log_name" + value: + string_value: "test_log_name" + - key: "zone_name" + value: + string_value: "zone_name" + - key: "cluster_name" + value: + string_value: "cluster_name" + - key: "node_name" + value: + string_value: "node_name" + instrumentation_library_logs: + - logs: + - severity_text: "test-severity-text" + )EOF"); + opentelemetry::proto::logs::v1::LogRecord entry; + entry.set_severity_text("test-severity-text"); + logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry)); +} + +TEST_F(GrpcOpenTelemetryAccessLoggerImplTest, LogTcp) { + grpc_access_logger_impl_test_helper_.expectStreamMessage(R"EOF( + resource_logs: + resource: + attributes: + - key: "log_name" + value: + string_value: "test_log_name" + - key: "zone_name" + value: + string_value: "zone_name" + - key: "cluster_name" + value: + string_value: "cluster_name" + - key: "node_name" + value: + string_value: "node_name" + instrumentation_library_logs: + - logs: + - severity_text: "test-severity-text" + )EOF"); + opentelemetry::proto::logs::v1::LogRecord entry; + entry.set_severity_text("test-severity-text"); + logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry)); +} + +class GrpcOpenTelemetryAccessLoggerCacheImplTest : public testing::Test { +public: + GrpcOpenTelemetryAccessLoggerCacheImplTest() + : async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory), + logger_cache_(async_client_manager_, scope_, tls_, local_info_), + grpc_access_logger_impl_test_helper_(local_info_, async_client_) { + EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false)) + .WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + EXPECT_CALL(*factory_, create()).WillOnce(Invoke([this] { + return Grpc::RawAsyncClientPtr{async_client_}; + })); + return Grpc::AsyncClientFactoryPtr{factory_}; + })); + } + + Grpc::MockAsyncClient* async_client_; + Grpc::MockAsyncClientFactory* factory_; + Grpc::MockAsyncClientManager async_client_manager_; + LocalInfo::MockLocalInfo local_info_; + NiceMock scope_; + NiceMock tls_; + GrpcOpenTelemetryAccessLoggerCacheImpl logger_cache_; + GrpcOpenTelemetryAccessLoggerImplTestHelper grpc_access_logger_impl_test_helper_; +}; + +// Test that the logger is created according to the config (by inspecting the generated log). +TEST_F(GrpcOpenTelemetryAccessLoggerCacheImplTest, LoggerCreation) { + envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig config; + config.set_log_name("test-log"); + config.set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); + // Force a flush for every log entry. + config.mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES); + + GrpcAccessLoggerSharedPtr logger = + logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope_); + grpc_access_logger_impl_test_helper_.expectStreamMessage(R"EOF( + resource_logs: + resource: + attributes: + - key: "log_name" + value: + string_value: "test-log" + - key: "zone_name" + value: + string_value: "zone_name" + - key: "cluster_name" + value: + string_value: "cluster_name" + - key: "node_name" + value: + string_value: "node_name" + instrumentation_library_logs: + - logs: + - severity_text: "test-severity-text" + )EOF"); + opentelemetry::proto::logs::v1::LogRecord entry; + entry.set_severity_text("test-severity-text"); + logger->log(opentelemetry::proto::logs::v1::LogRecord(entry)); +} + +} // namespace +} // namespace GrpcCommon +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy