Skip to content

Commit

Permalink
Statsd UDP Buffer (envoyproxy#11724)
Browse files Browse the repository at this point in the history
Optional buffer on statsd udp

Signed-off-by: John Murray <murray@stripe.com>
Signed-off-by: scheler <santosh.cheler@appdynamics.com>
  • Loading branch information
murray-stripe authored and scheler committed Aug 4, 2020
1 parent e0ce986 commit 39e5c5f
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 25 deletions.
8 changes: 8 additions & 0 deletions api/envoy/config/metrics/v3/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,14 @@ message DogStatsdSink {
// Optional custom metric name prefix. See :ref:`StatsdSink's prefix field
// <envoy_api_field_config.metrics.v3.StatsdSink.prefix>` for more details.
string prefix = 3;

// Optional max datagram size to use when sending UDP messages. By default Envoy
// will emit one metric per datagram. By specifying a max-size larger than a single
// metric, Envoy will emit multiple, new-line separated metrics. The max datagram
// size should not exceed your network's MTU.
//
// Note that this value may not be respected if smaller than a single metric.
google.protobuf.UInt64Value max_bytes_per_datagram = 4 [(validate.rules).uint64 = {gt: 0}];
}

// Stats configuration proto schema for built-in *envoy.stat_sinks.hystrix* sink.
Expand Down
8 changes: 8 additions & 0 deletions api/envoy/config/metrics/v4alpha/stats.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions generated_api_shadow/envoy/config/metrics/v3/stats.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions generated_api_shadow/envoy/config/metrics/v4alpha/stats.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 48 additions & 6 deletions source/extensions/stat_sinks/common/statsd/statsd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <cstdint>
#include <string>

#include "envoy/buffer/buffer.h"
#include "envoy/common/exception.h"
#include "envoy/common/platform.h"
#include "envoy/event/dispatcher.h"
#include "envoy/stats/scope.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/api/os_sys_calls_impl.h"
#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/common/fmt.h"
#include "common/common/utility.h"
Expand Down Expand Up @@ -38,34 +40,74 @@ void UdpStatsdSink::WriterImpl::write(const std::string& message) {
Network::Utility::writeToSocket(*io_handle_, &slice, 1, nullptr, *parent_.server_address_);
}

void UdpStatsdSink::WriterImpl::writeBuffer(Buffer::Instance& data) {
Network::Utility::writeToSocket(*io_handle_, data, nullptr, *parent_.server_address_);
}

UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls,
Network::Address::InstanceConstSharedPtr address, const bool use_tag,
const std::string& prefix)
const std::string& prefix, absl::optional<uint64_t> buffer_size)
: tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag),
prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix) {
prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix),
buffer_size_(buffer_size.value_or(0)) {
tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<WriterImpl>(*this);
});
}

void UdpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
Writer& writer = tls_->getTyped<Writer>();
Buffer::OwnedImpl buffer;

for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
writer.write(absl::StrCat(prefix_, ".", getName(counter.counter_.get()), ":", counter.delta_,
"|c", buildTagStr(counter.counter_.get().tags())));
const std::string counter_str =
absl::StrCat(prefix_, ".", getName(counter.counter_.get()), ":", counter.delta_, "|c",
buildTagStr(counter.counter_.get().tags()));
writeBuffer(buffer, writer, counter_str);
}
}

for (const auto& gauge : snapshot.gauges()) {
if (gauge.get().used()) {
writer.write(absl::StrCat(prefix_, ".", getName(gauge.get()), ":", gauge.get().value(), "|g",
buildTagStr(gauge.get().tags())));
const std::string gauge_str =
absl::StrCat(prefix_, ".", getName(gauge.get()), ":", gauge.get().value(), "|g",
buildTagStr(gauge.get().tags()));
writeBuffer(buffer, writer, gauge_str);
}
}

flushBuffer(buffer, writer);
// TODO(efimki): Add support of text readouts stats.
}

void UdpStatsdSink::writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer,
const std::string& statsd_metric) const {
if (statsd_metric.length() >= buffer_size_) {
// Our statsd_metric is too large to fit into the buffer, skip buffering and write directly
writer.write(statsd_metric);
} else {
if ((buffer.length() + statsd_metric.length() + 1) > buffer_size_) {
// If we add the new statsd_metric, we'll overflow our buffer. Flush the buffer to make
// room for the new statsd_metric.
flushBuffer(buffer, writer);
} else if (buffer.length() > 0) {
// We have room and have metrics already in the buffer, add a newline to separate
// metric entries.
buffer.add("\n");
}
buffer.add(statsd_metric);
}
}

