Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics service sink: add config option to report counters as deltas #10889

Merged
merged 9 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/envoy/config/metrics/v3/metrics_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package envoy.config.metrics.v3;

import "envoy/config/core/v3/grpc_service.proto";

import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";
Expand All @@ -25,4 +27,8 @@ message MetricsServiceConfig {

// The upstream gRPC cluster that hosts the metrics service.
core.v3.GrpcService grpc_service = 1 [(validate.rules).message = {required: true}];

// If true, counters are reported as the delta between flushing intervals. Otherwise, the current
// counter value is reported. Defaults to false.
junr03 marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.BoolValue report_counters_as_deltas = 2;
}
3 changes: 2 additions & 1 deletion docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Changes
* access loggers: added GRPC_STATUS operator on logging format.
* access loggers: extened specifier for FilterStateFormatter to output :ref:`unstructured log string <config_access_log_format_filter_state>`.
* dynamic forward proxy: added :ref:`SNI based dynamic forward proxy <config_network_filters_sni_dynamic_forward_proxy>` support.
* fault: added support for controlling the percentage of requests that abort, delay and response rate limits faults
* fault: added support for controlling the percentage of requests that abort, delay and response rate limits faults
are applied to using :ref:`HTTP headers <config_http_filters_fault_injection_http_header>` to the HTTP fault filter.
* filter: add `upstram_rq_time` stats to the GPRC stats filter.
Disabled by default and can be enabled via :ref:`enable_upstream_stats <envoy_v3_api_field_extensions.filters.http.grpc_stats.v3.FilterConfig.enable_upstream_stats>`.
Expand All @@ -28,6 +28,7 @@ Changes
tracing is not forced.
* router: allow retries of streaming or incomplete requests. This removes stat `rq_retry_skipped_request_not_complete`.
* router: allow retries by default when upstream responds with :ref:`x-envoy-overloaded <config_http_filters_router_x-envoy-overloaded_set>`.
* stats: added the option to :ref:`report counters as deltas <envoy_v3_api_field_config.metrics.v3.MetricsServiceConfig.report_counters_as_deltas>` to the metrics service stats sink.
* tracing: tracing configuration has been made fully dynamic and every HTTP connection manager
can now have a separate :ref:`tracing provider <envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.Tracing.provider>`.
* upstream: fixed a bug where Envoy would panic when receiving a GRPC SERVICE_UNKNOWN status on the health check.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion source/extensions/stat_sinks/metrics_service/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ Stats::SinkPtr MetricsServiceSinkFactory::createStatsSink(const Protobuf::Messag
grpc_service, server.stats(), false),
server.localInfo());

return std::make_unique<MetricsServiceSink>(grpc_metrics_streamer, server.timeSource());
return std::make_unique<MetricsServiceSink>(
grpc_metrics_streamer, server.timeSource(),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false));
}

ProtobufTypes::MessagePtr MetricsServiceSinkFactory::createEmptyConfigProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,26 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes
}

MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_source)
: grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source) {}
TimeSource& time_source,
const bool report_counters_as_deltas)
: grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source),
report_counters_as_deltas_(report_counters_as_deltas) {}

void MetricsServiceSink::flushCounter(const Stats::Counter& counter) {
void MetricsServiceSink::flushCounter(
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER);
metrics_family->set_name(counter.name());
metrics_family->set_name(counter_snapshot.counter_.get().name());
auto* metric = metrics_family->add_metric();
metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
auto* counter_metric = metric->mutable_counter();
counter_metric->set_value(counter.value());
if (report_counters_as_deltas_) {
counter_metric->set_value(counter_snapshot.delta_);
} else {
counter_metric->set_value(counter_snapshot.counter_.get().value());
}
}

void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge) {
Expand Down Expand Up @@ -110,7 +117,7 @@ void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) {
snapshot.histograms().size());
for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
flushCounter(counter.counter_.get());
flushCounter(counter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,19 @@ class MetricsServiceSink : public Stats::Sink {
public:
// MetricsService::Sink
MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_system);
TimeSource& time_system, const bool report_counters_as_deltas);
junr03 marked this conversation as resolved.
Show resolved Hide resolved
void flush(Stats::MetricSnapshot& snapshot) override;
void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}

void flushCounter(const Stats::Counter& counter);
void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot);
void flushGauge(const Stats::Gauge& gauge);
void flushHistogram(const Stats::ParentHistogram& envoy_histogram);

