Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDTEST-409] Telemetry metrics support #3768

Merged
merged 9 commits into from
Jul 10, 2024
2 changes: 2 additions & 0 deletions lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def build_telemetry(settings, agent_settings, logger)

Telemetry::Component.new(
enabled: enabled,
metrics_enabled: enabled && settings.telemetry.metrics_enabled,
heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds,
dependency_collection: settings.telemetry.dependency_collection
)
end
Expand Down
23 changes: 23 additions & 0 deletions lib/datadog/core/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,16 @@ def initialize(*_)
o.type :bool
end

# Enable metrics collection for telemetry. Metrics collection only works when telemetry is enabled and
# metrics are enabled.
# @default `DD_TELEMETRY_METRICS_ENABLED` environment variable, otherwise `true`.
# @return [Boolean]
option :metrics_enabled do |o|
o.type :bool
o.env Core::Telemetry::Ext::ENV_METRICS_ENABLED
o.default true
end

# The interval in seconds when telemetry must be sent.
#
# This method is used internally, for testing purposes only.
Expand All @@ -676,6 +686,19 @@ def initialize(*_)
o.default 60.0
end

# The interval in seconds when telemetry metrics are aggregated.
# Should be a denominator of `heartbeat_interval_seconds`.
#
# This method is used internally, for testing purposes only.
# @default `DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL` environment variable, otherwise `10`.
# @return [Float]
# @!visibility private
option :metrics_aggregation_interval_seconds do |o|
o.type :float
o.env Core::Telemetry::Ext::ENV_METRICS_AGGREGATION_INTERVAL
o.default 10.0
end

# The install id of the application.
#
# This method is used internally, by library injection.
Expand Down
73 changes: 72 additions & 1 deletion lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'emitter'
require_relative 'event'
require_relative 'metrics_manager'
require_relative 'worker'
require_relative '../utils/forking'

Expand All @@ -15,16 +16,31 @@ class Component
include Core::Utils::Forking

# @param enabled [Boolean] Determines whether telemetry events should be sent to the API
# @param metrics_enabled [Boolean] Determines whether telemetry metrics should be sent to the API
# @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds.
# @param metrics_aggregation_interval_seconds [Float] How frequently metrics will be aggregated, in seconds.
# @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event
def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true)
def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
dependency_collection:,
enabled: true,
metrics_enabled: true
)
@enabled = enabled
@stopped = false

@metrics_manager = MetricsManager.new(
enabled: enabled && metrics_enabled,
aggregation_interval: metrics_aggregation_interval_seconds
)

@worker = Telemetry::Worker.new(
enabled: @enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: Emitter.new,
metrics_manager: @metrics_manager,
dependency_collection: dependency_collection
)
@worker.start
Expand Down Expand Up @@ -60,6 +76,61 @@ def client_configuration_change!(changes)

@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
end

# Increments a count metric.
# @param namespace [String] metric namespace (per product, such as 'civisibility', 'tracers', 'profilers')
# @param metric_name [String] metric name
# @param value [Float] metric value
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of tag:value pairs or array of "tag:val"
# strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def inc(namespace, metric_name, value, tags: {}, common: true)
@metrics_manager.inc(namespace, metric_name, value, tags: tags, common: common)
end

# Decremenets a count metric.
# @param namespace [String] metric namespace (per product, such as 'civisibility', 'tracers', 'profilers')
# @param metric_name [String] metric name
# @param value [Float] metric value
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of tag:value pairs or array of "tag:val"
# strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def dec(namespace, metric_name, value, tags: {}, common: true)
@metrics_manager.dec(namespace, metric_name, value, tags: tags, common: common)
end

# Tracks gauge metric.
# @param namespace [String] metric namespace (per product, such as 'civisibility', 'tracers', 'profilers')
# @param metric_name [String] metric name
# @param value [Float] metric value
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of tag:value pairs or array of "tag:val"
# strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def gauge(namespace, metric_name, value, tags: {}, common: true)
@metrics_manager.gauge(namespace, metric_name, value, tags: tags, common: common)
end

# Tracks rate metric.
# @param namespace [String] metric namespace (per product, such as 'civisibility', 'tracers', 'profilers')
# @param metric_name [String] metric name
# @param value [Float] metric value
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of tag:value pairs or array of "tag:val"
# strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def rate(namespace, metric_name, value, tags: {}, common: true)
@metrics_manager.rate(namespace, metric_name, value, tags: tags, common: common)
end

# Tracks distribution metric.
# @param namespace [String] metric namespace (per product, such as 'civisibility', 'tracers', 'profilers')
# @param metric_name [String] metric name
# @param value [Float] metric value
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of tag:value pairs or array of "tag:val"
# strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def distribution(namespace, metric_name, value, tags: {}, common: true)
@metrics_manager.distribution(namespace, metric_name, value, tags: tags, common: common)
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/datadog/core/telemetry/ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ module Core
module Telemetry
module Ext
ENV_ENABLED = 'DD_INSTRUMENTATION_TELEMETRY_ENABLED'
ENV_METRICS_ENABLED = 'DD_TELEMETRY_METRICS_ENABLED'
ENV_HEARTBEAT_INTERVAL = 'DD_TELEMETRY_HEARTBEAT_INTERVAL'
ENV_METRICS_AGGREGATION_INTERVAL = 'DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL'
ENV_DEPENDENCY_COLLECTION = 'DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED'
ENV_INSTALL_ID = 'DD_INSTRUMENTATION_INSTALL_ID'
ENV_INSTALL_TYPE = 'DD_INSTRUMENTATION_INSTALL_TYPE'
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Base
attr_reader :name, :tags, :values, :common

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def initialize(name, tags: {}, common: true)
@name = name
Expand Down
10 changes: 6 additions & 4 deletions lib/datadog/core/telemetry/metrics_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ def distribution(metric_name, value, tags: {}, common: true)
fetch_or_add_distribution(metric, value)
end