void UdpStatsdSink::flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const {
if (buffer.length() == 0) {
return;
}
writer.writeBuffer(buffer);
buffer.drain(buffer.length());
}

void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
// For statsd histograms are all timers in milliseconds, Envoy histograms are however
// not necessarily timers in milliseconds, for Envoy histograms suffixed with their corresponding
Expand Down
19 changes: 16 additions & 3 deletions source/extensions/stat_sinks/common/statsd/statsd.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/common/platform.h"
#include "envoy/local_info/local_info.h"
#include "envoy/network/connection.h"
Expand All @@ -15,6 +16,8 @@
#include "common/common/macros.h"
#include "common/network/io_socket_handle_impl.h"

#include "absl/types/optional.h"

namespace Envoy {
namespace Extensions {
namespace StatSinks {
Expand All @@ -34,15 +37,19 @@ class UdpStatsdSink : public Stats::Sink {
class Writer : public ThreadLocal::ThreadLocalObject {
public:
virtual void write(const std::string& message) PURE;
virtual void writeBuffer(Buffer::Instance& data) PURE;
};

UdpStatsdSink(ThreadLocal::SlotAllocator& tls, Network::Address::InstanceConstSharedPtr address,
const bool use_tag, const std::string& prefix = getDefaultPrefix());
const bool use_tag, const std::string& prefix = getDefaultPrefix(),
absl::optional<uint64_t> buffer_size = absl::nullopt);
// For testing.
UdpStatsdSink(ThreadLocal::SlotAllocator& tls, const std::shared_ptr<Writer>& writer,
const bool use_tag, const std::string& prefix = getDefaultPrefix())
const bool use_tag, const std::string& prefix = getDefaultPrefix(),
absl::optional<uint64_t> buffer_size = absl::nullopt)
: tls_(tls.allocateSlot()), use_tag_(use_tag),
prefix_(prefix.empty() ? getDefaultPrefix() : prefix) {
prefix_(prefix.empty() ? getDefaultPrefix() : prefix),
buffer_size_(buffer_size.value_or(0)) {
tls_->set(
[writer](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return writer; });
}
Expand All @@ -52,6 +59,7 @@ class UdpStatsdSink : public Stats::Sink {
void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override;

bool getUseTagForTest() { return use_tag_; }
uint64_t getBufferSizeForTest() { return buffer_size_; }
const std::string& getPrefix() { return prefix_; }

private:
Expand All @@ -64,12 +72,16 @@ class UdpStatsdSink : public Stats::Sink {

// Writer
void write(const std::string& message) override;
void writeBuffer(Buffer::Instance& data) override;

private:
UdpStatsdSink& parent_;
const Network::IoHandlePtr io_handle_;
};

void flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const;
void writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer, const std::string& data) const;

const std::string getName(const Stats::Metric& metric) const;
const std::string buildTagStr(const std::vector<Stats::Tag>& tags) const;

Expand All @@ -78,6 +90,7 @@ class UdpStatsdSink : public Stats::Sink {
const bool use_tag_;
// Prefix for all flushed stats.
const std::string prefix_;
const uint64_t buffer_size_;
};

/**
Expand Down
8 changes: 7 additions & 1 deletion source/extensions/stat_sinks/dog_statsd/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "extensions/stat_sinks/common/statsd/statsd.h"
#include "extensions/stat_sinks/well_known_names.h"

#include "absl/types/optional.h"

namespace Envoy {
namespace Extensions {
namespace StatSinks {
Expand All @@ -24,8 +26,12 @@ Stats::SinkPtr DogStatsdSinkFactory::createStatsSink(const Protobuf::Message& co
Network::Address::InstanceConstSharedPtr address =
Network::Address::resolveProtoAddress(sink_config.address());
ENVOY_LOG(debug, "dog_statsd UDP ip address: {}", address->asString());
absl::optional<uint64_t> max_bytes;
if (sink_config.has_max_bytes_per_datagram()) {
max_bytes = sink_config.max_bytes_per_datagram().value();
}
return std::make_unique<Common::Statsd::UdpStatsdSink>(server.threadLocal(), std::move(address),
true, sink_config.prefix());
true, sink_config.prefix(), max_bytes);
}

ProtobufTypes::MessagePtr DogStatsdSinkFactory::createEmptyConfigProto() {
Expand Down
Loading

0 comments on commit 39e5c5f

Please sign in to comment.