Skip to content

Commit

Permalink
flush metrics in the telemetry worker
Browse files Browse the repository at this point in the history
  • Loading branch information
anmarchenko committed Jul 9, 2024
1 parent e38a4d6 commit fd7ae93
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def initialize(
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: Emitter.new,
metrics_manager: @metrics_manager,
dependency_collection: dependency_collection
)
@worker.start
Expand Down
19 changes: 14 additions & 5 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
emitter:,
metrics_manager:,
dependency_collection:,
enabled: true,
shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
@emitter = emitter
@metrics_manager = metrics_manager
@dependency_collection = dependency_collection

@ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
Expand Down Expand Up @@ -79,8 +81,10 @@ def perform(*events)
return if !enabled? || forked?

started! unless sent_started_event?
# flush metrics here
flush_events(events)

metric_events = @metrics_manager.flush!
events = [] if events.nil?
flush_events(events + metric_events)

@current_ticks += 1
return if @current_ticks < @ticks_per_heartbeat
Expand All @@ -90,7 +94,7 @@ def perform(*events)
end

def flush_events(events)
return if events.nil? || events.empty?
return if events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
Expand All @@ -108,7 +112,7 @@ def started!

if failed_to_start?
Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker')
self.enabled = false
disable!
return
end

Expand Down Expand Up @@ -152,11 +156,16 @@ def buffer_klass
end
end

# Turns this worker off and disables the metrics manager it was constructed with.
def disable!
# Mark the worker itself as disabled first.
self.enabled = false
# Then forward the shutdown to the metrics manager via its own disable!.
@metrics_manager.disable!
end

# Disables telemetry when the given response reports "not found".
#
# @param response an object responding to `not_found?`
# @return [void] returns early (nil) when the response is not a not-found response
def disable_on_not_found!(response)
# Nothing to do unless the response says the endpoint was not found.
return unless response.not_found?

Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
# NOTE(review): this line and the next look like the before/after sides of a diff hunk
# shown together — confirm that only `disable!` remains in the actual file.
self.enabled = false
disable!
end
end
end
Expand Down
9 changes: 6 additions & 3 deletions sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ module Datadog
DEFAULT_BUFFER_MAX_SIZE: 1000

@emitter: Emitter
@metrics_manager: MetricsManager
@sent_started_event: bool
@shutdown_timeout: Integer
@buffer_size: Integer
@dependency_collection: bool
@ticks_per_heartbeat: Integer
@current_ticks: Integer

def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void
def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void

def start: () -> void

Expand All @@ -40,9 +41,11 @@ module Datadog

def flush_events: (Array[Event::Base] events) -> void

def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response
def send_event: (Event::Base event) -> Http::Adapters::Net::Response

def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void
def disable!: () -> void

def disable_on_not_found!: (Http::Adapters::Net::Response response) -> void

def buffer_klass: () -> untyped
end
Expand Down
4 changes: 3 additions & 1 deletion spec/datadog/core/telemetry/component_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection,
enabled: enabled,
emitter: an_instance_of(Datadog::Core::Telemetry::Emitter)
emitter: an_instance_of(Datadog::Core::Telemetry::Emitter),
metrics_manager: anything
).and_return(worker)

allow(worker).to receive(:start)
Expand Down Expand Up @@ -218,6 +219,7 @@
let(:value) { double('value') }
let(:tags) { double('tags') }
let(:common) { double('common') }

before do
expect(Datadog::Core::Telemetry::MetricsManager).to receive(:new).with(
aggregation_interval: metrics_aggregation_interval_seconds,
Expand Down
31 changes: 30 additions & 1 deletion spec/datadog/core/telemetry/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: emitter,
metrics_manager: metrics_manager,
dependency_collection: dependency_collection
)
end

let(:enabled) { true }
let(:heartbeat_interval_seconds) { 0.5 }
let(:metrics_aggregation_interval_seconds) { 0.25 }
let(:emitter) { double(Datadog::Core::Telemetry::Emitter) }
let(:metrics_manager) { instance_double(Datadog::Core::Telemetry::MetricsManager, flush!: [], disable!: nil) }
let(:emitter) { instance_double(Datadog::Core::Telemetry::Emitter) }
let(:dependency_collection) { false }

let(:backend_supports_telemetry?) { true }
Expand Down Expand Up @@ -205,6 +207,32 @@
try_wait_until { sent_dependencies }
end
end

# Checks that events returned by the metrics manager's flush! are passed to the emitter
# as part of a MessageBatch once the worker is started.
context 'when metrics are flushed' do
before do
# Make flush! return a single GenerateMetrics event (namespace 'namespace', no metrics).
allow(metrics_manager).to receive(:flush!).and_return(
[Datadog::Core::Telemetry::Event::GenerateMetrics.new('namespace', [])]
)
end

it 'sends metrics event' do
# Flipped to true by the emitter stub below once a GenerateMetrics subevent is seen.
received_metrics = false

# Stub emitter requests carrying a MessageBatch and inspect the batched subevents.
allow(emitter).to receive(:request).with(
an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch)
) do |event|
event.events.each do |subevent|
received_metrics = true if subevent.is_a?(Datadog::Core::Telemetry::Event::GenerateMetrics)
end

# The block's value is what the stubbed `request` returns; `response` is defined outside this context.
response
end

worker.start

# presumably waits/polls until the block is truthy — helper is defined elsewhere; confirm.
try_wait_until { received_metrics }
end
end
end

context 'when internal error returned by emitter' do
Expand Down Expand Up @@ -244,6 +272,7 @@
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: emitter,
metrics_manager: metrics_manager,
dependency_collection: dependency_collection
)
end
Expand Down

0 comments on commit fd7ae93

Please sign in to comment.