def flush!(queue)
def flush!
@mutex.synchronize do
queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any?
queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any?
events = []
events << Event::GenerateMetrics.new(@namespace, @metrics.values) if @metrics.any?
events << Event::Distributions.new(@namespace, @distributions.values) if @distributions.any?

@metrics = {}
@distributions = {}

events
end
nil
end

private
Expand Down
8 changes: 3 additions & 5 deletions lib/datadog/core/telemetry/metrics_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ def distribution(namespace, metric_name, value, tags: {}, common: true)
collection.distribution(metric_name, value, tags: tags, common: common)
end

def flush!(queue)
return unless @enabled
def flush!
return [] unless @enabled

collections = @mutex.synchronize { @collections.values }
collections.each { |col| col.flush!(queue) }

nil
collections.reduce([]) { |events, collection| events + collection.flush! }
end

def disable!
Expand Down
29 changes: 23 additions & 6 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@ class Worker

def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
emitter:,
metrics_manager:,
dependency_collection:,
enabled: true,
shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
@emitter = emitter
@metrics_manager = metrics_manager
@dependency_collection = dependency_collection

@ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
@current_ticks = 0

# Workers::Polling settings
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = heartbeat_interval_seconds
self.loop_base_interval = metrics_aggregation_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

@shutdown_timeout = shutdown_timeout
Expand Down Expand Up @@ -76,13 +82,19 @@ def perform(*events)

started! unless sent_started_event?

heartbeat!
metric_events = @metrics_manager.flush!
events = [] if events.nil?
flush_events(events + metric_events)

flush_events(events)
@current_ticks += 1
return if @current_ticks < @ticks_per_heartbeat

@current_ticks = 0
heartbeat!
end

def flush_events(events)
return if events.nil? || events.empty?
return if events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
Expand All @@ -100,7 +112,7 @@ def started!

if failed_to_start?
Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker')
self.enabled = false
disable!
return
end

Expand Down Expand Up @@ -144,11 +156,16 @@ def buffer_klass
end
end

def disable!
self.enabled = false
@metrics_manager.disable!
end

def disable_on_not_found!(response)
return unless response.not_found?

Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
self.enabled = false
disable!
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ module Datadog
class Component
@enabled: bool
@stopped: bool
@metrics_manager: Datadog::Core::Telemetry::MetricsManager
@worker: Datadog::Core::Telemetry::Worker

attr_reader enabled: bool

include Core::Utils::Forking

def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void
def initialize: (heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, dependency_collection: bool, ?enabled: bool, ?metrics_enabled: bool) -> void

def disable!: () -> void

Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/ext.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ module Datadog
module Ext
ENV_DEPENDENCY_COLLECTION: ::String
ENV_ENABLED: ::String
ENV_METRICS_ENABLED: ::String
ENV_HEARTBEAT_INTERVAL: ::String
ENV_METRICS_AGGREGATION_INTERVAL: ::String
ENV_INSTALL_ID: ::String
ENV_INSTALL_TIME: ::String
ENV_INSTALL_TYPE: ::String
Expand Down
6 changes: 3 additions & 3 deletions sig/datadog/core/telemetry/metric.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ module Datadog
end

class IntervalMetric < Base
@interval: Integer
@interval: Float

attr_reader interval: Integer
attr_reader interval: Float

def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Integer) -> void
def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Float) -> void
end

class Count < Base
Expand Down
10 changes: 3 additions & 7 deletions sig/datadog/core/telemetry/metrics_collection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ module Datadog
module Core
module Telemetry
class MetricsCollection
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@namespace: String

@interval: Integer
@interval: Float

@mutex: Thread::Mutex

Expand All @@ -18,7 +14,7 @@ module Datadog

attr_reader namespace: String

def initialize: (String namespace, aggregation_interval: Integer) -> void
def initialize: (String namespace, aggregation_interval: Float) -> void

def inc: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

Expand All @@ -30,7 +26,7 @@ module Datadog

def distribution: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

private

Expand Down
10 changes: 3 additions & 7 deletions sig/datadog/core/telemetry/metrics_manager.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ module Datadog
module Core
module Telemetry
class MetricsManager
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@interval: Integer
@interval: Float

@enabled: bool

Expand All @@ -16,7 +12,7 @@ module Datadog

attr_reader enabled: bool

def initialize: (aggregation_interval: Integer, enabled: bool) -> void
def initialize: (aggregation_interval: Float, enabled: bool) -> void

def inc: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

Expand All @@ -28,7 +24,7 @@ module Datadog

def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

def disable!: () -> void

Expand Down
Loading
Loading