diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index 665485c6f0c..7aea57f5ee1 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -64,7 +64,9 @@ def build_telemetry(settings, agent_settings, logger) Telemetry::Component.new( enabled: enabled, + metrics_enabled: enabled && settings.telemetry.metrics_enabled, heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds, dependency_collection: settings.telemetry.dependency_collection ) end diff --git a/lib/datadog/core/configuration/settings.rb b/lib/datadog/core/configuration/settings.rb index 71e43d89f8d..a241b73cc7f 100644 --- a/lib/datadog/core/configuration/settings.rb +++ b/lib/datadog/core/configuration/settings.rb @@ -663,6 +663,16 @@ def initialize(*_) o.type :bool end + # Enable metrics collection for telemetry. Metrics collection only works when telemetry is enabled and + # metrics are enabled. + # @default `DD_TELEMETRY_METRICS_ENABLED` environment variable, otherwise `true`. + # @return [Boolean] + option :metrics_enabled do |o| + o.type :bool + o.env Core::Telemetry::Ext::ENV_METRICS_ENABLED + o.default true + end + # The interval in seconds when telemetry must be sent. # # This method is used internally, for testing purposes only. @@ -676,6 +686,19 @@ def initialize(*_) o.default 60.0 end + # The interval in seconds when telemetry metrics are aggregated. + # Should be a divisor of `heartbeat_interval_seconds`. + # + # This method is used internally, for testing purposes only. + # @default `DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL` environment variable, otherwise `10`. 
+ # @return [Float] + # @!visibility private + option :metrics_aggregation_interval_seconds do |o| + o.type :float + o.env Core::Telemetry::Ext::ENV_METRICS_AGGREGATION_INTERVAL + o.default 10.0 + end + # The install id of the application. # # This method is used internally, by library injection. diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 0d5046e4391..61d72e15db6 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -2,6 +2,7 @@ require_relative 'emitter' require_relative 'event' +require_relative 'metrics_manager' require_relative 'worker' require_relative '../utils/forking' @@ -15,16 +16,31 @@ class Component include Core::Utils::Forking # @param enabled [Boolean] Determines whether telemetry events should be sent to the API + # @param metrics_enabled [Boolean] Determines whether telemetry metrics should be sent to the API # @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds. + # @param metrics_aggregation_interval_seconds [Float] How frequently metrics will be aggregated, in seconds. 
# @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event - def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) + def initialize( + heartbeat_interval_seconds:, + metrics_aggregation_interval_seconds:, + dependency_collection:, + enabled: true, + metrics_enabled: true + ) @enabled = enabled @stopped = false + @metrics_manager = MetricsManager.new( + enabled: enabled && metrics_enabled, + aggregation_interval: metrics_aggregation_interval_seconds + ) + @worker = Telemetry::Worker.new( enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: Emitter.new, + metrics_manager: @metrics_manager, dependency_collection: dependency_collection ) @worker.start @@ -60,6 +76,31 @@ def client_configuration_change!(changes) @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end + + # Increments a count metric. + def inc(namespace, metric_name, value, tags: {}, common: true) + @metrics_manager.inc(namespace, metric_name, value, tags: tags, common: common) + end + + # Decrements a count metric. + def dec(namespace, metric_name, value, tags: {}, common: true) + @metrics_manager.dec(namespace, metric_name, value, tags: tags, common: common) + end + + # Tracks gauge metric. + def gauge(namespace, metric_name, value, tags: {}, common: true) + @metrics_manager.gauge(namespace, metric_name, value, tags: tags, common: common) + end + + # Tracks rate metric. + def rate(namespace, metric_name, value, tags: {}, common: true) + @metrics_manager.rate(namespace, metric_name, value, tags: tags, common: common) + end + + # Tracks distribution metric. 
+ def distribution(namespace, metric_name, value, tags: {}, common: true) + @metrics_manager.distribution(namespace, metric_name, value, tags: tags, common: common) + end end end end diff --git a/lib/datadog/core/telemetry/ext.rb b/lib/datadog/core/telemetry/ext.rb index a70b39a28de..2c2e15c0781 100644 --- a/lib/datadog/core/telemetry/ext.rb +++ b/lib/datadog/core/telemetry/ext.rb @@ -5,7 +5,9 @@ module Core module Telemetry module Ext ENV_ENABLED = 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' + ENV_METRICS_ENABLED = 'DD_TELEMETRY_METRICS_ENABLED' ENV_HEARTBEAT_INTERVAL = 'DD_TELEMETRY_HEARTBEAT_INTERVAL' + ENV_METRICS_AGGREGATION_INTERVAL = 'DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL' ENV_DEPENDENCY_COLLECTION = 'DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED' ENV_INSTALL_ID = 'DD_INSTRUMENTATION_INSTALL_ID' ENV_INSTALL_TYPE = 'DD_INSTRUMENTATION_INSTALL_TYPE' diff --git a/lib/datadog/core/telemetry/metric.rb b/lib/datadog/core/telemetry/metric.rb index bbbc649bd5b..a1d459c4aa3 100644 --- a/lib/datadog/core/telemetry/metric.rb +++ b/lib/datadog/core/telemetry/metric.rb @@ -10,7 +10,7 @@ class Base attr_reader :name, :tags, :values, :common # @param name [String] metric name - # @param tags [Array|Hash{String=>String}] metric tags as hash of array of "tag:val" strings + # @param tags [Array|Hash{String=>String}] metric tags as hash or array of "tag:val" strings # @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric def initialize(name, tags: {}, common: true) @name = name diff --git a/lib/datadog/core/telemetry/metrics_collection.rb b/lib/datadog/core/telemetry/metrics_collection.rb index 3d27b5f8a2a..d1997a08a2d 100644 --- a/lib/datadog/core/telemetry/metrics_collection.rb +++ b/lib/datadog/core/telemetry/metrics_collection.rb @@ -45,15 +45,17 @@ def distribution(metric_name, value, tags: {}, common: true) fetch_or_add_distribution(metric, value) end - def flush!(queue) + def flush! 
@mutex.synchronize do - queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any? - queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any? + events = [] + events << Event::GenerateMetrics.new(@namespace, @metrics.values) if @metrics.any? + events << Event::Distributions.new(@namespace, @distributions.values) if @distributions.any? @metrics = {} @distributions = {} + + events end - nil end private diff --git a/lib/datadog/core/telemetry/metrics_manager.rb b/lib/datadog/core/telemetry/metrics_manager.rb index e4a3055771a..95750728c64 100644 --- a/lib/datadog/core/telemetry/metrics_manager.rb +++ b/lib/datadog/core/telemetry/metrics_manager.rb @@ -57,13 +57,11 @@ def distribution(namespace, metric_name, value, tags: {}, common: true) collection.distribution(metric_name, value, tags: tags, common: common) end - def flush!(queue) - return unless @enabled + def flush! + return [] unless @enabled collections = @mutex.synchronize { @collections.values } - collections.each { |col| col.flush!(queue) } - - nil + collections.reduce([]) { |events, collection| events + collection.flush! } end def disable! 
diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 7706ac7ce30..5166f63a7d9 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -21,19 +21,25 @@ class Worker def initialize( heartbeat_interval_seconds:, + metrics_aggregation_interval_seconds:, emitter:, + metrics_manager:, dependency_collection:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE ) @emitter = emitter + @metrics_manager = metrics_manager @dependency_collection = dependency_collection + @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i + @current_ticks = 0 + # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings - self.loop_base_interval = heartbeat_interval_seconds + self.loop_base_interval = metrics_aggregation_interval_seconds self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP @shutdown_timeout = shutdown_timeout @@ -76,13 +82,19 @@ def perform(*events) started! unless sent_started_event? - heartbeat! + metric_events = @metrics_manager.flush! + events = [] if events.nil? + flush_events(events + metric_events) - flush_events(events) + @current_ticks += 1 + return if @current_ticks < @ticks_per_heartbeat + + @current_ticks = 0 + heartbeat! end def flush_events(events) - return if events.nil? || events.empty? + return if events.empty? return if !enabled? || !sent_started_event? Datadog.logger.debug { "Sending #{events&.count} telemetry events" } @@ -100,7 +112,7 @@ def started! if failed_to_start? Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker') - self.enabled = false + disable! return end @@ -144,11 +156,16 @@ def buffer_klass end end + def disable! + self.enabled = false + @metrics_manager.disable! + end + def disable_on_not_found!(response) return unless response.not_found? 
Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - self.enabled = false + disable! end end end diff --git a/sig/datadog/core/telemetry/component.rbs b/sig/datadog/core/telemetry/component.rbs index 4d411d32578..d9d54fae227 100644 --- a/sig/datadog/core/telemetry/component.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -4,13 +4,14 @@ module Datadog class Component @enabled: bool @stopped: bool + @metrics_manager: Datadog::Core::Telemetry::MetricsManager @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool include Core::Utils::Forking - def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void + def initialize: (heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, dependency_collection: bool, ?enabled: bool, ?metrics_enabled: bool) -> void def disable!: () -> void diff --git a/sig/datadog/core/telemetry/ext.rbs b/sig/datadog/core/telemetry/ext.rbs index f1b3326530b..96ad27c06d9 100644 --- a/sig/datadog/core/telemetry/ext.rbs +++ b/sig/datadog/core/telemetry/ext.rbs @@ -4,7 +4,9 @@ module Datadog module Ext ENV_DEPENDENCY_COLLECTION: ::String ENV_ENABLED: ::String + ENV_METRICS_ENABLED: ::String ENV_HEARTBEAT_INTERVAL: ::String + ENV_METRICS_AGGREGATION_INTERVAL: ::String ENV_INSTALL_ID: ::String ENV_INSTALL_TIME: ::String ENV_INSTALL_TYPE: ::String diff --git a/sig/datadog/core/telemetry/metric.rbs b/sig/datadog/core/telemetry/metric.rbs index e0874765d98..09d9bcef3f5 100644 --- a/sig/datadog/core/telemetry/metric.rbs +++ b/sig/datadog/core/telemetry/metric.rbs @@ -47,11 +47,11 @@ module Datadog end class IntervalMetric < Base - @interval: Integer + @interval: Float - attr_reader interval: Integer + attr_reader interval: Float - def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Integer) -> void + def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Float) -> void end class Count < 
Base diff --git a/sig/datadog/core/telemetry/metrics_collection.rbs b/sig/datadog/core/telemetry/metrics_collection.rbs index fa2cdc0cb3b..7fcdd814ec1 100644 --- a/sig/datadog/core/telemetry/metrics_collection.rbs +++ b/sig/datadog/core/telemetry/metrics_collection.rbs @@ -2,13 +2,9 @@ module Datadog module Core module Telemetry class MetricsCollection - interface _Queue - def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void - end - @namespace: String - @interval: Integer + @interval: Float @mutex: Thread::Mutex @@ -18,7 +14,7 @@ module Datadog attr_reader namespace: String - def initialize: (String namespace, aggregation_interval: Integer) -> void + def initialize: (String namespace, aggregation_interval: Float) -> void def inc: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void @@ -30,7 +26,7 @@ module Datadog def distribution: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void - def flush!: (_Queue queue) -> void + def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base] private diff --git a/sig/datadog/core/telemetry/metrics_manager.rbs b/sig/datadog/core/telemetry/metrics_manager.rbs index 53e279c86e6..50369469732 100644 --- a/sig/datadog/core/telemetry/metrics_manager.rbs +++ b/sig/datadog/core/telemetry/metrics_manager.rbs @@ -2,11 +2,7 @@ module Datadog module Core module Telemetry class MetricsManager - interface _Queue - def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void - end - - @interval: Integer + @interval: Float @enabled: bool @@ -16,7 +12,7 @@ module Datadog attr_reader enabled: bool - def initialize: (aggregation_interval: Integer, enabled: bool) -> void + def initialize: (aggregation_interval: Float, enabled: bool) -> void def inc: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value 
value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void @@ -28,7 +24,7 @@ module Datadog def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void - def flush!: (_Queue queue) -> void + def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base] def disable!: () -> void diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 822b9fece95..8d98714f912 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -13,12 +13,15 @@ module Datadog DEFAULT_BUFFER_MAX_SIZE: 1000 @emitter: Emitter + @metrics_manager: MetricsManager @sent_started_event: bool @shutdown_timeout: Integer @buffer_size: Integer @dependency_collection: bool + @ticks_per_heartbeat: Integer + @current_ticks: Integer - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void def start: () -> void @@ -38,9 +41,11 @@ module Datadog def flush_events: (Array[Event::Base] events) -> void - def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + def send_event: (Event::Base event) -> Http::Adapters::Net::Response - def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void + def disable!: () -> void + + def disable_on_not_found!: (Http::Adapters::Net::Response response) -> void def buffer_klass: () -> untyped end diff --git a/spec/datadog/core/configuration/components_spec.rb b/spec/datadog/core/configuration/components_spec.rb 
index 79c65637f91..1ccea4861b7 100644 --- a/spec/datadog/core/configuration/components_spec.rb +++ b/spec/datadog/core/configuration/components_spec.rb @@ -225,11 +225,14 @@ context 'given settings' do let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:expected_options) do - { enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + { enabled: enabled, metrics_enabled: metrics_enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection } end let(:enabled) { true } + let(:metrics_enabled) { true } let(:heartbeat_interval_seconds) { 60 } + let(:metrics_aggregation_interval_seconds) { 10 } let(:dependency_collection) { true } before do @@ -246,7 +249,8 @@ context 'and :unix agent adapter' do let(:expected_options) do - { enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds, + { enabled: false, metrics_enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection } end let(:agent_settings) do diff --git a/spec/datadog/core/configuration/settings_spec.rb b/spec/datadog/core/configuration/settings_spec.rb index 33fe290f315..79077bb87c9 100644 --- a/spec/datadog/core/configuration/settings_spec.rb +++ b/spec/datadog/core/configuration/settings_spec.rb @@ -1476,6 +1476,39 @@ end end + describe '#metrics_enabled' do + subject(:metrics_enabled) { settings.telemetry.metrics_enabled } + let(:env_var_name) { 'DD_TELEMETRY_METRICS_ENABLED' } + + context 'when DD_TELEMETRY_METRICS_ENABLED' do + context 'is not defined' do + let(:env_var_value) { nil } + + it { is_expected.to be true } + end + + [true, false].each do |value| + context "is defined as #{value}" do + let(:env_var_value) { value.to_s } + + it { is_expected.to be value } + end + end + end + end + + 
describe '#metrics_enabled=' do + let(:env_var_name) { 'DD_TELEMETRY_METRICS_ENABLED' } + let(:env_var_value) { 'true' } + + it 'updates the #metrics_enabled setting' do + expect { settings.telemetry.metrics_enabled = false } + .to change { settings.telemetry.metrics_enabled } + .from(true) + .to(false) + end + end + describe '#heartbeat_interval' do subject(:heartbeat_interval_seconds) { settings.telemetry.heartbeat_interval_seconds } let(:env_var_name) { 'DD_TELEMETRY_HEARTBEAT_INTERVAL' } @@ -1505,6 +1538,35 @@ end end + describe '#metrics_aggregation_interval_seconds' do + subject(:metrics_aggregation_interval_seconds) { settings.telemetry.metrics_aggregation_interval_seconds } + let(:env_var_name) { 'DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL' } + + context 'when DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL' do + context 'is not defined' do + let(:env_var_value) { nil } + + it { is_expected.to eq 10.0 } + end + + context 'is defined' do + let(:env_var_value) { '1.1' } + + it { is_expected.to eq 1.1 } + end + end + end + + describe '#metrics_aggregation_interval_seconds=' do + let(:env_var_name) { 'DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL' } + let(:env_var_value) { '1.1' } + + it 'updates the #metrics_aggregation_interval_seconds setting' do + expect { settings.telemetry.metrics_aggregation_interval_seconds = 2.2 } + .to change { settings.telemetry.metrics_aggregation_interval_seconds }.from(1.1).to(2.2) + end + end + describe '#install_id' do subject(:install_id) { settings.telemetry.install_id } let(:env_var_name) { 'DD_INSTRUMENTATION_INSTALL_ID' } diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index ba17d37c2f4..8cc9d0c0b02 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -6,13 +6,17 @@ subject(:telemetry) do described_class.new( enabled: enabled, + metrics_enabled: metrics_enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, 
+ metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection ) end let(:enabled) { true } + let(:metrics_enabled) { true } let(:heartbeat_interval_seconds) { 0 } + let(:metrics_aggregation_interval_seconds) { 1 } let(:dependency_collection) { true } let(:worker) { double(Datadog::Core::Telemetry::Worker) } let(:not_found) { false } @@ -20,9 +24,11 @@ before do allow(Datadog::Core::Telemetry::Worker).to receive(:new).with( heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection, enabled: enabled, - emitter: an_instance_of(Datadog::Core::Telemetry::Emitter) + emitter: an_instance_of(Datadog::Core::Telemetry::Emitter), + metrics_manager: anything ).and_return(worker) allow(worker).to receive(:start) @@ -40,6 +46,7 @@ subject(:telemetry) do described_class.new( heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection ) end @@ -204,4 +211,80 @@ end end end + + context 'metrics support' do + let(:metrics_manager) { spy(:metrics_manager) } + let(:namespace) { double('namespace') } + let(:metric_name) { double('metric_name') } + let(:value) { double('value') } + let(:tags) { double('tags') } + let(:common) { double('common') } + + before do + expect(Datadog::Core::Telemetry::MetricsManager).to receive(:new).with( + aggregation_interval: metrics_aggregation_interval_seconds, + enabled: enabled && metrics_enabled + ).and_return(metrics_manager) + end + + describe '#inc' do + subject(:inc) { telemetry.inc(namespace, metric_name, value, tags: tags, common: common) } + + it do + inc + + expect(metrics_manager).to have_received(:inc).with( + namespace, metric_name, value, tags: tags, common: common + ) + end + end + + describe '#dec' do + subject(:dec) { 
telemetry.dec(namespace, metric_name, value, tags: tags, common: common) } + + it do + dec + + expect(metrics_manager).to have_received(:dec).with( + namespace, metric_name, value, tags: tags, common: common + ) + end + end + + describe '#gauge' do + subject(:gauge) { telemetry.gauge(namespace, metric_name, value, tags: tags, common: common) } + + it do + gauge + + expect(metrics_manager).to have_received(:gauge).with( + namespace, metric_name, value, tags: tags, common: common + ) + end + end + + describe '#rate' do + subject(:rate) { telemetry.rate(namespace, metric_name, value, tags: tags, common: common) } + + it do + rate + + expect(metrics_manager).to have_received(:rate).with( + namespace, metric_name, value, tags: tags, common: common + ) + end + end + + describe '#distribution' do + subject(:distribution) { telemetry.distribution(namespace, metric_name, value, tags: tags, common: common) } + + it do + distribution + + expect(metrics_manager).to have_received(:distribution).with( + namespace, metric_name, value, tags: tags, common: common + ) + end + end + end end diff --git a/spec/datadog/core/telemetry/metrics_collection_spec.rb b/spec/datadog/core/telemetry/metrics_collection_spec.rb index a8fd6fe1ebd..117ea692bf1 100644 --- a/spec/datadog/core/telemetry/metrics_collection_spec.rb +++ b/spec/datadog/core/telemetry/metrics_collection_spec.rb @@ -241,28 +241,22 @@ def first_distribution_values end describe '#flush!' do - let(:queue) { double('queue') } - - before do - allow(queue).to receive(:enqueue) - end - it 'flushes metrics' do collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) collection.inc('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - expect(queue).to receive(:enqueue) do |event| - expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) - payload = event.payload + events = collection.flush! 
+ expect(events).to have(1).item - expect(payload[:namespace]).to eq(namespace) - expect(payload[:series]).to have(2).items + event = events.first + expect(event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) + payload = event.payload - tags = payload[:series].map { |s| s[:tags] }.sort - expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) - end.once + expect(payload.fetch(:namespace)).to eq(namespace) + expect(payload.fetch(:series)).to have(2).items - collection.flush!(queue) + tags = payload[:series].map { |s| s[:tags] }.sort + expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) expect(metrics.size).to eq(0) end @@ -273,21 +267,21 @@ def first_distribution_values collection.distribution('metric_name', 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) collection.distribution('metric_name', 7, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - expect(queue).to receive(:enqueue) do |event| - expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions) - payload = event.payload + events = collection.flush! 
+ expect(events).to have(1).item - expect(payload[:namespace]).to eq(namespace) - expect(payload[:series]).to have(2).items + event = events.first + expect(event).to be_a(Datadog::Core::Telemetry::Event::Distributions) + payload = event.payload - tags = payload[:series].map { |s| s[:tags] }.sort - expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) + expect(payload.fetch(:namespace)).to eq(namespace) + expect(payload.fetch(:series)).to have(2).items - values = payload[:series].map { |s| s[:points] }.sort - expect(values).to eq([[5, 6], [5, 7]]) - end.once + tags = payload[:series].map { |s| s[:tags] }.sort + expect(tags).to eq([['tag1:val1', 'tag2:val2'], ['tag1:val1', 'tag2:val3']]) - collection.flush!(queue) + values = payload[:series].map { |s| s[:points] }.sort + expect(values).to eq([[5, 6], [5, 7]]) expect(distributions.size).to eq(0) end @@ -297,18 +291,23 @@ def first_distribution_values threads_count = 5 metrics_count = 0 - expect(queue).to receive(:enqueue) do |event| - mutex.synchronize { metrics_count += event.payload[:series].size } - end.at_least(:once) - threads = Array.new(threads_count) do |i| Thread.new do collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) - collection.flush!(queue) + + events = collection.flush! + collection.inc("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val2' }, common: true) collection.distribution("metric_name_#{i}", 5, tags: { tag1: 'val1', tag2: 'val3' }, common: true) - collection.flush!(queue) + + events += collection.flush! 
+ + mutex.synchronize do + events.each do |event| + metrics_count += event.payload[:series].size + end + end end end diff --git a/spec/datadog/core/telemetry/metrics_manager_spec.rb b/spec/datadog/core/telemetry/metrics_manager_spec.rb index 723258640d9..3aef7cee414 100644 --- a/spec/datadog/core/telemetry/metrics_manager_spec.rb +++ b/spec/datadog/core/telemetry/metrics_manager_spec.rb @@ -188,18 +188,18 @@ def first_collection end describe '#flush!' do - subject(:flush!) { manager.flush!(queue) } - - let(:queue) { [] } + subject(:flush!) { manager.flush! } it 'forwards flush to the collections' do + events = [double(:event)] + collection = double(:collection) expect(Datadog::Core::Telemetry::MetricsCollection).to receive(:new).and_return(collection) expect(collection).to receive(:inc) - expect(collection).to receive(:flush!).with(queue) + expect(collection).to receive(:flush!).and_return(events) manager.inc(namespace, metric_name, value, tags: tags) - flush! + expect(flush!).to eq(events) end context 'when disabled' do @@ -208,13 +208,11 @@ def first_collection it 'does nothing' do expect(Datadog::Core::Telemetry::MetricsCollection).to_not receive(:new) - flush! + expect(flush!).to eq([]) end end context 'concurrently creating and flushing namespaces' do - let(:queue) { double('queue') } - it 'flushes all metrics' do mutex = Mutex.new @@ -222,16 +220,19 @@ def first_collection metrics_per_thread = 3 flushed_metrics_count = 0 - allow(queue).to receive(:enqueue) do |event| - mutex.synchronize { flushed_metrics_count += event.payload[:series].count } - end threads = Array.new(threads_count) do |n| Thread.new do metrics_per_thread.times do |i| manager.inc("namespace #{n}", "#{metric_name} #{i}", value, tags: tags) end - manager.flush!(queue) + events = manager.flush! 
+ + mutex.synchronize do + events.each do |event| + flushed_metrics_count += event.payload[:series].count + end + end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 54b23ca6e79..64725de8821 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -7,14 +7,18 @@ described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, + metrics_manager: metrics_manager, dependency_collection: dependency_collection ) end let(:enabled) { true } let(:heartbeat_interval_seconds) { 0.5 } - let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:metrics_aggregation_interval_seconds) { 0.25 } + let(:metrics_manager) { instance_double(Datadog::Core::Telemetry::MetricsManager, flush!: [], disable!: nil) } + let(:emitter) { instance_double(Datadog::Core::Telemetry::Emitter) } let(:dependency_collection) { false } let(:backend_supports_telemetry?) 
{ true } @@ -58,7 +62,7 @@ it 'creates a new worker in stopped state' do expect(worker).to have_attributes( enabled?: true, - loop_base_interval: heartbeat_interval_seconds, + loop_base_interval: metrics_aggregation_interval_seconds, run_async?: false, running?: false, started?: false @@ -94,7 +98,7 @@ expect(worker).to have_attributes( enabled?: true, - loop_base_interval: heartbeat_interval_seconds, + loop_base_interval: metrics_aggregation_interval_seconds, run_async?: true, running?: true, started?: true @@ -152,6 +156,7 @@ context 'when app-started event exhausted retries' do let(:heartbeat_interval_seconds) { 0.1 } + let(:metrics_aggregation_interval_seconds) { 0.05 } it 'stops retrying, never sends heartbeat, and disables worker' do expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) @@ -202,6 +207,32 @@ try_wait_until { sent_dependencies } end end + + context 'when metrics are flushed' do + before do + allow(metrics_manager).to receive(:flush!).and_return( + [Datadog::Core::Telemetry::Event::GenerateMetrics.new('namespace', [])] + ) + end + + it 'sends metrics event' do + received_metrics = false + + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch) + ) do |event| + event.events.each do |subevent| + received_metrics = true if subevent.is_a?(Datadog::Core::Telemetry::Event::GenerateMetrics) + end + + response + end + + worker.start + + try_wait_until { received_metrics } + end + end end context 'when internal error returned by emitter' do @@ -239,7 +270,9 @@ described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, + metrics_manager: metrics_manager, dependency_collection: dependency_collection ) end @@ -270,11 +303,12 @@ describe '#stop' do let(:heartbeat_interval_seconds) { 60 } + 
let(:metrics_aggregation_interval_seconds) { 30 } it 'flushes events and stops the worker' do worker.start - try_wait_until { @received_heartbeat } + try_wait_until { @received_started } events_received = 0 mutex = Mutex.new diff --git a/static-analysis.datadog.yml b/static-analysis.datadog.yml index 79a2fc4df7b..671cae3598f 100644 --- a/static-analysis.datadog.yml +++ b/static-analysis.datadog.yml @@ -2,4 +2,8 @@ schema-version: v1 rulesets: - ruby-code-style - ruby-security - - ruby-best-practices + - ruby-best-practices: + rules: + percent-w: + ignore: + - '**'