Skip to content

Commit

Permalink
Implement MetricReader temporality controls (#2637)
Browse files Browse the repository at this point in the history
* Implement MetricReader temporality controls

Fixes #2627
Fixes #2636

* Fix type

* Add temporality check

* Fix check

* Fix check

* Fix lint

* Fix check

* Fix docs

* Add changelog

* Update changelog message

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
ocelotl and srikanthccv authored Apr 27, 2022
1 parent 1a1421f commit b0777a3
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 42 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21



- Add parameter to MetricReader constructor to select temporality per instrument kind
([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637))
- Fix unhandled callback exceptions on async instruments
([#2614](https://github.com/open-telemetry/opentelemetry-python/pull/2614))
- Rename `DefaultCounter`, `DefaultHistogram`, `DefaultObservableCounter`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ def consume_measurement(self, measurement: Measurement) -> None:

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(self, temporality: int) -> Iterable[Metric]:
def collect(
self, instrument_class_temporality: Dict[type, AggregationTemporality]
) -> Iterable[Metric]:

with self._lock:
for (
Expand Down Expand Up @@ -106,13 +108,17 @@ def collect(self, temporality: int) -> Iterable[Metric]:
self._view._description
or self._instrument.description
),
instrumentation_scope=self._instrument.instrumentation_scope,
instrumentation_scope=(
self._instrument.instrumentation_scope
),
name=self._view._name or self._instrument.name,
resource=self._sdk_config.resource,
unit=self._instrument.unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
temporality,
instrument_class_temporality[
self._instrument.__class__
],
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from os import environ, linesep
from sys import stdout
from threading import Event, RLock, Thread
from typing import IO, Callable, Iterable, List, Optional, Sequence
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
Expand Down Expand Up @@ -50,10 +50,6 @@ class MetricExporter(ABC):
in their own format.
"""

@property
def preferred_temporality(self) -> AggregationTemporality:
return AggregationTemporality.CUMULATIVE

@abstractmethod
def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
"""Exports a batch of telemetry data.
Expand Down Expand Up @@ -107,8 +103,7 @@ class InMemoryMetricReader(MetricReader):
"""

def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
self, preferred_temporality: Dict[type, AggregationTemporality] = None
) -> None:
super().__init__(preferred_temporality=preferred_temporality)
self._lock = RLock()
Expand Down Expand Up @@ -139,10 +134,11 @@ class PeriodicExportingMetricReader(MetricReader):
def __init__(
self,
exporter: MetricExporter,
preferred_temporality: Dict[type, AggregationTemporality] = None,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
super().__init__(preferred_temporality=exporter.preferred_temporality)
super().__init__(preferred_temporality=preferred_temporality)
self._exporter = exporter
if export_interval_millis is None:
try:
Expand Down
12 changes: 8 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# pylint: disable=too-many-ancestors

import logging
from typing import Dict, Generator, Iterable, Optional, Union
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union

from opentelemetry._metrics.instrument import CallbackT
from opentelemetry._metrics.instrument import Counter as APICounter
Expand All @@ -31,9 +31,13 @@
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

if TYPE_CHECKING:
from opentelemetry.sdk._metrics.measurement_consumer import (
MeasurementConsumer,
)

_logger = logging.getLogger(__name__)


Expand All @@ -42,7 +46,7 @@ def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
unit: str = "",
description: str = "",
):
Expand All @@ -59,7 +63,7 @@ def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
callbacks: Optional[Iterable[CallbackT]] = None,
unit: str = "",
description: str = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from abc import ABC, abstractmethod
from threading import Lock
from typing import TYPE_CHECKING, Iterable, List, Mapping
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping

from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
Expand All @@ -40,7 +40,9 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"):

@abstractmethod
def collect(
self, metric_reader: MetricReader, temporality: AggregationTemporality
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
pass

Expand All @@ -67,11 +69,15 @@ def register_asynchronous_instrument(
self._async_instruments.append(instrument)

def collect(
self, metric_reader: MetricReader, temporality: AggregationTemporality
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback():
metric_reader_storage.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(temporality)
return self._reader_storages[metric_reader].collect(
instrument_type_temporality
)
85 changes: 78 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,100 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Callable, Iterable
from logging import getLogger
from os import environ
from typing import Callable, Dict, Iterable

from typing_extensions import final

from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk.environment_variables import (
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)

_logger = logging.getLogger(__name__)
_logger = getLogger(__name__)


class MetricReader(ABC):
"""
Base class for all metric readers
Args:
preferred_temporality: A mapping between instrument classes and
aggregation temporality. By default uses CUMULATIVE for all instrument
classes. This mapping will be used to define the default aggregation
temporality of every instrument class. If the user wants to make a
change in the default aggregation temporality of an instrument class,
it is enough to pass here a dictionary whose keys are the instrument
classes and the values are the corresponding desired aggregation
temporalities of the classes that the user wants to change, not all of
them. The classes not included in the passed dictionary will retain
their association to their default aggregation temporalities.
The value passed here will override the corresponding values set
via the environment variable
.. document protected _receive_metrics which is a intended to be overriden by subclass
.. automethod:: _receive_metrics
"""

# FIXME add :std:envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE`
# to the end of the documentation paragraph above.

def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
self, preferred_temporality: Dict[type, AggregationTemporality] = None
) -> None:
self._collect: Callable[
["MetricReader", AggregationTemporality], Iterable[Metric]
] = None
self._preferred_temporality = preferred_temporality

if (
environ.get(
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
== "DELTA"
):
self._instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
self._instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

if preferred_temporality is not None:
for temporality in preferred_temporality.values():
if temporality not in (
AggregationTemporality.CUMULATIVE,
AggregationTemporality.DELTA,
):
raise Exception(
f"Invalid temporality value found {temporality}"
)

self._instrument_class_temporality.update(preferred_temporality or {})

@final
def collect(self) -> None:
Expand All @@ -48,7 +117,9 @@ def collect(self) -> None:
"Cannot call collect on a MetricReader until it is registered on a MeterProvider"
)
return
self._receive_metrics(self._collect(self, self._preferred_temporality))
self._receive_metrics(
self._collect(self, self._instrument_class_temporality)
)

@final
def _set_collect_callback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,30 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# use a list instead of yielding to prevent a slow reader from holding SDK locks
def collect(
self, instrument_type_temporality: Dict[type, AggregationTemporality]
) -> Iterable[Metric]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks
metrics: List[Metric] = []

# While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are
# sure we collect all existing view). However, instruments can still send measurements
# that will make it into the individual aggregations; collection will acquire those
# locks iteratively to keep locking as fine-grained as possible. One side effect is
# that end times can be slightly skewed among the metric streams produced by the SDK,
# but we still align the output timestamps for a single instrument.
# While holding the lock, new _ViewInstrumentMatch can't be added from
# another thread (so we are sure we collect all existing view).
# However, instruments can still send measurements that will make it
# into the individual aggregations; collection will acquire those locks
# iteratively to keep locking as fine-grained as possible. One side
# effect is that end times can be slightly skewed among the metric
# streams produced by the SDK, but we still align the output timestamps
# for a single instrument.
with self._lock:
for (
view_instrument_matches
) in self._view_instrument_match.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(view_instrument_match.collect(temporality))
metrics.extend(
view_instrument_match.collect(
instrument_type_temporality
)
)

return metrics
15 changes: 15 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,18 @@
provide the entry point for loading the log emitter provider. If not specified, SDK
LogEmitterProvider is used.
"""

_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = (
"OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE"
)
"""
.. envvar:: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
The :envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment
variable allows users to set the default aggregation temporality policy to use
on the basis of instrument kind. The valid (case-insensitive) values are:
``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds.
``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``.
Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``.
"""
Loading

0 comments on commit b0777a3

Please sign in to comment.