private:
GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_;
envoy::service::metrics::v3::StreamMetricsMessage message_;
TimeSource& time_source_;
const bool report_counters_as_deltas_;
};

} // namespace MetricsService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,71 +90,101 @@ class MockGrpcMetricsStreamer : public GrpcMetricsStreamer {
MOCK_METHOD(void, send, (envoy::service::metrics::v3::StreamMetricsMessage & message));
};

class TestGrpcMetricsStreamer : public GrpcMetricsStreamer {
class MetricsServiceSinkTest : public testing::Test {
public:
int metric_count;
// GrpcMetricsStreamer
void send(envoy::service::metrics::v3::StreamMetricsMessage& message) override {
metric_count = message.envoy_metrics_size();
}
};
MetricsServiceSinkTest() = default;

class MetricsServiceSinkTest : public testing::Test {};

TEST(MetricsServiceSinkTest, CheckSendCall) {
NiceMock<Stats::MockMetricSnapshot> snapshot;
Event::SimulatedTimeSystem time_system;
NiceMock<Stats::MockMetricSnapshot> snapshot_;
Event::SimulatedTimeSystem time_system_;
std::shared_ptr<MockGrpcMetricsStreamer> streamer_{new MockGrpcMetricsStreamer()};
};

MetricsServiceSink sink(streamer_, time_system);
TEST_F(MetricsServiceSinkTest, CheckSendCall) {
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->latch_ = 1;
counter->used_ = true;
snapshot.counters_.push_back({1, *counter});
snapshot_.counters_.push_back({1, *counter});

auto gauge = std::make_shared<NiceMock<Stats::MockGauge>>();
gauge->name_ = "test_gauge";
gauge->value_ = 1;
gauge->used_ = true;
snapshot.gauges_.push_back(*gauge);
snapshot_.gauges_.push_back(*gauge);

auto histogram = std::make_shared<NiceMock<Stats::MockParentHistogram>>();
histogram->name_ = "test_histogram";
histogram->used_ = true;

EXPECT_CALL(*streamer_, send(_));

sink.flush(snapshot);
sink.flush(snapshot_);
}

TEST(MetricsServiceSinkTest, CheckStatsCount) {
NiceMock<Stats::MockMetricSnapshot> snapshot;
Event::SimulatedTimeSystem time_system;
std::shared_ptr<TestGrpcMetricsStreamer> streamer_{new TestGrpcMetricsStreamer()};

MetricsServiceSink sink(streamer_, time_system);
TEST_F(MetricsServiceSinkTest, CheckStatsCount) {
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->latch_ = 1;
counter->value_ = 100;
counter->used_ = true;
snapshot.counters_.push_back({1, *counter});
snapshot_.counters_.push_back({1, *counter});

auto gauge = std::make_shared<NiceMock<Stats::MockGauge>>();
gauge->name_ = "test_gauge";
gauge->value_ = 1;
gauge->used_ = true;
snapshot.gauges_.push_back(*gauge);
snapshot_.gauges_.push_back(*gauge);

sink.flush(snapshot);
EXPECT_EQ(2, (*streamer_).metric_count);
EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(2, message.envoy_metrics_size());
}));
sink.flush(snapshot_);

// Verify only newly added metrics come after endFlush call.
gauge->used_ = false;
sink.flush(snapshot);
EXPECT_EQ(1, (*streamer_).metric_count);
EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
}));
sink.flush(snapshot_);
}

TEST_F(MetricsServiceSinkTest, ReportCountersValues) {
junr03 marked this conversation as resolved.
Show resolved Hide resolved
MetricsServiceSink sink(streamer_, time_system_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->value_ = 100;
counter->used_ = true;
snapshot_.counters_.push_back({1, *counter});

EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
EXPECT_EQ(100, message.envoy_metrics(0).metric(0).counter().value());
}));
sink.flush(snapshot_);
}

TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) {
MetricsServiceSink sink(streamer_, time_system_, true);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
counter->value_ = 100;
counter->used_ = true;
snapshot_.counters_.push_back({1, *counter});

EXPECT_CALL(*streamer_, send(_))
.WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) {
EXPECT_EQ(1, message.envoy_metrics_size());
EXPECT_EQ(1, message.envoy_metrics(0).metric(0).counter().value());
}));
sink.flush(snapshot_);
}

} // namespace
Expand Down