diff --git a/lib/datadog/core/telemetry/metrics_collection.rb b/lib/datadog/core/telemetry/metrics_collection.rb index 3d27b5f8a2a..d1997a08a2d 100644 --- a/lib/datadog/core/telemetry/metrics_collection.rb +++ b/lib/datadog/core/telemetry/metrics_collection.rb @@ -45,15 +45,17 @@ def distribution(metric_name, value, tags: {}, common: true) fetch_or_add_distribution(metric, value) end - def flush!(queue) + def flush! @mutex.synchronize do - queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any? - queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any? + events = [] + events << Event::GenerateMetrics.new(@namespace, @metrics.values) if @metrics.any? + events << Event::Distributions.new(@namespace, @distributions.values) if @distributions.any? @metrics = {} @distributions = {} + + events end - nil end private diff --git a/lib/datadog/core/telemetry/metrics_manager.rb b/lib/datadog/core/telemetry/metrics_manager.rb index e4a3055771a..95750728c64 100644 --- a/lib/datadog/core/telemetry/metrics_manager.rb +++ b/lib/datadog/core/telemetry/metrics_manager.rb @@ -57,13 +57,11 @@ def distribution(namespace, metric_name, value, tags: {}, common: true) collection.distribution(metric_name, value, tags: tags, common: common) end - def flush!(queue) - return unless @enabled + def flush! + return [] unless @enabled collections = @mutex.synchronize { @collections.values } - collections.each { |col| col.flush!(queue) } - - nil + collections.reduce([]) { |events, collection| events + collection.flush! } end def disable! diff --git a/sig/datadog/core/telemetry/metrics_collection.rbs b/sig/datadog/core/telemetry/metrics_collection.rbs index 1edcf24a121..7fcdd814ec1 100644 --- a/sig/datadog/core/telemetry/metrics_collection.rbs +++ b/sig/datadog/core/telemetry/metrics_collection.rbs @@ -2,10 +2,6 @@ module Datadog module Core module Telemetry class MetricsCollection - interface _Queue - def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void - end - @namespace: String @interval: Float @@ -30,7 +26,7 @@ module Datadog def distribution: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void - def flush!: (_Queue queue) -> void + def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base] private diff --git a/sig/datadog/core/telemetry/metrics_manager.rbs b/sig/datadog/core/telemetry/metrics_manager.rbs index 7083e155edb..50369469732 100644 --- a/sig/datadog/core/telemetry/metrics_manager.rbs +++ b/sig/datadog/core/telemetry/metrics_manager.rbs @@ -2,10 +2,6 @@ module Datadog module Core module Telemetry class MetricsManager - interface _Queue - def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void - end - @interval: Float @enabled: bool @@ -28,7 +24,7 @@ module Datadog def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void - def flush!: (_Queue queue) -> void + def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base] def disable!: () -> void diff --git a/spec/datadog/core/telemetry/metrics_collection_spec.rb b/spec/datadog/core/telemetry/metrics_collection_spec.rb index a8fd6fe1ebd..58844bdaf45 100644 --- a/spec/datadog/core/telemetry/metrics_collection_spec.rb +++ b/spec/datadog/core/telemetry/metrics_collection_spec.rb @@ -241,28 +241,22 @@ def first_distribution_values end describe '#flush!' do - let(:queue) { double('queue') } - - before do - allow(queue).to receive(:enqueue) - end - it 'flushes metrics' do collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - expect(queue).to receive(:enqueue) do |event| - expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) - payload = event.payload + events = collection.flush! + expect(events).to have(1).item - expect(payload[:namespace]).to eq(namespace) - expect(payload[:series]).to have(2).items + event = events.first + expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) + payload = event.payload - tags = payload[:series].map { |s| s[:tags] }.sort - expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) - end.once + expect(payload[:namespace]).to eq(namespace) + expect(payload[:series]).to have(2).items - collection.flush!(queue) + tags = payload[:series].map { |s| s[:tags] }.sort + expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) expect(metrics.size).to eq(0) end @@ -273,21 +267,21 @@ def first_distribution_values collection.distribution('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) collection.distribution('metric_name', 7, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - expect(queue).to receive(:enqueue) do |event| - expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions) - payload = event.payload + events = collection.flush! + expect(events).to have(1).item - expect(payload[:namespace]).to eq(namespace) - expect(payload[:series]).to have(2).items + event = events.first + expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions) + payload = event.payload - tags = payload[:series].map { |s| s[:tags] }.sort - expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) + expect(payload[:namespace]).to eq(namespace) + expect(payload[:series]).to have(2).items - values = payload[:series].map { |s| s[:points] }.sort - expect(values).to eq([[5, 6], [5, 7]]) - end.once + tags = payload[:series].map { |s| s[:tags] }.sort + expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) - collection.flush!(queue) + values = payload[:series].map { |s| s[:points] }.sort + expect(values).to eq([[5, 6], [5, 7]]) expect(distributions.size).to eq(0) end @@ -297,18 +291,23 @@ def first_distribution_values threads_count = 5 metrics_count = 0 - expect(queue).to receive(:enqueue) do |event| - mutex.synchronize { metrics_count += event.payload[:series].size } - end.at_least(:once) - threads = Array.new(threads_count) do |i| Thread.new do collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) - collection.flush!(queue) + + events = collection.flush! + collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - collection.flush!(queue) + + events += collection.flush! + + mutex.synchronize do + events.each do |event| + metrics_count += event.payload[:series].size + end + end end end diff --git a/spec/datadog/core/telemetry/metrics_manager_spec.rb b/spec/datadog/core/telemetry/metrics_manager_spec.rb index 723258640d9..3aef7cee414 100644 --- a/spec/datadog/core/telemetry/metrics_manager_spec.rb +++ b/spec/datadog/core/telemetry/metrics_manager_spec.rb @@ -188,18 +188,18 @@ def first_collection end describe '#flush!' do - subject(:flush!) { manager.flush!(queue) } - - let(:queue) { [] } + subject(:flush!) { manager.flush! } it 'forwards flush to the collections' do + events = [double(:event)] + collection = double(:collection) expect(Datadog::Core::Telemetry::MetricsCollection).to receive(:new).and_return(collection) expect(collection).to receive(:inc) - expect(collection).to receive(:flush!).with(queue) + expect(collection).to receive(:flush!).and_return(events) manager.inc(namespace, metric_name, value, tags: tags) - flush! + expect(flush!).to eq(events) end context 'when disabled' do @@ -208,13 +208,11 @@ def first_collection it 'does nothing' do expect(Datadog::Core::Telemetry::MetricsCollection).to_not receive(:new) - flush! + expect(flush!).to eq([]) end end context 'concurrently creating and flushing namespaces' do - let(:queue) { double('queue') } - it 'flushes all metrics' do mutex = Mutex.new @@ -222,16 +220,19 @@ def first_collection metrics_per_thread = 3 flushed_metrics_count = 0 - allow(queue).to receive(:enqueue) do |event| - mutex.synchronize { flushed_metrics_count += event.payload[:series].count } - end threads = Array.new(threads_count) do |n| Thread.new do metrics_per_thread.times do |i| manager.inc("namespace #{n}", "#{metric_name} #{i}", value, tags: tags) end - manager.flush!(queue) + events = manager.flush! + + mutex.synchronize do + events.each do |event| + flushed_metrics_count += event.payload[:series].count + end + end end end