diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 6324f8ac795..f457a434ce4 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -28,6 +28,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: Emitter.new ) + @worker.start end def disable! @@ -38,26 +39,24 @@ def disable! def started! return if !@enabled || forked? - @worker.start @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true end - def emit_closing! - return if !@enabled || forked? - - @worker.enqueue(Event::AppClosing.new) - end - def stop! return if @stopped - # gracefully stop the worker and send leftover events - @worker.stop + @worker.stop(true) @stopped = true end + def emit_closing! + return if !@enabled || forked? + + @worker.enqueue(Event::AppClosing.new) + end + def integrations_change! return if !@enabled || forked? diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index da0bc7dd455..8ba8e8723b1 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -2,6 +2,7 @@ require_relative 'event' +require_relative '../utils/only_once_successful' require_relative '../workers/polling' require_relative '../workers/queue' @@ -15,6 +16,8 @@ class Worker DEFAULT_BUFFER_MAX_SIZE = 1000 + TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new + def initialize( heartbeat_interval_seconds:, emitter:, @@ -24,8 +27,6 @@ def initialize( ) @emitter = emitter - @sent_started_event = false - # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings @@ -48,6 +49,8 @@ def start def stop(force_stop = false, timeout = @shutdown_timeout) buffer.close if running? + flush_events(dequeue) if work_pending? + super end @@ -56,7 +59,7 @@ def enqueue(event) end def sent_started_event? - @sent_started_event + TELEMETRY_STARTED_ONCE.ran? end private @@ -89,24 +92,25 @@ def heartbeat! def started! return unless enabled? - res = send_event(Event::AppStarted.new) + TELEMETRY_STARTED_ONCE.run do + res = send_event(Event::AppStarted.new) - if res.not_found? # Telemetry is only supported by agent versions 7.34 and up - Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - self.enabled = false - elsif res.ok? - Datadog.logger.debug('Telemetry app-started event is successfully sent') - @sent_started_event = true - else - Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') + if res.ok? + Datadog.logger.debug('Telemetry app-started event is successfully sent') + true + else + Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') + false + end end end def send_event(event) - Datadog.logger.debug { "Sending telemetry event: #{event}" } - response = @emitter.request(event) - Datadog.logger.debug { "Received response: #{response}" } - response + res = @emitter.request(event) + + disable_on_not_found!(res) + + res end def dequeue @@ -120,6 +124,13 @@ def buffer_klass Core::Buffer::ThreadSafe end end + + def disable_on_not_found!(response) + return unless response.not_found? + + Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') + self.enabled = false + end end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 9220dfeea09..b804a19664d 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -8,6 +8,7 @@ module Datadog include Core::Workers::IntervalLoop include Core::Workers::Queue + TELEMETRY_STARTED_ONCE: Datadog::Core::Utils::OnlyOnceSuccessful DEFAULT_BUFFER_MAX_SIZE: 1000 @emitter: Emitter @@ -23,6 +24,8 @@ module Datadog def enqueue: (Event::Base event) -> void + def dequeue: () -> Array[Event::Base] + private def heartbeat!: () -> void @@ -33,6 +36,8 @@ module Datadog def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void + def buffer_klass: () -> untyped end end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index 5d1f7f013f7..33f4544ec6e 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -82,7 +82,7 @@ it do started! - expect(worker).to_not have_received(:start) + expect(worker).to_not have_received(:enqueue) end end @@ -118,7 +118,7 @@ expect_in_fork do telemetry.started! - expect(worker).to_not have_received(:start) + expect(worker).to_not have_received(:enqueue) end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index ba389c23801..6008cefd892 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -42,8 +42,10 @@ end after do - worker.stop(true, 0) + worker.stop(true) worker.join + + Datadog::Core::Telemetry::Worker::TELEMETRY_STARTED_ONCE.send(:reset_ran_once_state_for_tests) end describe '.new' do @@ -97,19 +99,19 @@ end it 'always sends heartbeat event after started event' do - @sent_hearbeat = false + sent_hearbeat = false allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do # app-started was already sent by now expect(worker.sent_started_event?).to be(true) - @sent_hearbeat = true + sent_hearbeat = true response end worker.start - try_wait_until { @sent_hearbeat } + try_wait_until { sent_hearbeat } end end @@ -124,6 +126,42 @@ expect(@received_heartbeat).to be(false) end end + + context 'several workers running' do + it 'sends single started event' do + started_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do + started_events += 1 + + response + end + + heartbeat_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + heartbeat_events += 1 + + response + end + + workers = Array.new(3) do + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: emitter + ) + end + workers.each(&:start) + + try_wait_until { heartbeat_events >= 3 } + + expect(started_events).to be(1) + + workers.each do |w| + w.stop(true, 0) + w.join + end + end + end end context 'when disabled' do @@ -137,6 +175,36 @@ end end + describe '#stop' do + let(:heartbeat_interval_seconds) { 3 } + + it 'flushes events and stops the worker' do + events_received = 0 + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) do + events_received += 1 + + response + end + + worker.start + + worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + worker.stop(true) + + try_wait_until { !worker.running? } + + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: false, + running?: false, + started?: true + ) + end + end + describe '#enqueue' do it 'adds events to the buffer and flushes them later' do events_received = 0 @@ -144,6 +212,8 @@ an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) ) do events_received += 1 + + response end worker.start