Skip to content

Commit

Permalink
metrics service sink: add config option to report counters as deltas (#…
Browse files Browse the repository at this point in the history
…10889)

Description: this PR adds the ability to configure the metrics service stats sink to report counters as deltas between flushing intervals. This is the expected representation for some stats aggregations backends. Similar behavior is seen, for instance, in the statsd sink.
Risk Level: low, previous behavior is left unchanged, and is the default.
Testing: updated unit test.
Docs Changes: left comments in the field definition
Release Notes: updated version history.

Signed-off-by: Jose Nino <jnino@lyft.com>
  • Loading branch information
junr03 authored Apr 28, 2020
1 parent ab32f5f commit 048f423
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 39 deletions.
8 changes: 8 additions & 0 deletions api/envoy/config/metrics/v3/metrics_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package envoy.config.metrics.v3;

import "envoy/config/core/v3/grpc_service.proto";

import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";
Expand All @@ -25,4 +27,10 @@ message MetricsServiceConfig {

// The upstream gRPC cluster that hosts the metrics service.
core.v3.GrpcService grpc_service = 1 [(validate.rules).message = {required: true}];

// If true, counters are reported as the delta between flushing intervals. Otherwise, the current
// counter value is reported. Defaults to false.
// Eventually (https://github.com/envoyproxy/envoy/issues/10968) if this value is not set, the
// sink will take updates from the :ref:`MetricsResponse <envoy_api_msg_service.metrics.v3.StreamMetricsResponse>`.
google.protobuf.BoolValue report_counters_as_deltas = 2;
}
3 changes: 2 additions & 1 deletion docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Changes
* access loggers: added GRPC_STATUS operator on logging format.
* access loggers: extened specifier for FilterStateFormatter to output :ref:`unstructured log string <config_access_log_format_filter_state>`.
* dynamic forward proxy: added :ref:`SNI based dynamic forward proxy <config_network_filters_sni_dynamic_forward_proxy>` support.
* fault: added support for controlling the percentage of requests that abort, delay and response rate limits faults
* fault: added support for controlling the percentage of requests that abort, delay and response rate limits faults
are applied to using :ref:`HTTP headers <config_http_filters_fault_injection_http_header>` to the HTTP fault filter.
* fault: added support for specifying grpc_status code in abort faults using
:ref:`HTTP header <config_http_filters_fault_injection_http_header>` or abort fault configuration in HTTP fault filter.
Expand All @@ -31,6 +31,7 @@ Changes
tracing is not forced.
* router: allow retries of streaming or incomplete requests. This removes stat `rq_retry_skipped_request_not_complete`.
* router: allow retries by default when upstream responds with :ref:`x-envoy-overloaded <config_http_filters_router_x-envoy-overloaded_set>`.
* stats: added the option to :ref:`report counters as deltas <envoy_v3_api_field_config.metrics.v3.MetricsServiceConfig.report_counters_as_deltas>` to the metrics service stats sink.
* tracing: tracing configuration has been made fully dynamic and every HTTP connection manager
can now have a separate :ref:`tracing provider <envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.Tracing.provider>`.
* upstream: fixed a bug where Envoy would panic when receiving a GRPC SERVICE_UNKNOWN status on the health check.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion source/extensions/stat_sinks/metrics_service/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ Stats::SinkPtr MetricsServiceSinkFactory::createStatsSink(const Protobuf::Messag
grpc_service, server.stats(), false),
server.localInfo());

return std::make_unique<MetricsServiceSink>(grpc_metrics_streamer, server.timeSource());
return std::make_unique<MetricsServiceSink>(
grpc_metrics_streamer, server.timeSource(),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false));
}

ProtobufTypes::MessagePtr MetricsServiceSinkFactory::createEmptyConfigProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,26 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes
}

MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_source)
: grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source) {}
TimeSource& time_source,
const bool report_counters_as_deltas)
: grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source),
report_counters_as_deltas_(report_counters_as_deltas) {}

void MetricsServiceSink::flushCounter(const Stats::Counter& counter) {
void MetricsServiceSink::flushCounter(
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER);
metrics_family->set_name(counter.name());
metrics_family->set_name(counter_snapshot.counter_.get().name());
auto* metric = metrics_family->add_metric();
metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
auto* counter_metric = metric->mutable_counter();
counter_metric->set_value(counter.value());
if (report_counters_as_deltas_) {
counter_metric->set_value(counter_snapshot.delta_);
} else {
counter_metric->set_value(counter_snapshot.counter_.get().value());
}
}

