Expected Behavior

Repeated calls to lambda_metric should result in metric emission for the entire lifespan of an execution context.

Actual Behavior

We are seeing lost metric points when submitting explicitly timestamped custom metrics via the Datadog Lambda Layer for Python.

This behavior is reproducible with a simple Lambda Function:
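(The exact handler and notebook are linked from the support ticket referenced below; the sketch here is a hypothetical minimal handler consistent with the behavior described in this report. The handler name, tag, and timestamp offsets are illustrative.)

import time

from datadog_lambda.metric import lambda_metric
from datadog_lambda.wrapper import datadog_lambda_wrapper


@datadog_lambda_wrapper
def handler(event, context):
    now = int(time.time())
    # Three explicitly timestamped distribution points per invocation,
    # so N invocations should yield 3 * N points of jltest.foo.
    for offset in (0, 30, 60):
        lambda_metric("jltest.foo", 1, timestamp=now - offset, tags=["test:illustrative"])
    return {"statusCode": 200}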
Invoking the above Function a dozen times in succession results in a dozen invocations against a single execution context, and thus a single instance of ThreadStatsWriter.
The resulting metrics show several jltest.foo emissions, followed by several "lost" metrics, despite continued invocation metrics from the DD Extension.
DD Notebook Screenshot
There is a notebook corresponding to the above screenshot, linked in support ticket #1857092 -- feel free to DM for additional details.
Anecdotally, only 33 distribution points are emitted from the above code, though one would expect 3 * ${num invocations}, whether the Function is invoked a dozen times, twenty times, or even more.
Forcing the creation of a new execution context (e.g. by making a whitespace code change, or updating an env var) results in a NEW instance of ThreadStatsWriter, which emits the same number of metric points before it too begins "flushing" unsent metrics.
Specifications

Stacktrace

N/A -- DD logs seem "normal"

Detail / Steps to Reproduce the Problem

The Function used to reproduce the problem has the handler sketched above and the DD-provided Layers.

Code Paths
The DD Layer's lambda_metric function normally emits metrics to a StatsDWriter (source) using the serverless extension.
This works as expected when those metrics are not timestamped.
Providing an explicit timestamp (in epoch seconds) causes the DD Layer to execute this code path (source), introduced in #480:
def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
    ...
    if should_use_extension and timestamp is not None:
        # The extension does not support timestamps for distributions so we create a
        # a thread stats writer to submit metrics with timestamps to the API
        ...
        global extension_thread_stats
        if extension_thread_stats is None:
            ...
            extension_thread_stats = ThreadStatsWriter(flush_in_thread)  # flush_in_thread = False
        extension_thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )
        return
Metric points submitted to this extension_thread_stats instance of ThreadStatsWriter work fine for the first ~dozen Function invocations, after which they simply fail to appear in the Metrics API.
Custom metrics sent without timestamps appear correctly (over dozens of invocations), as they do not traverse the conditional above and thus use the StatsDWriter -> Serverless Agent path in the DD Extension; the two call shapes are contrasted below.
aws.lambda.enhanced.* metrics (from the Extension) appear correctly (over dozens of invocations)
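For concreteness, here are the two call shapes that land on these different paths (metric name and tag are illustrative):

import time

from datadog_lambda.metric import lambda_metric

# No timestamp: handled by StatsDWriter and forwarded through the
# serverless extension -- this path works reliably.
lambda_metric("jltest.foo", 1, tags=["test:illustrative"])

# Explicit epoch-seconds timestamp: handled by the module-level
# extension_thread_stats ThreadStatsWriter, which POSTs to the
# distribution_points API -- this is the path that loses points.
lambda_metric("jltest.foo", 1, timestamp=int(time.time()), tags=["test:illustrative"])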
Full call path - emission
# ==== CALL CHAIN: emitting a Distribution (not flushing) ==== #

# My Function
lambda_metric(..., timestamp=<int epoch seconds>)

# https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/metric.py#L62
if should_use_extension and timestamp is not None:
    # The extension does not support timestamps for distributions so we create a
    # a thread stats writer to submit metrics with timestamps to the API
    ...
    global extension_thread_stats
    if extension_thread_stats is None:
        ...
        extension_thread_stats = ThreadStatsWriter(flush_in_thread)  # False
    extension_thread_stats.distribution(
        metric_name, value, tags=tags, timestamp=timestamp
    )
    return

# https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/thread_stats_writer.py#L21
class ThreadStatsWriter(StatsWriter):
    def distribution(self, metric_name, value, tags=[], timestamp=None):
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

# https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/base.py#L284
class ThreadStats(object):
    def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None):
        # L145: self._metric_aggregator = MetricAggregator(self.roll_up_interval)  # roll up = 10
        if not self._disabled:
            self._metric_aggregator.add_point(
                metric_name, tags, timestamp or time(), value, Distribution, sample_rate=sample_rate, host=host
            )

# https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/metrics.py#L183
class MetricsAggregator(object):
    def __init__(self, roll_up_interval=10):  # this default will be used in our scenario
        self._lock = threading.RLock()
        self._metrics = defaultdict(lambda: {})
        self._roll_up_interval = roll_up_interval

    def add_point(self, metric, tags, timestamp, value, metric_class, sample_rate=1, host=None):
        # The sample rate is currently ignored for in process stuff
        interval = timestamp - timestamp % self._roll_up_interval
        key = (metric, host, tuple(sorted(tags)) if tags else None)
        with self._lock:
            if key not in self._metrics[interval]:
                self._metrics[interval][key] = metric_class(metric, tags, host)
            self._metrics[interval][key].add_point(value)

# https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/metrics.py#L97
class Distribution(Metric):
    """A distribution metric."""

    stats_tag = "d"

    def __init__(self, name, tags, host):
        self.name = name
        self.tags = tags
        self.host = host
        self.value = []

    def add_point(self, value):
        self.value.append(value)
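As an aside, the roll-up bucketing in MetricsAggregator.add_point above is easy to sanity-check in isolation. Below is a minimal standalone re-implementation of the same modulo arithmetic (not Datadog code; the epoch timestamps are hypothetical):

from collections import defaultdict

ROLL_UP_INTERVAL = 10  # MetricsAggregator's default roll_up_interval
buckets = defaultdict(list)

# Points within the same 10-second window share an interval bucket.
for ts, value in [(1726597655, 1), (1726597657, 2), (1726597661, 3)]:
    interval = ts - ts % ROLL_UP_INTERVAL  # e.g. 1726597655 -> 1726597650
    buckets[interval].append(value)

print(dict(buckets))
# {1726597650: [1, 2], 1726597660: [3]}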
Full call path - flushing
# ==== CALL CHAIN: flushing a Distribution (not emitting) ==== #

# My Function
@datadog_lambda_wrapper
def main(event, context, *args, **kwargs):
    ...

# https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/wrapper.py#L236
class _LambdaDecorator(object):
    def __call__(self, event, context, **kwargs):
        """Executes when the wrapped function gets called"""
        self._before(event, context)
        try:
            self.response = self.func(event, context, **kwargs)
            return self.response
        except Exception:
            submit_errors_metric(context)
            if self.span:
                self.span.set_traceback()
            raise
        finally:
            self._after(event, context)  # <<<<<< HERE

    def _after(self, event, context):
        try:
            ...
            if not self.flush_to_log or should_use_extension:
                flush_stats(context)
            ...

datadog_lambda_wrapper = _LambdaDecorator

# https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/metric.py#L122
# TOF (top of file)
lambda_stats = None
extension_thread_stats = None
flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
if should_use_extension:
    lambda_stats = StatsDWriter()
else:
    lambda_stats = ThreadStatsWriter(flush_in_thread)
# extension_thread_stats will be a NEW instance of ThreadStatsWriter(flush_in_thread)
# after entering the `if should_use_extension and timestamp is not None` conditional in `lambda_metric`
...

def flush_stats(lambda_context=None):
    lambda_stats.flush()  # ThreadStatsWriter.flush()
    if extension_thread_stats is not None:
        extension_thread_stats.flush(tags)  # ThreadStatsWriter.flush()

# https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/thread_stats_writer.py#L25
class ThreadStatsWriter(StatsWriter):
    def flush(self, tags=None):
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        # We DO see this line in the logs when DD_LOG_LEVEL == DEBUG, even for "unsent" MetricPoints
        self.thread_stats.flush_count += 1
        logger.debug("Flush #%s sending %s distributions", self.thread_stats.flush_count, count_dists)
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            ...

# https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/base.py#L151
class ThreadStats(object):
    # The point here is that self.thread_stats.reporter == HttpReporter(compress_payload=self.compress_payload)
    def start(self, flush_in_thread=True, flush_in_greenlet=False):
        # The reporter is responsible for sending metrics off to their final destination.
        # It's abstracted to support easy unit testing and in the near future, forwarding
        # to the datadog agent.
        self.reporter = HttpReporter(compress_payload=self.compress_payload)
        self.flush_count = 0
        # We never see this "disabled" line, and we DO see log lines about flushing
        if self._disabled:
            log.info("ThreadStats instance is disabled. No metrics will flush.")
        else:
            # THIS may be our next place to dig, if we're creating one on the fly:
            # flush_in_greenlet defaults to false;
            # flush_in_thread is explicitly false, back up the chain
            if flush_in_greenlet:
                self._start_flush_greenlet()
            elif flush_in_thread:
                self._start_flush_thread()
        # Flush all remaining metrics on exit
        atexit.register(lambda: self.flush(float("inf")))

# https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/reporters.py#L17
class HttpReporter(Reporter):
    def __init__(self, compress_payload=False):
        self.compress_payload = compress_payload

    def flush_distributions(self, distributions):
        api.Distribution.send(distributions, compress_payload=self.compress_payload)

# We DO see the log lines for this, even for metric points which DON'T end up in DD:
# INFO [2024-09-17 18:27:35.977] [17, 140623641798464] datadog.api 202 POST https://api.datadoghq.com/api/v1/distribution_points (156.9059ms)
# INFO [2024-09-17 18:27:41.338] [17, 140623641798464] datadog.api 202 POST https://api.datadoghq.com/api/v1/distribution_points (38.5311ms)
# ...
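For completeness, the datadogpy path the Layer takes here (a ThreadStats with no background flush thread, flushed manually with float("inf")) can be exercised outside Lambda to check whether point loss reproduces without the Extension in play. A sketch under those assumptions, using only documented datadogpy calls and assuming DD_API_KEY is set in the environment; the counts and tag are illustrative:

import os
import time

from datadog import initialize, ThreadStats

initialize(api_key=os.environ["DD_API_KEY"])

# Mirror the Layer's setup: no flush thread, no greenlet; manual flushes only.
stats = ThreadStats()
stats.start(flush_in_thread=False, flush_in_greenlet=False)

# Emulate ~a dozen "invocations" of three timestamped points each,
# flushing after every batch the way _after() -> flush_stats() does.
for _ in range(12):
    now = int(time.time())
    for offset in (0, 30, 60):
        stats.distribution("jltest.foo", 1, timestamp=now - offset, tags=["test:standalone"])
    stats.flush(float("inf"))  # flush all intervals, as ThreadStatsWriter.flush does
    time.sleep(1)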
Logging
After enabling debug logging inside both the Extension and the Layer (by setting env var DD_LOG_LEVEL="DEBUG"):

The ThreadStatsWriter does log the following, even for "unsent" metric points (log source):

    logger.debug("Flush #%s sending %s distributions", self.thread_stats.flush_count, count_dists)

This log line occurs with the correct numbers for both flush count and distribution count (e.g. when invoking the same execution context a dozen times in a row).

The serverless extension's logs show it flushing metrics correctly; that is where the aws.lambda.enhanced.* metrics come from.
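To compare the flush counts the Layer claims against what actually lands, here is a hypothetical helper (not part of any Datadog tooling) that tallies those debug lines from log text piped in on stdin:

import re
import sys

# Matches the Layer's debug line: 'Flush #%s sending %s distributions'
FLUSH_RE = re.compile(r"Flush #(\d+) sending (\d+) distributions")

total = 0
for line in sys.stdin:
    match = FLUSH_RE.search(line)
    if match:
        flush_number, dist_count = match.groups()
        total += int(dist_count)
        print(f"flush {flush_number}: {dist_count} distributions")
print(f"total distributions reported flushed: {total}")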