feat(metrics): Support for external observability providers - Metrics #2343

Closed · wants to merge 5 commits
12 changes: 12 additions & 0 deletions aws_lambda_powertools/metrics/provider/__init__.py
@@ -0,0 +1,12 @@
from aws_lambda_powertools.metrics.provider.base import MetricsBase, MetricsProviderBase
from aws_lambda_powertools.metrics.provider.datadog_provider_draft import (
    DataDogMetrics,
    DataDogProvider,
)

__all__ = [
    "MetricsBase",
    "MetricsProviderBase",
    "DataDogMetrics",
    "DataDogProvider",
]
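This makes both the base classes and the draft Datadog pair importable from the subpackage root. For example:

    # Everything re-exported by provider/__init__.py resolves from one place
    from aws_lambda_powertools.metrics.provider import (
        DataDogMetrics,
        DataDogProvider,
        MetricsBase,
        MetricsProviderBase,
    )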
128 changes: 128 additions & 0 deletions aws_lambda_powertools/metrics/provider/base.py
@@ -0,0 +1,128 @@
import functools
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Optional, Union

logger = logging.getLogger(__name__)

is_cold_start = True


class MetricsProviderBase(ABC):
    """Template for metric providers.

    Subclass this template to create your own metric provider.

    """

    # General add metric function; implementations should store metrics for later serialization
    @abstractmethod
    def add_metric(self, *args, **kwargs):
        pass

    # Serialize stored metrics and return a dict ready for flushing
    @abstractmethod
    def serialize(self, *args, **kwargs):
        pass

    # Flush serialized data to output, or send it to an API directly
    @abstractmethod
    def flush(self, *args, **kwargs):
        pass


class MetricsBase(ABC):
    """Template for metrics classes.

    Subclass this template to create your own metrics class.

    """

    @abstractmethod
    def add_metric(self, *args, **kwargs):
        pass

    @abstractmethod
    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        pass

    def log_metrics(
        self,
        lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
    ):
        """Decorator to serialize and publish metrics at the end of a function execution.

        Be aware that log_metrics **does call** the decorated function (e.g. lambda_handler).

        Example
        -------
        **Lambda function using tracer and metrics decorators**

            from aws_lambda_powertools import Metrics, Tracer

            metrics = Metrics(service="payment")
            tracer = Tracer(service="payment")

            @tracer.capture_lambda_handler
            @metrics.log_metrics
            def handler(event, context):
                ...

        Parameters
        ----------
        lambda_handler : Callable[[Any, Any], Any], optional
            lambda function handler, by default None
        capture_cold_start_metric : bool, optional
            captures cold start metric, by default False
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False

        Raises
        ------
        Exception
            Propagates any exception raised by the decorated function
        """

        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.log_metrics,
                capture_cold_start_metric=capture_cold_start_metric,
                raise_on_empty_metrics=raise_on_empty_metrics,
            )

        @functools.wraps(lambda_handler)
        def decorate(event, context):
            try:
                response = lambda_handler(event, context)
                if capture_cold_start_metric:
                    self._add_cold_start_metric(context=context)
            finally:
                self.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)

            return response

        return decorate

    def _add_cold_start_metric(self, context: Any) -> None:
        """Add the ColdStart metric on the first invocation

        Parameters
        ----------
        context : Any
            Lambda context
        """
        global is_cold_start
        if not is_cold_start:
            return

        logger.debug("Adding cold start metric")
        self.add_metric(name="ColdStart", value=1)

        is_cold_start = False
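These two base classes are the extension points for external observability providers: MetricsProviderBase owns metric storage, serialization, and flushing, while MetricsBase layers the log_metrics decorator and cold-start handling on top of a provider. A minimal sketch of a custom pair built on them; the StdoutProvider/StdoutMetrics names, the namespace prefixing, and the JSON-line output are illustrative, not part of this PR:

    import json
    from typing import Dict, List

    from aws_lambda_powertools.metrics.provider import MetricsBase, MetricsProviderBase


    class StdoutProvider(MetricsProviderBase):
        # Hypothetical provider: buffers metrics and prints them as JSON lines
        def __init__(self, namespace: str):
            self.namespace = namespace
            self.metrics: List[Dict] = []

        def add_metric(self, name: str, value: float):
            self.metrics.append({"name": f"{self.namespace}.{name}", "value": value})

        def serialize(self) -> Dict:
            return {"List": self.metrics}

        def flush(self, metrics: Dict):
            for item in metrics["List"]:
                print(json.dumps(item))


    class StdoutMetrics(MetricsBase):
        # Hypothetical front end: log_metrics and _add_cold_start_metric are inherited
        def __init__(self, provider: StdoutProvider):
            self.provider = provider

        def add_metric(self, name: str, value: float):
            self.provider.add_metric(name=name, value=value)

        def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
            self.provider.flush(self.provider.serialize())
            self.provider.metrics = []

Because both abstract interfaces are satisfied, the inherited log_metrics decorator works unchanged, including the ColdStart metric it emits through add_metric.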
118 changes: 118 additions & 0 deletions aws_lambda_powertools/metrics/provider/datadog_provider_draft.py
@@ -0,0 +1,118 @@
from __future__ import annotations

import json
import logging
import numbers
import time
import warnings
from typing import Dict, List, Optional

from aws_lambda_powertools.metrics.exceptions import MetricValueError, SchemaValidationError
from aws_lambda_powertools.metrics.provider import MetricsBase, MetricsProviderBase

logger = logging.getLogger(__name__)

# Check if using datadog layer
try:
from datadog_lambda.metric import lambda_metric # type: ignore
except ImportError:
lambda_metric = None


class DataDogProvider(MetricsProviderBase):
    """Class for the Datadog metrics provider.

    Each metric is stored in the format below, matching
    https://github.com/DataDog/datadog-lambda-python/blob/main/datadog_lambda/metric.py#L77:

        {
            "m": metric_name,
            "v": value,
            "e": timestamp,
            "t": ["tag:value", "tag2:value2"],
        }
    """

    def __init__(self, namespace: str):
        self.metrics: List = []
        self.namespace = namespace
        super().__init__()

    # Buffer name, value, timestamp and tags for later serialization
    # Consider calling the lambda_metric function directly here instead
    def add_metric(self, name: str, value: float, timestamp: Optional[int] = None, tags: Optional[List] = None):
        if not isinstance(value, numbers.Real):
            raise MetricValueError(f"{value} is not a valid number")
        if not timestamp:
            timestamp = int(time.time())
        self.metrics.append({"m": name, "v": value, "e": timestamp, "t": tags or []})

    # Serialize buffered metrics for flushing
    def serialize(self) -> Dict:
        # extra_tags is the hook for appending dimensions and metadata
        # to each metric's tags in "key:value" format
        extra_tags: List = []
        output_list: List = []

        for single_metric in self.metrics:
            output_list.append(
                {
                    "m": f"{self.namespace}.{single_metric['m']}",
                    "v": single_metric["v"],
                    "e": single_metric["e"],
                    "t": single_metric["t"] + extra_tags,
                }
            )

        return {"List": output_list}

    # Flush serialized data to output
    def flush(self, metrics):
        if lambda_metric:
            # Datadog layer present: submit metrics through the lambda_metric function
            for metric_item in metrics.get("List", []):
                lambda_metric(
                    metric_name=metric_item["m"],
                    value=metric_item["v"],
                    timestamp=metric_item["e"],
                    tags=metric_item["t"],
                )
        else:
            # Datadog layer not available: flush metrics to the log in Datadog format
            # https://github.com/DataDog/datadog-lambda-python/blob/main/datadog_lambda/metric.py#L77
            for metric_item in metrics.get("List", []):
                print(json.dumps(metric_item, separators=(",", ":")))

    def clear(self):
        self.metrics = []


class DataDogMetrics(MetricsBase):
    """Standalone class for Datadog metrics.

    Example
    -------
        dd_provider = DataDogProvider(namespace="default")
        metrics = DataDogMetrics(provider=dd_provider)

        @metrics.log_metrics(capture_cold_start_metric=True, raise_on_empty_metrics=False)
        def lambda_handler(event, context):
            metrics.add_metric(name="item_sold", value=1, tags=["tag:value"])
    """

    # `log_metrics` and `_add_cold_start_metric` are inherited directly from `MetricsBase`
    def __init__(self, provider):
        self.provider = provider
        super().__init__()

    def add_metric(self, name: str, value: float, timestamp: Optional[int] = None, tags: Optional[List] = None):
        self.provider.add_metric(name=name, value=value, timestamp=timestamp, tags=tags)

    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        metrics = self.provider.serialize()
        if not metrics["List"]:
            if raise_on_empty_metrics:
                raise SchemaValidationError("Must contain at least one metric.")
            warnings.warn(
                "No application metrics to publish. The cold-start metric may be published if enabled. "
                "If application metrics should never be empty, consider using 'raise_on_empty_metrics'",
                stacklevel=2,
            )
        self.provider.flush(metrics)
        self.provider.clear()
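End to end, the draft wires together as in the sketch below. Without the datadog_lambda layer installed, flush falls back to printing one Datadog-format JSON line per metric; the handler body, return value, and tag value are illustrative only:

    from aws_lambda_powertools.metrics.provider import DataDogMetrics, DataDogProvider

    dd_provider = DataDogProvider(namespace="default")
    metrics = DataDogMetrics(provider=dd_provider)


    @metrics.log_metrics(capture_cold_start_metric=True)
    def lambda_handler(event, context):
        metrics.add_metric(name="item_sold", value=1, tags=["tag:value"])
        return {"statusCode": 200}

On a cold start without the layer, an invocation would print something like (timestamps illustrative):

    {"m":"default.item_sold","v":1,"e":1690000000,"t":["tag:value"]}
    {"m":"default.ColdStart","v":1,"e":1690000000,"t":[]}

The ColdStart line appears second because the decorator adds it after the handler returns, just before flush_metrics runs.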