void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge) {
Expand Down Expand Up @@ -110,7 +117,7 @@ void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) {
snapshot.histograms().size());
for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
flushCounter(counter.counter_.get());
flushCounter(counter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,19 @@ class MetricsServiceSink : public Stats::Sink {
public:
// MetricsService::Sink
MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_system);
TimeSource& time_system, const bool report_counters_as_deltas);
void flush(Stats::MetricSnapshot& snapshot) override;
void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}

void flushCounter(const Stats::Counter& counter);
void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot);
void flushGauge(const Stats::Gauge& gauge);
void flushHistogram(const Stats::ParentHistogram& envoy_histogram);

private:
GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_;
envoy::service::metrics::v3::StreamMetricsMessage message_;
TimeSource& time_source_;
const bool report_counters_as_deltas_;
};

} // namespace MetricsService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,71 +90,103 @@ class MockGrpcMetricsStreamer : public GrpcMetricsStreamer {
MOCK_METHOD(void, send, (envoy::service::metrics::v3::StreamMetricsMessage & message));
};

class TestGrpcMetricsStreamer : public GrpcMetricsStreamer {
class MetricsServiceSinkTest : public testing::Test {
public:
int metric_count;
// GrpcMetricsStreamer
void send(envoy::service::metrics::v3::StreamMetricsMessage& message) override {
metric_count = message.envoy_metrics_size();
}
};
MetricsServiceSinkTest() = default;

class MetricsServiceSinkTest : public testing::Test {};

TEST(MetricsServiceSinkTest, CheckSendCall) {
NiceMock<Stats::MockMetricSnapshot> snapshot;
Event::SimulatedTimeSystem time_system;
NiceMock<Stats::MockMetricSnapshot> snapshot_;
Event::SimulatedTimeSystem time_system_;
std::shared_ptr<MockGrpcMetricsStreamer> streamer_{new MockGrpcMetricsStreamer()};
};

MetricsServiceSink sink(streamer_, time_system);
TEST_F(MetricsServiceSinkTest, CheckSendCall) {
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->latch_ = 1;
counter->used_ = true;
snapshot.counters_.push_back({1, *counter});
snapshot_.counters_.push_back({1, *counter});

auto gauge = std::make_shared<NiceMock<Stats::MockGauge>>();
gauge->name_ = "test_gauge";
gauge->value_ = 1;
gauge->used_ = true;
snapshot.gauges_.push_back(*gauge);
snapshot_.gauges_.push_back(*gauge);

auto histogram = std::make_shared<NiceMock<Stats::MockParentHistogram>>();
histogram->name_ = "test_histogram";
histogram->used_ = true;

EXPECT_CALL(*streamer_, send(_));

sink.flush(snapshot);
sink.flush(snapshot_);
}

TEST(MetricsServiceSinkTest, CheckStatsCount) {
NiceMock<Stats::MockMetricSnapshot> snapshot;
Event::SimulatedTimeSystem time_system;
std::shared_ptr<TestGrpcMetricsStreamer> streamer_{new TestGrpcMetricsStreamer()};

MetricsServiceSink sink(streamer_, time_system);
TEST_F(MetricsServiceSinkTest, CheckStatsCount) {
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->latch_ = 1;
counter->value_ = 100;
counter->used_ = true;
snapshot.counters_.push_back({1, *counter});
snapshot_.counters_.push_back({1, *counter});

auto gauge = std::make_shared<NiceMock<Stats::MockGauge>>();
gauge->name_ = "test_gauge";
gauge->value_ = 1;
gauge->used_ = true;
snapshot.gauges_.push_back(*gauge);
snapshot_.gauges_.push_back(*gauge);

sink.flush(snapshot);
EXPECT_EQ(2, (*streamer_).metric_count);
EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(2, message.envoy_metrics_size());
}));
sink.flush(snapshot_);

// Verify only newly added metrics come after endFlush call.
gauge->used_ = false;
sink.flush(snapshot);
EXPECT_EQ(1, (*streamer_).metric_count);
EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
}));
sink.flush(snapshot_);
}

// Test that verifies counters are correctly reported as current value when configured to do so.
TEST_F(MetricsServiceSinkTest, ReportCountersValues) {
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->value_ = 100;
counter->used_ = true;
snapshot_.counters_.push_back({1, *counter});

EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
EXPECT_EQ(100, message.envoy_metrics(0).metric(0).counter().value());
}));
sink.flush(snapshot_);
}

// Test that verifies counters are reported as the delta between flushes when configured to do so.
TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) {
MetricsServiceSink sink(streamer_, time_system_, true);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->value_ = 100;
counter->used_ = true;
snapshot_.counters_.push_back({1, *counter});

EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
EXPECT_EQ(1, message.envoy_metrics(0).metric(0).counter().value());
}));
sink.flush(snapshot_);
}

} // namespace
Expand Down

0 comments on commit 048f423

Please sign in to comment.