Skip to content

Commit

Permalink
To flush and send metrics in a single pass of Telemetry::Worker#perform…
Browse files Browse the repository at this point in the history
… method, we need to return events directly instead of enqueuing them to the worker
  • Loading branch information
anmarchenko committed Jul 9, 2024
1 parent 593f9aa commit 058a14f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 63 deletions.
10 changes: 6 additions & 4 deletions lib/datadog/core/telemetry/metrics_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ def distribution(metric_name, value, tags: {}, common: true)
fetch_or_add_distribution(metric, value)
end

def flush!(queue)
def flush!
@mutex.synchronize do
queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any?
queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any?
events = []
events << Event::GenerateMetrics.new(@namespace, @metrics.values) if @metrics.any?
events << Event::Distributions.new(@namespace, @distributions.values) if @distributions.any?

@metrics = {}
@distributions = {}

events
end
nil
end

private
Expand Down
8 changes: 3 additions & 5 deletions lib/datadog/core/telemetry/metrics_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ def distribution(namespace, metric_name, value, tags: {}, common: true)
collection.distribution(metric_name, value, tags: tags, common: common)
end

def flush!(queue)
return unless @enabled
def flush!
return [] unless @enabled

collections = @mutex.synchronize { @collections.values }
collections.each { |col| col.flush!(queue) }

nil
collections.reduce([]) { |events, collection| events + collection.flush! }
end

def disable!
Expand Down
6 changes: 1 addition & 5 deletions sig/datadog/core/telemetry/metrics_collection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ module Datadog
module Core
module Telemetry
class MetricsCollection
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@namespace: String

@interval: Float
Expand All @@ -30,7 +26,7 @@ module Datadog

def distribution: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

private

Expand Down
6 changes: 1 addition & 5 deletions sig/datadog/core/telemetry/metrics_manager.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ module Datadog
module Core
module Telemetry
class MetricsManager
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@interval: Float

@enabled: bool
Expand All @@ -28,7 +24,7 @@ module Datadog

def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

def disable!: () -> void

Expand Down
63 changes: 31 additions & 32 deletions spec/datadog/core/telemetry/metrics_collection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -241,28 +241,22 @@ def first_distribution_values
end

describe '#flush!' do
let(:queue) { double('queue') }

before do
allow(queue).to receive(:enqueue)
end

it 'flushes metrics' do
collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true)
collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true)

expect(queue).to receive(:enqueue) do |event|
expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics)
payload = event.payload
events = collection.flush!
expect(events).to have(1).item

expect(payload[:namespace]).to eq(namespace)
expect(payload[:series]).to have(2).items
event = events.first
expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics)
payload = event.payload

tags = payload[:series].map { |s| s[:tags] }.sort
expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']])
end.once
expect(payload[:namespace]).to eq(namespace)
expect(payload[:series]).to have(2).items

collection.flush!(queue)
tags = payload[:series].map { |s| s[:tags] }.sort
expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']])

expect(metrics.size).to eq(0)
end
Expand All @@ -273,21 +267,21 @@ def first_distribution_values
collection.distribution('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true)
collection.distribution('metric_name', 7, tags: { tag1: 'val1', tag2: 'val3' }, common: true)

expect(queue).to receive(:enqueue) do |event|
expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions)
payload = event.payload
events = collection.flush!
expect(events).to have(1).item

expect(payload[:namespace]).to eq(namespace)
expect(payload[:series]).to have(2).items
event = events.first
expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions)
payload = event.payload

tags = payload[:series].map { |s| s[:tags] }.sort
expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']])
expect(payload[:namespace]).to eq(namespace)
expect(payload[:series]).to have(2).items

values = payload[:series].map { |s| s[:points] }.sort
expect(values).to eq([[5, 6], [5, 7]])
end.once
tags = payload[:series].map { |s| s[:tags] }.sort
expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']])

collection.flush!(queue)
values = payload[:series].map { |s| s[:points] }.sort
expect(values).to eq([[5, 6], [5, 7]])

expect(distributions.size).to eq(0)
end
Expand All @@ -297,18 +291,23 @@ def first_distribution_values
threads_count = 5
metrics_count = 0

expect(queue).to receive(:enqueue) do |event|
mutex.synchronize { metrics_count += event.payload[:series].size }
end.at_least(:once)

threads = Array.new(threads_count) do |i|
Thread.new do
collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true)
collection.flush!(queue)

events = collection.flush!

collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true)
collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true)
collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true)
collection.flush!(queue)

events += collection.flush!

mutex.synchronize do
events.each do |event|
metrics_count += event.payload[:series].size
end
end
end
end

Expand Down
25 changes: 13 additions & 12 deletions spec/datadog/core/telemetry/metrics_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,18 @@ def first_collection
end

describe '#flush!' do
subject(:flush!) { manager.flush!(queue) }

let(:queue) { [] }
subject(:flush!) { manager.flush! }

it 'forwards flush to the collections' do
events = [double(:event)]

collection = double(:collection)
expect(Datadog::Core::Telemetry::MetricsCollection).to receive(:new).and_return(collection)
expect(collection).to receive(:inc)
expect(collection).to receive(:flush!).with(queue)
expect(collection).to receive(:flush!).and_return(events)

manager.inc(namespace, metric_name, value, tags: tags)
flush!
expect(flush!).to eq(events)
end

context 'when disabled' do
Expand All @@ -208,30 +208,31 @@ def first_collection
it 'does nothing' do
expect(Datadog::Core::Telemetry::MetricsCollection).to_not receive(:new)

flush!
expect(flush!).to eq([])
end
end

context 'concurrently creating and flushing namespaces' do
let(:queue) { double('queue') }

it 'flushes all metrics' do
mutex = Mutex.new

threads_count = 5
metrics_per_thread = 3

flushed_metrics_count = 0
allow(queue).to receive(:enqueue) do |event|
mutex.synchronize { flushed_metrics_count += event.payload[:series].count }
end

threads = Array.new(threads_count) do |n|
Thread.new do
metrics_per_thread.times do |i|
manager.inc("namespace #{n}", "#{metric_name} #{i}", value, tags: tags)
end
manager.flush!(queue)
events = manager.flush!

mutex.synchronize do
events.each do |event|
flushed_metrics_count += event.payload[:series].count
end
end
end
end

Expand Down

0 comments on commit 058a14f

Please sign in to comment.