From ba3e3207edd217366710e10a769ba7ad4c4fa655 Mon Sep 17 00:00:00 2001 From: Chengzhong Wu Date: Wed, 25 May 2022 11:13:05 +0800 Subject: [PATCH] feat(metrics): multi-instrument async callback support (#2966) Co-authored-by: Daniel Dyla --- experimental/CHANGELOG.md | 5 + .../src/NoopMeter.ts | 77 ++++--- .../src/types/Meter.ts | 62 ++++-- .../src/types/Metric.ts | 32 ++- .../src/types/ObservableResult.ts | 20 +- .../noop-implementations/noop-meter.test.ts | 98 ++++++++- .../test/metricsHelper.ts | 10 +- .../test/metricsHelper.ts | 27 ++- .../test/metricsHelper.ts | 9 +- .../test/PrometheusExporter.test.ts | 38 ++-- .../opentelemetry-sdk-metrics-base/README.md | 51 +++++ .../src/Instruments.ts | 46 ++++- .../src/Measurement.ts | 27 --- .../src/Meter.ts | 59 ++++-- .../src/ObservableResult.ts | 50 ++++- .../src/state/MeterSharedState.ts | 26 ++- .../src/state/ObservableRegistry.ts | 141 ++++++++++--- .../src/state/TemporalMetricProcessor.ts | 13 +- .../src/utils.ts | 12 ++ .../test/Instruments.test.ts | 18 +- .../test/Meter.test.ts | 78 ++++++- .../test/ObservableResult.test.ts | 81 +++++++- .../test/state/AsyncMetricStorage.test.ts | 17 +- .../test/state/MeterSharedState.test.ts | 15 +- .../test/state/MetricCollector.test.ts | 192 +++++++++++++++++- .../state/MultiWritableMetricStorage.test.ts | 3 +- .../test/state/ObservableRegistry.test.ts | 110 ++++++++++ .../test/util.ts | 29 ++- 28 files changed, 1099 insertions(+), 247 deletions(-) delete mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/state/ObservableRegistry.test.ts diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 1aaa61599c..027a58898f 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -8,6 +8,11 @@ All notable changes to experimental packages in this project will be documented * feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type #2902 @seemk * refactor(metrics-sdk): rename InstrumentationLibrary -> InstrumentationScope #2959 @pichlermarc +* feat(metrics): multi-instrument async callback support #2966 @legendecas + * changes on `meter.createObservableCounter`, `meter.createObservableGauge`, `meter.createObservableUpDownCounter` + * removed the second parameter `callback` + * returns an `Observable` object on which callbacks can be registered or unregistered. + * added `meter.addBatchObservableCallback` and `meter.removeBatchObservableCallback`. ### :rocket: (Enhancement) diff --git a/experimental/packages/opentelemetry-api-metrics/src/NoopMeter.ts b/experimental/packages/opentelemetry-api-metrics/src/NoopMeter.ts index 80d7b6df77..317431096f 100644 --- a/experimental/packages/opentelemetry-api-metrics/src/NoopMeter.ts +++ b/experimental/packages/opentelemetry-api-metrics/src/NoopMeter.ts @@ -16,12 +16,17 @@ import { Meter } from './types/Meter'; import { - MetricOptions, - MetricAttributes, + BatchObservableCallback, Counter, Histogram, - UpDownCounter, + MetricOptions, ObservableCallback, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, + MetricAttributes, + Observable, } from './types/Metric'; /** @@ -32,67 +37,65 @@ export class NoopMeter implements Meter { constructor() {} /** - * Returns a constant noop histogram. - * @param name the name of the metric. - * @param [options] the metric options. + * @see {@link Meter.createHistogram} */ createHistogram(_name: string, _options?: MetricOptions): Histogram { return NOOP_HISTOGRAM_METRIC; } /** - * Returns a constant noop counter. - * @param name the name of the metric. - * @param [options] the metric options. + * @see {@link Meter.createCounter} */ createCounter(_name: string, _options?: MetricOptions): Counter { return NOOP_COUNTER_METRIC; } /** - * Returns a constant noop UpDownCounter. - * @param name the name of the metric. - * @param [options] the metric options. + * @see {@link Meter.createUpDownCounter} */ createUpDownCounter(_name: string, _options?: MetricOptions): UpDownCounter { return NOOP_UP_DOWN_COUNTER_METRIC; } /** - * Returns a constant noop observable gauge. - * @param name the name of the metric. - * @param callback the observable gauge callback - * @param [options] the metric options. + * @see {@link Meter.createObservableGauge} */ createObservableGauge( _name: string, - _callback: ObservableCallback, _options?: MetricOptions, - ): void {} + ): ObservableGauge { + return NOOP_OBSERVABLE_GAUGE_METRIC; + } /** - * Returns a constant noop observable counter. - * @param name the name of the metric. - * @param callback the observable counter callback - * @param [options] the metric options. + * @see {@link Meter.createObservableCounter} */ createObservableCounter( _name: string, - _callback: ObservableCallback, _options?: MetricOptions, - ): void {} + ): ObservableCounter { + return NOOP_OBSERVABLE_COUNTER_METRIC; + } /** - * Returns a constant noop up down observable counter. - * @param name the name of the metric. - * @param callback the up down observable counter callback - * @param [options] the metric options. + * @see {@link Meter.createObservableUpDownCounter} */ createObservableUpDownCounter( _name: string, - _callback: ObservableCallback, _options?: MetricOptions, - ): void {} + ): ObservableUpDownCounter { + return NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC; + } + + /** + * @see {@link Meter.addBatchObservableCallback} + */ + addBatchObservableCallback(_callback: BatchObservableCallback, _observables: Observable[]): void {} + + /** + * @see {@link Meter.removeBatchObservableCallback} + */ + removeBatchObservableCallback(_callback: BatchObservableCallback): void {} } export class NoopMetric {} @@ -109,9 +112,23 @@ export class NoopHistogramMetric extends NoopMetric implements Histogram { record(_value: number, _attributes: MetricAttributes): void {} } +export class NoopObservableMetric { + addCallback(_callback: ObservableCallback) {} + removeCallback(_callback: ObservableCallback) {} +} + +export class NoopObservableCounterMetric extends NoopObservableMetric implements ObservableCounter {} +export class NoopObservableGaugeMetric extends NoopObservableMetric implements ObservableGauge {} +export class NoopObservableUpDownCounterMetric extends NoopObservableMetric implements ObservableUpDownCounter {} + export const NOOP_METER = new NoopMeter(); // Synchronous instruments export const NOOP_COUNTER_METRIC = new NoopCounterMetric(); export const NOOP_HISTOGRAM_METRIC = new NoopHistogramMetric(); export const NOOP_UP_DOWN_COUNTER_METRIC = new NoopUpDownCounterMetric(); + +// Asynchronous instruments +export const NOOP_OBSERVABLE_COUNTER_METRIC = new NoopObservableCounterMetric(); +export const NOOP_OBSERVABLE_GAUGE_METRIC = new NoopObservableGaugeMetric(); +export const NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC = new NoopObservableUpDownCounterMetric(); diff --git a/experimental/packages/opentelemetry-api-metrics/src/types/Meter.ts b/experimental/packages/opentelemetry-api-metrics/src/types/Meter.ts index 5d3d16d668..3931f01622 100644 --- a/experimental/packages/opentelemetry-api-metrics/src/types/Meter.ts +++ b/experimental/packages/opentelemetry-api-metrics/src/types/Meter.ts @@ -14,14 +14,15 @@ * limitations under the License. */ -import { CounterOptions, HistogramOptions, UpDownCounterOptions } from '..'; import { + BatchObservableCallback, Counter, Histogram, - ObservableCallback, - ObservableCounterOptions, - ObservableGaugeOptions, - ObservableUpDownCounterOptions, + MetricOptions, + Observable, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, UpDownCounter, } from './Metric'; @@ -48,7 +49,7 @@ export interface Meter { * @param name the name of the metric. * @param [options] the metric options. */ - createHistogram(name: string, options?: HistogramOptions): Histogram; + createHistogram(name: string, options?: MetricOptions): Histogram; /** * Creates a new `Counter` metric. Generally, this kind of metric when the @@ -57,7 +58,7 @@ export interface Meter { * @param name the name of the metric. * @param [options] the metric options. */ - createCounter(name: string, options?: CounterOptions): Counter; + createCounter(name: string, options?: MetricOptions): Counter; /** * Creates a new `UpDownCounter` metric. UpDownCounter is a synchronous @@ -76,7 +77,7 @@ export interface Meter { * @param name the name of the metric. * @param [options] the metric options. */ - createUpDownCounter(name: string, options?: UpDownCounterOptions): UpDownCounter; + createUpDownCounter(name: string, options?: MetricOptions): UpDownCounter; /** * Creates a new `ObservableGauge` metric. @@ -84,14 +85,12 @@ export interface Meter { * The callback SHOULD be safe to be invoked concurrently. * * @param name the name of the metric. - * @param callback the observable callback * @param [options] the metric options. */ createObservableGauge( name: string, - callback: ObservableCallback, - options?: ObservableGaugeOptions - ): void; + options?: MetricOptions + ): ObservableGauge; /** * Creates a new `ObservableCounter` metric. @@ -99,14 +98,12 @@ export interface Meter { * The callback SHOULD be safe to be invoked concurrently. * * @param name the name of the metric. - * @param callback the observable callback * @param [options] the metric options. */ createObservableCounter( name: string, - callback: ObservableCallback, - options?: ObservableCounterOptions - ): void; + options?: MetricOptions + ): ObservableCounter; /** * Creates a new `ObservableUpDownCounter` metric. @@ -114,12 +111,37 @@ export interface Meter { * The callback SHOULD be safe to be invoked concurrently. * * @param name the name of the metric. - * @param callback the observable callback * @param [options] the metric options. */ createObservableUpDownCounter( name: string, - callback: ObservableCallback, - options?: ObservableUpDownCounterOptions - ): void; + options?: MetricOptions + ): ObservableUpDownCounter; + + /** + * Sets up a function that will be called whenever a metric collection is + * initiated. + * + * If the function is already in the list of callbacks for this Observable, + * the function is not added a second time. + * + * Only the associated observables can be observed in the callback. + * Measurements of observables that are not associated observed in the + * callback are dropped. + * + * @param callback the batch observable callback + * @param observables the observables associated with this batch observable callback + */ + addBatchObservableCallback(callback: BatchObservableCallback, observables: Observable[]): void; + + /** + * Removes a callback previously registered with {@link Meter.addBatchObservableCallback}. + * + * The callback to be removed is identified using a combination of the callback itself, + * and the set of the observables associated with it. + * + * @param callback the batch observable callback + * @param observables the observables associated with this batch observable callback + */ + removeBatchObservableCallback(callback: BatchObservableCallback, observables: Observable[]): void; } diff --git a/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts b/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts index f066c6873c..51886d2b23 100644 --- a/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts +++ b/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts @@ -15,7 +15,7 @@ */ import { Context } from '@opentelemetry/api'; -import { ObservableResult } from './ObservableResult'; +import { BatchObservableResult, ObservableResult } from './ObservableResult'; /** * Options needed for metric creation @@ -40,13 +40,6 @@ export interface MetricOptions { valueType?: ValueType; } -export type CounterOptions = MetricOptions; -export type UpDownCounterOptions = MetricOptions; -export type ObservableGaugeOptions = MetricOptions; -export type ObservableCounterOptions = MetricOptions; -export type ObservableUpDownCounterOptions = MetricOptions; -export type HistogramOptions = MetricOptions; - /** The Type of value. It describes how the data is reported. */ export enum ValueType { INT, @@ -98,3 +91,26 @@ export type MetricAttributes = { [key: string]: string }; * The observable callback for Observable instruments. */ export type ObservableCallback = (observableResult: ObservableResult) => void | Promise; + +/** + * The observable callback for a batch of Observable instruments. + */ +export type BatchObservableCallback = (observableResult: BatchObservableResult) => void | Promise; + +export interface Observable { + /** + * Sets up a function that will be called whenever a metric collection is initiated. + * + * If the function is already in the list of callbacks for this Observable, the function is not added a second time. + */ + addCallback(callback: ObservableCallback): void; + + /** + * Removes a callback previously registered with {@link Observable.addCallback}. + */ + removeCallback(callback: ObservableCallback): void; +} + +export type ObservableCounter = Observable; +export type ObservableUpDownCounter = Observable; +export type ObservableGauge = Observable; diff --git a/experimental/packages/opentelemetry-api-metrics/src/types/ObservableResult.ts b/experimental/packages/opentelemetry-api-metrics/src/types/ObservableResult.ts index 29800212f0..a38755982d 100644 --- a/experimental/packages/opentelemetry-api-metrics/src/types/ObservableResult.ts +++ b/experimental/packages/opentelemetry-api-metrics/src/types/ObservableResult.ts @@ -14,10 +14,10 @@ * limitations under the License. */ -import { MetricAttributes } from './Metric'; +import { MetricAttributes, Observable } from './Metric'; /** - * Interface that is being used in callback function for Observable Metric + * Interface that is being used in callback function for Observable Metric. */ export interface ObservableResult { /** @@ -30,3 +30,19 @@ export interface ObservableResult { */ observe(value: number, attributes?: MetricAttributes): void; } + +/** + * Interface that is being used in batch observable callback function. + */ +export interface BatchObservableResult { + /** + * Observe a measurement of the value associated with the given attributes. + * + * @param metric The observable metric to be observed. + * @param value The value to be observed. + * @param attributes The attributes associated with the value. If more than + * one values associated with the same attributes values, SDK may pick the + * last one or simply drop the entire observable result. + */ + observe(metric: Observable, value: number, attributes?: MetricAttributes): void; +} diff --git a/experimental/packages/opentelemetry-api-metrics/test/noop-implementations/noop-meter.test.ts b/experimental/packages/opentelemetry-api-metrics/test/noop-implementations/noop-meter.test.ts index 4d5fced1f9..82f0e8b6be 100644 --- a/experimental/packages/opentelemetry-api-metrics/test/noop-implementations/noop-meter.test.ts +++ b/experimental/packages/opentelemetry-api-metrics/test/noop-implementations/noop-meter.test.ts @@ -16,16 +16,31 @@ import * as assert from 'assert'; import { + NoopMeter, NoopMeterProvider, NOOP_COUNTER_METRIC, NOOP_HISTOGRAM_METRIC, + NOOP_OBSERVABLE_COUNTER_METRIC, + NOOP_OBSERVABLE_GAUGE_METRIC, + NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC, + NOOP_UP_DOWN_COUNTER_METRIC, } from '../../src'; +const attributes = {}; +const options = { + component: 'tests', + description: 'the testing package', +}; + describe('NoopMeter', () => { - it('should not crash', () => { + it('constructor should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + assert(meter instanceof NoopMeter); + }); + + it('counter should not crash', () => { const meter = new NoopMeterProvider().getMeter('test-noop'); const counter = meter.createCounter('some-name'); - const attributes = {}; // ensure NoopMetric does not crash. counter.add(1, attributes); @@ -33,23 +48,88 @@ describe('NoopMeter', () => { // ensure the correct noop const is returned assert.strictEqual(counter, NOOP_COUNTER_METRIC); + const counterWithOptions = meter.createCounter('some-name', options); + assert.strictEqual(counterWithOptions, NOOP_COUNTER_METRIC); + }); + + it('histogram should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); const histogram = meter.createHistogram('some-name'); histogram.record(1, attributes); // ensure the correct noop const is returned assert.strictEqual(histogram, NOOP_HISTOGRAM_METRIC); - const options = { - component: 'tests', - description: 'the testing package', - }; - const histogramWithOptions = meter.createHistogram( 'some-name', options ); assert.strictEqual(histogramWithOptions, NOOP_HISTOGRAM_METRIC); - const counterWithOptions = meter.createCounter('some-name', options); - assert.strictEqual(counterWithOptions, NOOP_COUNTER_METRIC); + }); + + it('up down counter should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + const upDownCounter = meter.createUpDownCounter('some-name'); + upDownCounter.add(1, attributes); + + // ensure the correct noop const is returned + assert.strictEqual(upDownCounter, NOOP_UP_DOWN_COUNTER_METRIC); + + const upDownCounterWithOptions = meter.createUpDownCounter( + 'some-name', + options + ); + assert.strictEqual(upDownCounterWithOptions, NOOP_UP_DOWN_COUNTER_METRIC); + }); + + it('observable counter should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + const observableCounter = meter.createObservableCounter('some-name'); + observableCounter.addCallback(() => {}); + + // ensure the correct noop const is returned + assert.strictEqual(observableCounter, NOOP_OBSERVABLE_COUNTER_METRIC); + + const observableCounterWithOptions = meter.createObservableCounter( + 'some-name', + options + ); + assert.strictEqual(observableCounterWithOptions, NOOP_OBSERVABLE_COUNTER_METRIC); + }); + + it('observable gauge should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + const observableGauge = meter.createObservableGauge('some-name'); + observableGauge.addCallback(() => {}); + + // ensure the correct noop const is returned + assert.strictEqual(observableGauge, NOOP_OBSERVABLE_GAUGE_METRIC); + + const observableGaugeWithOptions = meter.createObservableGauge( + 'some-name', + options + ); + assert.strictEqual(observableGaugeWithOptions, NOOP_OBSERVABLE_GAUGE_METRIC); + }); + + it('observable up down counter should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + const observableUpDownCounter = meter.createObservableUpDownCounter('some-name'); + observableUpDownCounter.addCallback(() => {}); + + // ensure the correct noop const is returned + assert.strictEqual(observableUpDownCounter, NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC); + + const observableUpDownCounterWithOptions = meter.createObservableUpDownCounter( + 'some-name', + options + ); + assert.strictEqual(observableUpDownCounterWithOptions, NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC); + }); + + it('batch callback should not crash', () => { + const meter = new NoopMeterProvider().getMeter('test-noop'); + meter.addBatchObservableCallback(() => {}, []); + meter.removeBatchObservableCallback(() => {}, []); }); }); diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/metricsHelper.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/metricsHelper.ts index 54bd6ee553..997cd0ed86 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/metricsHelper.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/metricsHelper.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { Counter, Histogram, ObservableResult, ValueType } from '@opentelemetry/api-metrics'; +import { Counter, Histogram, ObservableGauge, ObservableResult, ValueType } from '@opentelemetry/api-metrics'; import { Resource } from '@opentelemetry/resources'; import * as assert from 'assert'; import * as grpc from '@grpc/grpc-js'; @@ -81,16 +81,18 @@ export function mockCounter(): Counter { export function mockObservableGauge( callback: (observableResult: ObservableResult) => void -): void { +): ObservableGauge { const name = 'double-observable-gauge'; - return meter.createObservableGauge( + const observableGauge = meter.createObservableGauge( name, - callback, { description: 'sample observable gauge description', valueType: ValueType.DOUBLE, }, ); + observableGauge.addCallback(callback); + + return observableGauge; } export function mockHistogram(): Histogram { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/metricsHelper.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/metricsHelper.ts index 9b801c51a4..31e2bcb96d 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/metricsHelper.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/metricsHelper.ts @@ -19,6 +19,9 @@ import { ObservableResult, Histogram, ValueType, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, } from '@opentelemetry/api-metrics'; import { Resource } from '@opentelemetry/resources'; import * as assert from 'assert'; @@ -99,15 +102,17 @@ export function mockCounter(): Counter { export function mockObservableGauge( callback: (observableResult: ObservableResult) => void, name = 'double-observable-gauge' -): void { - return meter.createObservableGauge( +): ObservableGauge { + const observableGauge = meter.createObservableGauge( name, - callback, { description: 'sample observable gauge description', valueType: ValueType.DOUBLE, } ); + observableGauge.addCallback(callback); + + return observableGauge; } export function mockDoubleCounter(): Counter { @@ -123,29 +128,33 @@ export function mockDoubleCounter(): Counter { export function mockObservableCounter( callback: (observableResult: ObservableResult) => void, name = 'double-observable-counter' -): void { - meter.createObservableCounter( +): ObservableCounter { + const observableCounter = meter.createObservableCounter( name, - callback, { description: 'sample observable counter description', valueType: ValueType.DOUBLE, } ); + observableCounter.addCallback(callback); + + return observableCounter; } export function mockObservableUpDownCounter( callback: (observableResult: ObservableResult) => void, name = 'double-up-down-observable-counter' -): void { - meter.createObservableUpDownCounter( +): ObservableUpDownCounter { + const observableUpDownCounter = meter.createObservableUpDownCounter( name, - callback, { description: 'sample observable up down counter description', valueType: ValueType.DOUBLE, }, ); + observableUpDownCounter.addCallback(callback); + + return observableUpDownCounter; } export function mockHistogram(): Histogram { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts index 80d38bd164..0bacab995c 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts @@ -19,6 +19,7 @@ import { ObservableResult, Histogram, ValueType, + ObservableGauge, } from '@opentelemetry/api-metrics'; import { Resource } from '@opentelemetry/resources'; import * as assert from 'assert'; @@ -85,16 +86,18 @@ export function mockCounter(): Counter { export function mockObservableGauge( callback: (observableResult: ObservableResult) => void -): void { +): ObservableGauge { const name = 'double-observable-gauge'; - return meter.createObservableGauge( + const observableGauge = meter.createObservableGauge( name, - callback, { description: 'sample observable gauge description', valueType: ValueType.DOUBLE, }, ); + observableGauge.addCallback(callback); + + return observableGauge; } export function mockHistogram(): Histogram { diff --git a/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts b/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts index 0a0747b1ef..ba7d5f752a 100644 --- a/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts @@ -268,18 +268,18 @@ describe('PrometheusExporter', () => { return 0.999; } - meter.createObservableGauge( + const observableGauge = meter.createObservableGauge( 'metric_observable_gauge', - (observableResult: ObservableResult) => { - observableResult.observe(getCpuUsage(), { - pid: String(123), - core: '1', - }); - }, { description: 'a test description', }, ); + observableGauge.addCallback((observableResult: ObservableResult) => { + observableResult.observe(getCpuUsage(), { + pid: String(123), + core: '1', + }); + }); const body = await request('http://localhost:9464/metrics'); const lines = body.split('\n'); @@ -394,17 +394,17 @@ describe('PrometheusExporter', () => { return 20; } - meter.createObservableCounter( + const observableCounter = meter.createObservableCounter( 'metric_observable_counter', - (observableResult: ObservableResult) => { - observableResult.observe(getValue(), { - key1: 'attributeValue1', - }); - }, { description: 'a test description', }, ); + observableCounter.addCallback((observableResult: ObservableResult) => { + observableResult.observe(getValue(), { + key1: 'attributeValue1', + }); + }); const body = await request('http://localhost:9464/metrics'); const lines = body.split('\n'); @@ -422,17 +422,17 @@ describe('PrometheusExporter', () => { return 20; } - meter.createObservableUpDownCounter( + const observableUpDownCounter = meter.createObservableUpDownCounter( 'metric_observable_up_down_counter', - (observableResult: ObservableResult) => { - observableResult.observe(getValue(), { - key1: 'attributeValue1', - }); - }, { description: 'a test description', }, ); + observableUpDownCounter.addCallback((observableResult: ObservableResult) => { + observableResult.observe(getValue(), { + key1: 'attributeValue1', + }); + }); const body = await request('http://localhost:9464/metrics'); const lines = body.split('\n'); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/README.md b/experimental/packages/opentelemetry-sdk-metrics-base/README.md index 8418b209c4..dcd6f586f2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/README.md +++ b/experimental/packages/opentelemetry-sdk-metrics-base/README.md @@ -21,6 +21,57 @@ Please see the [version `0.27.0` README](https://github.com/open-telemetry/opent TODO: Add usage information for updated SDK +## Installation of the Latest experimental version + +```bash +npm install --save @opentelemetry/sdk-metrics-base +``` + +## Usage of the Latest experimental version + +The basic setup of the SDK can be seen as followings: + +```js +const opentelemetry = require('@opentelemetry/api-metrics'); +const { MeterProvider } = require('@opentelemetry/sdk-metrics-base'); + +// To create an instrument, you first need to initialize the Meter provider. +// NOTE: The default OpenTelemetry meter provider does not record any metric instruments. +// Registering a working meter provider allows the API methods to record instruments. +opentelemetry.setGlobalMeterProvider(new MeterProvider()); + +// To record a metric event, we used the global singleton meter to create an instrument. +const counter = opentelemetry.getMeter('default').createCounter('foo'); + +// record a metric event. +counter.add(1, { attributeKey: 'attribute-value' }); +``` + +In conditions, we may need to setup an async instrument to observe costly events: + +```js +// Creating an async instrument, similar to synchronous instruments +const observableCounter = opentelemetry.getMeter('default') + .createObservableCounter('observable-counter'); + +// Register a single-instrument callback to the async instrument. +observableCounter.addCallback(async (observableResult) => { + // ... do async stuff + observableResult.observe(1, { attributeKey: 'attribute-value' }); +}); + +// Register a multi-instrument callback and associate it with a set of async instruments. +opentelemetry.getMeter('default') + .addBatchObservableCallback(batchObservableCallback, [ observableCounter ]); +async function batchObservableCallback(batchObservableResult) { + // ... do async stuff + batchObservableResult.observe(observableCounter, 1, { attributeKey: 'attribute-value' }); + + // This is been dropped since the observable is not associated with the callback at registration. + batchObservableResult.observe(otherObservable, 2); +} +``` + ## Useful links - For more information on OpenTelemetry, visit: diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts index 00c6973ed6..44467cc7d8 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts @@ -16,15 +16,13 @@ import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; +import { ObservableCallback } from '@opentelemetry/api-metrics'; import { InstrumentDescriptor } from './InstrumentDescriptor'; -import { WritableMetricStorage } from './state/WritableMetricStorage'; +import { ObservableRegistry } from './state/ObservableRegistry'; +import { AsyncWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage'; export class SyncInstrument { - constructor(private _writableMetricStorage: WritableMetricStorage, private _descriptor: InstrumentDescriptor) {} - - getName(): string { - return this._descriptor.name; - } + constructor(private _writableMetricStorage: WritableMetricStorage, protected _descriptor: InstrumentDescriptor) {} protected _record(value: number, attributes: metrics.MetricAttributes = {}, context: api.Context = api.context.active()) { if (this._descriptor.valueType === metrics.ValueType.INT && !Number.isInteger(value)) { @@ -58,7 +56,7 @@ export class CounterInstrument extends SyncInstrument implements metrics.Counter */ add(value: number, attributes?: metrics.MetricAttributes, ctx?: api.Context): void { if (value < 0) { - api.diag.warn(`negative value provided to counter ${this.getName()}: ${value}`); + api.diag.warn(`negative value provided to counter ${this._descriptor.name}: ${value}`); return; } @@ -77,3 +75,37 @@ export class HistogramInstrument extends SyncInstrument implements metrics.Histo this._record(value, attributes, ctx); } } + +export class ObservableInstrument implements metrics.Observable { + /** @internal */ + _metricStorages: AsyncWritableMetricStorage[]; + /** @internal */ + _descriptor: InstrumentDescriptor; + + constructor(descriptor: InstrumentDescriptor, metricStorages: AsyncWritableMetricStorage[], private _observableRegistry: ObservableRegistry) { + this._descriptor = descriptor; + this._metricStorages = metricStorages; + } + + /** + * @see {metrics.Observable.addCallback} + */ + addCallback(callback: ObservableCallback) { + this._observableRegistry.addCallback(callback, this); + } + + /** + * @see {metrics.Observable.removeCallback} + */ + removeCallback(callback: ObservableCallback) { + this._observableRegistry.removeCallback(callback, this); + } +} + +export class ObservableCounterInstrument extends ObservableInstrument implements metrics.ObservableCounter {} +export class ObservableGaugeInstrument extends ObservableInstrument implements metrics.ObservableGauge {} +export class ObservableUpDownCounterInstrument extends ObservableInstrument implements metrics.ObservableUpDownCounter {} + +export function isObservableInstrument(it: unknown): it is ObservableInstrument { + return it instanceof ObservableInstrument; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts deleted file mode 100644 index 02a7f12418..0000000000 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as api from '@opentelemetry/api'; -import { MetricAttributes } from '@opentelemetry/api-metrics'; - -// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#measurement - -export type Measurement = { - value: number; - // TODO use common attributes - attributes: MetricAttributes - context?: api.Context; -}; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts index 7009ad3ae4..d8a7f00a21 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts @@ -16,7 +16,14 @@ import * as metrics from '@opentelemetry/api-metrics'; import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor'; -import { CounterInstrument, HistogramInstrument, UpDownCounterInstrument } from './Instruments'; +import { + CounterInstrument, + HistogramInstrument, + ObservableCounterInstrument, + ObservableGaugeInstrument, + ObservableUpDownCounterInstrument, + UpDownCounterInstrument, +} from './Instruments'; import { MeterSharedState } from './state/MeterSharedState'; /** @@ -28,7 +35,7 @@ export class Meter implements metrics.Meter { /** * Create a {@link metrics.Histogram} instrument. */ - createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram { + createHistogram(name: string, options?: metrics.MetricOptions): metrics.Histogram { const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options); const storage = this._meterSharedState.registerMetricStorage(descriptor); return new HistogramInstrument(storage, descriptor); @@ -37,7 +44,7 @@ export class Meter implements metrics.Meter { /** * Create a {@link metrics.Counter} instrument. */ - createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter { + createCounter(name: string, options?: metrics.MetricOptions): metrics.Counter { const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options); const storage = this._meterSharedState.registerMetricStorage(descriptor); return new CounterInstrument(storage, descriptor); @@ -46,45 +53,59 @@ export class Meter implements metrics.Meter { /** * Create a {@link metrics.UpDownCounter} instrument. */ - createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter { + createUpDownCounter(name: string, options?: metrics.MetricOptions): metrics.UpDownCounter { const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options); const storage = this._meterSharedState.registerMetricStorage(descriptor); return new UpDownCounterInstrument(storage, descriptor); } /** - * Create a ObservableGauge instrument. + * Create a {@link metrics.ObservableGauge} instrument. */ createObservableGauge( name: string, - callback: metrics.ObservableCallback, - options?: metrics.ObservableGaugeOptions, - ): void { + options?: metrics.MetricOptions, + ): metrics.ObservableGauge { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options); - this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); + const storages = this._meterSharedState.registerAsyncMetricStorage(descriptor); + return new ObservableGaugeInstrument(descriptor, storages, this._meterSharedState.observableRegistry); } /** - * Create a ObservableCounter instrument. + * Create a {@link metrics.ObservableCounter} instrument. */ createObservableCounter( name: string, - callback: metrics.ObservableCallback, - options?: metrics.ObservableCounterOptions, - ): void { + options?: metrics.MetricOptions, + ): metrics.ObservableCounter { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options); - this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); + const storages = this._meterSharedState.registerAsyncMetricStorage(descriptor); + return new ObservableCounterInstrument(descriptor, storages, this._meterSharedState.observableRegistry); } /** - * Create a ObservableUpDownCounter instrument. + * Create a {@link metrics.ObservableUpDownCounter} instrument. */ createObservableUpDownCounter( name: string, - callback: metrics.ObservableCallback, - options?: metrics.ObservableUpDownCounterOptions, - ): void { + options?: metrics.MetricOptions, + ): metrics.ObservableUpDownCounter { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options); - this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); + const storages = this._meterSharedState.registerAsyncMetricStorage(descriptor); + return new ObservableUpDownCounterInstrument(descriptor, storages, this._meterSharedState.observableRegistry); + } + + /** + * @see {@link metrics.Meter.addBatchObservableCallback} + */ + addBatchObservableCallback(callback: metrics.BatchObservableCallback, observables: metrics.Observable[]) { + this._meterSharedState.observableRegistry.addBatchCallback(callback, observables); + } + + /** + * @see {@link metrics.Meter.removeBatchObservableCallback} + */ + removeBatchObservableCallback(callback: metrics.BatchObservableCallback, observables: metrics.Observable[]) { + this._meterSharedState.observableRegistry.removeBatchCallback(callback, observables); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/ObservableResult.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/ObservableResult.ts index 899f03b5f7..19ba32846c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/ObservableResult.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/ObservableResult.ts @@ -14,22 +14,64 @@ * limitations under the License. */ +import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; import { AttributeHashMap } from './state/HashMap'; +import { isObservableInstrument, ObservableInstrument } from './Instruments'; +import { InstrumentDescriptor } from '.'; /** - * The class implements {@link metrics.observableResult} interface. + * The class implements {@link metrics.ObservableResult} interface. */ -export class ObservableResult implements metrics.ObservableResult { +export class ObservableResultImpl implements metrics.ObservableResult { /** * @internal */ - buffer = new AttributeHashMap(); + _buffer = new AttributeHashMap(); + + constructor(private _descriptor: InstrumentDescriptor) {} /** * Observe a measurement of the value associated with the given attributes. */ observe(value: number, attributes: metrics.MetricAttributes = {}): void { - this.buffer.set(attributes, value); + if (this._descriptor.valueType === metrics.ValueType.INT && !Number.isInteger(value)) { + api.diag.warn( + `INT value type cannot accept a floating-point value for ${this._descriptor.name}, ignoring the fractional digits.` + ); + value = Math.trunc(value); + } + this._buffer.set(attributes, value); + } +} + +/** + * The class implements {@link metrics.BatchObservableCallback} interface. + */ +export class BatchObservableResultImpl implements metrics.BatchObservableResult { + /** + * @internal + */ + _buffer: Map> = new Map(); + + /** + * Observe a measurement of the value associated with the given attributes. + */ + observe(metric: metrics.Observable, value: number, attributes: metrics.MetricAttributes = {}): void { + if (!isObservableInstrument(metric)) { + return; + } + let map = this._buffer.get(metric); + if (map == null) { + map = new AttributeHashMap(); + this._buffer.set(metric, map); + } + if (metric._descriptor.valueType === metrics.ValueType.INT && !Number.isInteger(value)) { + api.diag.warn( + `INT value type cannot accept a floating-point value for ${metric._descriptor.name}, ignoring the fractional digits.` + ); + value = Math.trunc(value); + } + map.set(attributes, value); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts index 725c4427b5..b8feefaddd 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts @@ -15,7 +15,6 @@ */ import { HrTime } from '@opentelemetry/api'; -import * as metrics from '@opentelemetry/api-metrics'; import { InstrumentationScope } from '@opentelemetry/core'; import { MetricCollectOptions } from '../export/MetricProducer'; import { ScopeMetrics } from '../export/MetricData'; @@ -35,7 +34,7 @@ import { SyncMetricStorage } from './SyncMetricStorage'; */ export class MeterSharedState { private _metricStorageRegistry = new MetricStorageRegistry(); - private _observableRegistry = new ObservableRegistry(); + observableRegistry = new ObservableRegistry(); meter: Meter; constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationScope: InstrumentationScope) { @@ -58,18 +57,17 @@ export class MeterSharedState { return new MultiMetricStorage(storages); } - registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) { + registerAsyncMetricStorage(descriptor: InstrumentDescriptor) { const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationScope); - views.forEach(view => { - const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor); - const aggregator = view.aggregation.createAggregator(viewDescriptor); - const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor); - const storage = this._metricStorageRegistry.register(viewStorage); - if (storage == null) { - return; - } - this._observableRegistry.addCallback(callback, storage); - }); + const storages = views + .map(view => { + const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor); + const aggregator = view.aggregation.createAggregator(viewDescriptor); + const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor); + return this._metricStorageRegistry.register(viewStorage); + }) + .filter(isNotNullish); + return storages; } /** @@ -82,7 +80,7 @@ export class MeterSharedState { * 1. Call all observable callbacks first. * 2. Collect metric result for the collector. */ - const errors = await this._observableRegistry.observe(options?.timeoutMillis); + const errors = await this.observableRegistry.observe(options?.timeoutMillis); const metricDataList = Array.from(this._metricStorageRegistry.getStorages()) .map(metricStorage => { return metricStorage.collect( diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts index b467c6fcd5..005828283f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts @@ -14,48 +14,137 @@ * limitations under the License. */ -import { ObservableCallback } from '@opentelemetry/api-metrics'; -import { ObservableResult } from '../ObservableResult'; -import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult } from '../utils'; -import { AsyncWritableMetricStorage } from './WritableMetricStorage'; +import * as api from '@opentelemetry/api'; +import { BatchObservableCallback, Observable, ObservableCallback } from '@opentelemetry/api-metrics'; +import { isObservableInstrument, ObservableInstrument } from '../Instruments'; +import { BatchObservableResultImpl, ObservableResultImpl } from '../ObservableResult'; +import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult, setEquals } from '../utils'; /** - * An internal state interface for ObservableCallbacks. - * - * An ObservableCallback can be bound to multiple AsyncMetricStorage at once - * for batch observations. And an AsyncMetricStorage may be bound to multiple - * callbacks too. + * Records for single instrument observable callback. + */ +interface ObservableCallbackRecord { + callback: ObservableCallback; + instrument: ObservableInstrument; +} + +/** + * Records for multiple instruments observable callback. + */ +interface BatchObservableCallbackRecord { + callback: BatchObservableCallback; + instruments: Set; +} + +/** + * An internal interface for managing ObservableCallbacks. * - * However an ObservableCallback must not be called multiple times during a - * single collection operation. + * Every registered callback associated with a set of instruments are be evaluated + * exactly once during collection prior to reading data for that instrument. */ export class ObservableRegistry { - private _callbacks: [ObservableCallback, AsyncWritableMetricStorage][] = []; + private _callbacks: ObservableCallbackRecord[] = []; + private _batchCallbacks: BatchObservableCallbackRecord[] = []; - addCallback(callback: ObservableCallback, metricStorage: AsyncWritableMetricStorage) { - this._callbacks.push([callback, metricStorage]); + addCallback(callback: ObservableCallback, instrument: ObservableInstrument) { + const idx = this._findCallback(callback, instrument); + if (idx >= 0) { + return; + } + this._callbacks.push({ callback, instrument }); + } + + removeCallback(callback: ObservableCallback, instrument: ObservableInstrument) { + const idx = this._findCallback(callback, instrument); + if (idx < 0) { + return; + } + this._callbacks.splice(idx, 1); + } + + addBatchCallback(callback: BatchObservableCallback, instruments: Observable[]) { + // Create a set of unique instruments. + const observableInstruments = new Set(instruments.filter(isObservableInstrument)); + if (observableInstruments.size === 0) { + api.diag.error('BatchObservableCallback is not associated with valid instruments', instruments); + return; + } + const idx = this._findBatchCallback(callback, observableInstruments); + if (idx >= 0) { + return; + } + this._batchCallbacks.push({ callback, instruments: observableInstruments }); + } + + removeBatchCallback(callback: BatchObservableCallback, instruments: Observable[]) { + // Create a set of unique instruments. + const observableInstruments = new Set(instruments.filter(isObservableInstrument)); + const idx = this._findBatchCallback(callback, observableInstruments); + if (idx < 0) { + return; + } + this._batchCallbacks.splice(idx, 1); } /** * @returns a promise of rejected reasons for invoking callbacks. */ async observe(timeoutMillis?: number): Promise { - // TODO: batch observables - // https://github.com/open-telemetry/opentelemetry-specification/pull/2363 - const results = await PromiseAllSettled(this._callbacks - .map(async ([observableCallback, metricStorage]) => { - const observableResult = new ObservableResult(); - let callPromise: Promise = Promise.resolve(observableCallback(observableResult)); + const callbackFutures = this._observeCallbacks(timeoutMillis); + const batchCallbackFutures = this._observeBatchCallbacks(timeoutMillis); + + const results = await PromiseAllSettled([...callbackFutures, ...batchCallbackFutures]); + + const rejections = results.filter(isPromiseAllSettledRejectionResult) + .map(it => it.reason); + return rejections; + } + + private _observeCallbacks(timeoutMillis?: number) { + return this._callbacks + .map(async ({ callback, instrument }) => { + const observableResult = new ObservableResultImpl(instrument._descriptor); + let callPromise: Promise = Promise.resolve(callback(observableResult)); if (timeoutMillis != null) { callPromise = callWithTimeout(callPromise, timeoutMillis); } await callPromise; - metricStorage.record(observableResult.buffer); - }) - ); + instrument._metricStorages.forEach(metricStorage => { + metricStorage.record(observableResult._buffer); + }); + }); + } - const rejections = results.filter(isPromiseAllSettledRejectionResult) - .map(it => it.reason); - return rejections; + private _observeBatchCallbacks(timeoutMillis?: number) { + return this._batchCallbacks + .map(async ({ callback, instruments }) => { + const observableResult = new BatchObservableResultImpl(); + let callPromise: Promise = Promise.resolve(callback(observableResult)); + if (timeoutMillis != null) { + callPromise = callWithTimeout(callPromise, timeoutMillis); + } + await callPromise; + instruments.forEach(instrument => { + const buffer = observableResult._buffer.get(instrument); + if (buffer == null) { + return; + } + instrument._metricStorages.forEach(metricStorage => { + metricStorage.record(buffer); + }); + }); + }); + } + + private _findCallback(callback: ObservableCallback, instrument: ObservableInstrument) { + return this._callbacks.findIndex(record => { + return record.callback === callback && record.instrument === instrument; + }); + } + + private _findBatchCallback(callback: BatchObservableCallback, instruments: Set) { + return this._batchCallbacks.findIndex(record => { + return record.callback === callback && setEquals(record.instruments, instruments); + }); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts index dfd3afe6a2..f0bd55fe3d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts @@ -92,12 +92,17 @@ export class TemporalMetricProcessor { // previous. We have the following four scenarios: // 1. Cumulative Aggregation (temporality) + Delta recording (sync instrument). // Here we merge with our last record to get a cumulative aggregation. - // 2. Cumulative Aggregation + Cumulative recording - do nothing - // 3. Delta Aggregation + Delta recording - do nothing. - // 4. Delta Aggregation + Cumulative recording (async instrument) - do nothing + // 2. Cumulative Aggregation + Cumulative recording (async instrument). + // Cumulative records are converted to delta recording with DeltaMetricProcessor. + // Here we merge with our last record to get a cumulative aggregation. + // 3. Delta Aggregation + Delta recording + // Do nothing here. + // 4. Delta Aggregation + Cumulative recording. + // Cumulative records are converted to delta recording with DeltaMetricProcessor. + // Do nothing here. if (aggregationTemporality === AggregationTemporality.CUMULATIVE) { // We need to make sure the current delta recording gets merged into the previous cumulative - // for the next cumulative measurement. + // for the next cumulative recording. result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator); } } else { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index bf7ba406a0..c294eac87d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -138,3 +138,15 @@ export function FlatMap(arr: T[], fn: (it: T) => R[]): R[] { }); return result; } + +export function setEquals(lhs: Set, rhs: Set): boolean { + if (lhs.size !== rhs.size) { + return false; + } + for (const item of lhs) { + if (!rhs.has(item)) { + return false; + } + } + return true; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts index 7c82dfc581..3de26bdf81 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts @@ -348,7 +348,8 @@ describe('Instruments', () => { } } }); - meter.createObservableCounter('test', callback); + const observableCounter = meter.createObservableCounter('test'); + observableCounter.addCallback(callback); await deltaReader.collect(); assert.strictEqual(callback.callCount, 1); @@ -357,7 +358,8 @@ describe('Instruments', () => { it('should observe values', async () => { const { meter, cumulativeReader } = setup(); let callCount = 0; - meter.createObservableCounter('test', observableResult => { + const observableCounter = meter.createObservableCounter('test'); + observableCounter.addCallback(observableResult => { observableResult.observe(++callCount); observableResult.observe(1, { foo: 'bar' }); }); @@ -401,7 +403,8 @@ describe('Instruments', () => { } } }); - meter.createObservableUpDownCounter('test', callback); + const observableUpDownCounter = meter.createObservableUpDownCounter('test'); + observableUpDownCounter.addCallback(callback); await deltaReader.collect(); assert.strictEqual(callback.callCount, 1); @@ -410,7 +413,8 @@ describe('Instruments', () => { it('should observe values', async () => { const { meter, cumulativeReader } = setup(); let callCount = 0; - meter.createObservableUpDownCounter('test', observableResult => { + const observableUpDownCounter = meter.createObservableUpDownCounter('test'); + observableUpDownCounter.addCallback(observableResult => { observableResult.observe(++callCount); observableResult.observe(1, { foo: 'bar' }); }); @@ -454,7 +458,8 @@ describe('Instruments', () => { } } }); - meter.createObservableGauge('test', callback); + const observableGauge = meter.createObservableGauge('test'); + observableGauge.addCallback(callback); await deltaReader.collect(); assert.strictEqual(callback.callCount, 1); @@ -463,7 +468,8 @@ describe('Instruments', () => { it('should observe values', async () => { const { meter, cumulativeReader } = setup(); let num = 0; - meter.createObservableGauge('test', observableResult => { + const observableGauge = meter.createObservableGauge('test'); + observableGauge.addCallback(observableResult => { num += 10; if (num === 30) { observableResult.observe(-1); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts index 1a3fda840d..ada1234d58 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts @@ -14,16 +14,21 @@ * limitations under the License. */ -import { ObservableCallback } from '@opentelemetry/api-metrics'; +import { Observable } from '@opentelemetry/api-metrics'; import * as assert from 'assert'; -import { CounterInstrument, HistogramInstrument, UpDownCounterInstrument } from '../src/Instruments'; +import { + CounterInstrument, + HistogramInstrument, + ObservableCounterInstrument, + ObservableGaugeInstrument, + ObservableUpDownCounterInstrument, + UpDownCounterInstrument, +} from '../src/Instruments'; import { Meter } from '../src/Meter'; import { MeterProviderSharedState } from '../src/state/MeterProviderSharedState'; import { MeterSharedState } from '../src/state/MeterSharedState'; import { defaultInstrumentationScope, defaultResource } from './util'; -const noopObservableCallback: ObservableCallback = _observableResult => {}; - describe('Meter', () => { describe('createCounter', () => { it('should create counter', () => { @@ -42,8 +47,8 @@ describe('Meter', () => { new MeterProviderSharedState(defaultResource), defaultInstrumentationScope); const meter = new Meter(meterSharedState); - const counter = meter.createUpDownCounter('foobar'); - assert(counter instanceof UpDownCounterInstrument); + const upDownCounter = meter.createUpDownCounter('foobar'); + assert(upDownCounter instanceof UpDownCounterInstrument); }); }); @@ -53,8 +58,8 @@ describe('Meter', () => { new MeterProviderSharedState(defaultResource), defaultInstrumentationScope); const meter = new Meter(meterSharedState); - const counter = meter.createHistogram('foobar'); - assert(counter instanceof HistogramInstrument); + const histogram = meter.createHistogram('foobar'); + assert(histogram instanceof HistogramInstrument); }); }); @@ -64,7 +69,8 @@ describe('Meter', () => { new MeterProviderSharedState(defaultResource), defaultInstrumentationScope); const meter = new Meter(meterSharedState); - meter.createObservableGauge('foobar', noopObservableCallback); + const observableGauge = meter.createObservableGauge('foobar'); + assert(observableGauge instanceof ObservableGaugeInstrument); }); }); @@ -74,7 +80,8 @@ describe('Meter', () => { new MeterProviderSharedState(defaultResource), defaultInstrumentationScope); const meter = new Meter(meterSharedState); - meter.createObservableCounter('foobar', noopObservableCallback); + const observableCounter = meter.createObservableCounter('foobar'); + assert(observableCounter instanceof ObservableCounterInstrument); }); }); @@ -84,7 +91,56 @@ describe('Meter', () => { new MeterProviderSharedState(defaultResource), defaultInstrumentationScope); const meter = new Meter(meterSharedState); - meter.createObservableUpDownCounter('foobar', noopObservableCallback); + const observableUpDownCounter = meter.createObservableUpDownCounter('foobar'); + assert(observableUpDownCounter instanceof ObservableUpDownCounterInstrument); + }); + }); + + describe('addBatchObservableCallback', () => { + it('should register callback without exception', () => { + const meterSharedState = new MeterSharedState( + new MeterProviderSharedState(defaultResource), + defaultInstrumentationScope); + const meter = new Meter(meterSharedState); + const observableGauge = meter.createObservableGauge('test-gauge'); + const observableCounter = meter.createObservableCounter('test-counter'); + const observableUpDownCounter = meter.createObservableUpDownCounter('test-up-down-counter'); + + meter.addBatchObservableCallback(() => {}, [ observableGauge, observableCounter, observableUpDownCounter ]); + }); + + it('should be tolerant with unknown observables', () => { + const meterSharedState = new MeterSharedState( + new MeterProviderSharedState(defaultResource), + defaultInstrumentationScope); + const meter = new Meter(meterSharedState); + + const observables = [ + {}, + 1, + 'foo', + Symbol(), + ] as unknown as Observable[]; + meter.addBatchObservableCallback(() => {}, observables); + }); + }); + + describe('removeBatchObservableCallback', () => { + it('should remove callback without exception', () => { + const meterSharedState = new MeterSharedState( + new MeterProviderSharedState(defaultResource), + defaultInstrumentationScope); + const meter = new Meter(meterSharedState); + const observableGauge = meter.createObservableGauge('test-gauge'); + const observableCounter = meter.createObservableCounter('test-counter'); + const observableUpDownCounter = meter.createObservableUpDownCounter('test-up-down-counter'); + + const callback = () => {}; + meter.addBatchObservableCallback(callback, [ observableGauge, observableCounter, observableUpDownCounter ]); + meter.removeBatchObservableCallback(callback, [ observableGauge, observableCounter, observableUpDownCounter ]); + + // Remove a not registered callback. + meter.removeBatchObservableCallback(() => {}, []); }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/ObservableResult.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/ObservableResult.test.ts index 741ee28c55..f6a38ce443 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/ObservableResult.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/ObservableResult.test.ts @@ -14,14 +14,18 @@ * limitations under the License. */ +import { ValueType } from '@opentelemetry/api-metrics'; import * as assert from 'assert'; -import { ObservableResult } from '../src/ObservableResult'; -import { commonAttributes, commonValues } from './util'; +import { BatchObservableResultImpl, InstrumentType } from '../src'; +import { ObservableInstrument } from '../src/Instruments'; +import { ObservableResultImpl } from '../src/ObservableResult'; +import { ObservableRegistry } from '../src/state/ObservableRegistry'; +import { commonAttributes, commonValues, defaultInstrumentDescriptor } from './util'; -describe('ObservableResult', () => { +describe('ObservableResultImpl', () => { describe('observe', () => { - it('should observe', () => { - const observableResult = new ObservableResult(); + it('should observe common values', () => { + const observableResult = new ObservableResultImpl(defaultInstrumentDescriptor); for (const value of commonValues) { for (const attributes of commonAttributes) { observableResult.observe(value, attributes); @@ -30,13 +34,72 @@ describe('ObservableResult', () => { }); it('should deduplicate observations', () => { - const observableResult = new ObservableResult(); + const observableResult = new ObservableResultImpl(defaultInstrumentDescriptor); observableResult.observe(1, {}); observableResult.observe(2, {}); - assert.strictEqual(observableResult.buffer.size, 1); - assert(observableResult.buffer.has({})); - assert.strictEqual(observableResult.buffer.get({}), 2); + assert.strictEqual(observableResult._buffer.size, 1); + assert(observableResult._buffer.has({})); + assert.strictEqual(observableResult._buffer.get({}), 2); + }); + + it('should trunc value if ValueType is INT', () => { + const observableResult = new ObservableResultImpl({ + name: 'test', + description: '', + type: InstrumentType.COUNTER, + unit: '', + valueType: ValueType.INT, + }); + observableResult.observe(1.1, {}); + assert.strictEqual(observableResult._buffer.get({}), 1); + }); + }); +}); + +describe('BatchObservableResultImpl', () => { + describe('observe', () => { + it('should observe common values', () => { + const observableResult = new BatchObservableResultImpl(); + const observable = new ObservableInstrument(defaultInstrumentDescriptor, [], new ObservableRegistry()); + for (const value of commonValues) { + for (const attributes of commonAttributes) { + observableResult.observe(observable, value, attributes); + } + } + }); + + it('should deduplicate observations', () => { + const observableResult = new BatchObservableResultImpl(); + const observableRegistry = new ObservableRegistry(); + const observable1 = new ObservableInstrument(defaultInstrumentDescriptor, [], observableRegistry); + const observable2 = new ObservableInstrument(defaultInstrumentDescriptor, [], observableRegistry); + observableResult.observe(observable1, 1, {}); + observableResult.observe(observable1, 2, {}); + observableResult.observe(observable2, 4, {}); + + assert.strictEqual(observableResult._buffer.size, 2); + assert(observableResult._buffer.has(observable1)); + assert(observableResult._buffer.has(observable2)); + + const observable1Buffer = observableResult._buffer.get(observable1); + const observable2Buffer = observableResult._buffer.get(observable2); + assert.strictEqual(observable1Buffer?.get({}), 2); + assert.strictEqual(observable2Buffer?.get({}), 4); + }); + + it('should trunc value if ValueType is INT', () => { + const observableResult = new BatchObservableResultImpl(); + const observable = new ObservableInstrument({ + name: 'test', + description: '', + type: InstrumentType.COUNTER, + unit: '', + valueType: ValueType.INT, + }, [], new ObservableRegistry()); + + observableResult.observe(observable, 1.1, {}); + assert.strictEqual(observableResult._buffer.get(observable)?.get({}), 1); }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts index dd64a8a3b3..f65b642898 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts @@ -25,6 +25,7 @@ import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage'; import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; import { ObservableRegistry } from '../../src/state/ObservableRegistry'; import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util'; +import { ObservableInstrument } from '../../src/Instruments'; const deltaCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, @@ -48,7 +49,13 @@ describe('AsyncMetricStorage', () => { new SumAggregator(), new NoopAttributesProcessor(), ); - observableRegistry.addCallback(delegate.getCallback(), metricStorage); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); delegate.setDelegate(observableResult => { observableResult.observe(1, { key: '1' }); @@ -117,7 +124,13 @@ describe('AsyncMetricStorage', () => { new SumAggregator(), new NoopAttributesProcessor(), ); - observableRegistry.addCallback(delegate.getCallback(), metricStorage); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); delegate.setDelegate(observableResult => { observableResult.observe(1, { key: '1' }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts index 675ca6d427..1ecc5732c1 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts @@ -106,7 +106,8 @@ describe('MeterSharedState', () => { /** creating metric events */ let observableCalledCount = 0; - meter.createObservableCounter('test', observableResult => { + const observableCounter = meter.createObservableCounter('test'); + observableCounter.addCallback(observableResult => { observableCalledCount++; observableResult.observe(1); @@ -132,7 +133,7 @@ describe('MeterSharedState', () => { assert.strictEqual(observableCalledCount, 6); }); - it('should call observable callback with view-ed async instruments', async () => { + it('should call observable callback once with view-ed async instruments', async () => { /** preparing test instrumentations */ const { metricCollectors, meter, meterProvider } = setupInstruments(); @@ -149,7 +150,8 @@ describe('MeterSharedState', () => { }); let observableCalledCount = 0; - meter.createObservableCounter('test', observableResult => { + const observableCounter = meter.createObservableCounter('test'); + observableCounter.addCallback(observableResult => { observableCalledCount++; observableResult.observe(1); @@ -176,7 +178,10 @@ describe('MeterSharedState', () => { ...metricCollectors.map(collector => collector.collect().then(verifyResult)), sleep(1).then(() => metricCollectors[0].collect().then(verifyResult)), ]); - assert.strictEqual(observableCalledCount, 6); + /** + * Two collectors, one collects 2 times, one collects 1 time. + */ + assert.strictEqual(observableCalledCount, 3); /** collect metrics */ await Promise.all([ @@ -184,7 +189,7 @@ describe('MeterSharedState', () => { ...metricCollectors.map(collector => collector.collect().then(verifyResult)), sleep(1).then(() => metricCollectors[0].collect().then(verifyResult)), ]); - assert.strictEqual(observableCalledCount, 12); + assert.strictEqual(observableCalledCount, 6); }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 38239c86b7..2070eb2e41 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -27,6 +27,7 @@ import { assertMetricData, assertDataPoint, ObservableCallbackDelegate, + BatchObservableCallbackDelegate, } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter'; @@ -63,7 +64,7 @@ describe('MetricCollector', () => { return { metricCollector, meter }; } - it('should collect metrics', async () => { + it('should collect sync metrics', async () => { /** preparing test instrumentations */ const exporter = new TestMetricExporter(); const { metricCollector, meter } = setupInstruments(exporter); @@ -101,6 +102,64 @@ describe('MetricCollector', () => { assertDataPoint(metricData2.dataPoints[0], {}, 3); }); + it('should collect async metrics', async () => { + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + /** observable */ + const delegate1 = new ObservableCallbackDelegate(); + const observableCounter1 = meter.createObservableCounter('observable1'); + observableCounter1.addCallback(delegate1.getCallback()); + delegate1.setDelegate(observableResult => { + observableResult.observe(1, {}); + observableResult.observe(2, { foo: 'bar' }); + }); + + /** batch observable */ + const delegate2 = new BatchObservableCallbackDelegate(); + const observableCounter2 = meter.createObservableCounter('observable2'); + const observableCounter3 = meter.createObservableCounter('observable3'); + meter.addBatchObservableCallback(delegate2.getCallback(), [ observableCounter2, observableCounter3 ]); + delegate2.setDelegate(observableResult => { + observableResult.observe(observableCounter2, 3, {}); + observableResult.observe(observableCounter2, 4, { foo: 'bar' }); + }); + + /** collect metrics */ + const { resourceMetrics, errors } = await metricCollector.collect(); + assert.strictEqual(errors.length, 0); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 3); + + /** checking batch[0] */ + const metricData1 = metrics[0]; + assertMetricData(metricData1, DataPointType.SINGULAR, { + name: 'observable1' + }); + assert.strictEqual(metricData1.dataPoints.length, 2); + assertDataPoint(metricData1.dataPoints[0], {}, 1); + assertDataPoint(metricData1.dataPoints[1], { foo: 'bar' }, 2); + + /** checking batch[1] */ + const metricData2 = metrics[1]; + assertMetricData(metricData2, DataPointType.SINGULAR, { + name: 'observable2' + }); + assert.strictEqual(metricData2.dataPoints.length, 2); + assertDataPoint(metricData2.dataPoints[0], {}, 3); + assertDataPoint(metricData2.dataPoints[1], { foo: 'bar' }, 4); + + /** checking batch[2] */ + const metricData3 = metrics[2]; + assertMetricData(metricData3, DataPointType.SINGULAR, { + name: 'observable3' + }); + assert.strictEqual(metricData3.dataPoints.length, 0); + }); + it('should collect observer metrics with timeout', async () => { sinon.useFakeTimers(); /** preparing test instrumentations */ @@ -111,7 +170,8 @@ describe('MetricCollector', () => { /** observer1 is an abnormal observer */ const delegate1 = new ObservableCallbackDelegate(); - meter.createObservableCounter('observer1', delegate1.getCallback()); + const observableCounter1 = meter.createObservableCounter('observer1'); + observableCounter1.addCallback(delegate1.getCallback()); delegate1.setDelegate(_observableResult => { return new Promise(() => { /** promise never settles */ @@ -120,7 +180,8 @@ describe('MetricCollector', () => { /** observer2 is a normal observer */ const delegate2 = new ObservableCallbackDelegate(); - meter.createObservableCounter('observer2', delegate2.getCallback()); + const observableCounter2 = meter.createObservableCounter('observer2'); + observableCounter2.addCallback(delegate2.getCallback()); delegate2.setDelegate(observableResult => { observableResult.observe(1, {}); }); @@ -193,8 +254,129 @@ describe('MetricCollector', () => { counter.add(1); /** observer1 is an abnormal observer */ - const delegate1 = new ObservableCallbackDelegate(); - meter.createObservableCounter('observer1', delegate1.getCallback()); + const observableCounter1 = meter.createObservableCounter('observer1'); + observableCounter1.addCallback(_observableResult => { + throw new Error('foobar'); + }); + + /** collect metrics */ + const { resourceMetrics, errors } = await metricCollector.collect(); + assert.strictEqual(errors.length, 1); + assert.strictEqual(`${errors[0]}`, 'Error: foobar'); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); + + /** counter1 data points are collected */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'counter1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 1); + + /** observer1 data points are not collected */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[1].dataPoints.length, 0); + }); + + it('should collect batch observer metrics with timeout', async () => { + sinon.useFakeTimers(); + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + + /** observer1 is an abnormal observer */ + const observableCounter1 = meter.createObservableCounter('observer1'); + const delegate1 = new BatchObservableCallbackDelegate(); + meter.addBatchObservableCallback(delegate1.getCallback(), [ observableCounter1 ]); + delegate1.setDelegate(_observableResult => { + return new Promise(() => { + /** promise never settles */ + }); + }); + + /** observer2 is a normal observer */ + const observableCounter2 = meter.createObservableCounter('observer2'); + const delegate2 = new BatchObservableCallbackDelegate(); + meter.addBatchObservableCallback(delegate2.getCallback(), [ observableCounter2 ]); + delegate2.setDelegate(observableResult => { + observableResult.observe(observableCounter2, 1, {}); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(200); + const { resourceMetrics, errors } = await future; + assert.strictEqual(errors.length, 1); + assert(errors[0] instanceof TimeoutError); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); + + /** observer1 */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 0); + + /** observer2 */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer2' + }); + assert.strictEqual(metrics[1].dataPoints.length, 1); + } + + /** now the observer1 is back to normal */ + delegate1.setDelegate(async observableResult => { + observableResult.observe(observableCounter1, 100, {}); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(100); + const { resourceMetrics, errors } = await future; + assert.strictEqual(errors.length, 0); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); + + /** observer1 */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 1); + assertDataPoint(metrics[0].dataPoints[0], {}, 100); + + /** observer2 */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer2' + }); + assert.strictEqual(metrics[1].dataPoints.length, 1); + } + }); + + it('should collect with throwing batch observable callbacks', async () => { + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + const counter = meter.createCounter('counter1'); + counter.add(1); + + /** observer1 is an abnormal observer */ + const observableCounter1 = meter.createObservableCounter('observer1'); + const delegate1 = new BatchObservableCallbackDelegate(); + meter.addBatchObservableCallback(delegate1.getCallback(), [ observableCounter1 ]); delegate1.setDelegate(_observableResult => { throw new Error('foobar'); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts index 934863cefd..d202895b69 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts @@ -17,10 +17,9 @@ import * as api from '@opentelemetry/api'; import { MetricAttributes } from '@opentelemetry/api-metrics'; import * as assert from 'assert'; -import { Measurement } from '../../src/Measurement'; import { MultiMetricStorage } from '../../src/state/MultiWritableMetricStorage'; import { WritableMetricStorage } from '../../src/state/WritableMetricStorage'; -import { assertMeasurementEqual, commonAttributes, commonValues } from '../util'; +import { assertMeasurementEqual, commonAttributes, commonValues, Measurement } from '../util'; describe('MultiMetricStorage', () => { describe('record', () => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/ObservableRegistry.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/ObservableRegistry.test.ts new file mode 100644 index 0000000000..7be5b0b96b --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/ObservableRegistry.test.ts @@ -0,0 +1,110 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { ObservableInstrument } from '../../src/Instruments'; +import { ObservableRegistry } from '../../src/state/ObservableRegistry'; +import { defaultInstrumentDescriptor } from '../util'; + +describe('ObservableRegistry', () => { + const callback1 = () => {}; + const callback2 = () => {}; + let observableRegistry: ObservableRegistry; + let instrument1: ObservableInstrument; + let instrument2: ObservableInstrument; + + beforeEach(() => { + observableRegistry = new ObservableRegistry(); + instrument1 = new ObservableInstrument(defaultInstrumentDescriptor, [], observableRegistry); + instrument2 = new ObservableInstrument(defaultInstrumentDescriptor, [], observableRegistry); + }); + + describe('addCallback', () => { + it('should add multiple callbacks for one instrument', () => { + observableRegistry.addCallback(callback1, instrument1); + observableRegistry.addCallback(callback2, instrument1); + + assert.strictEqual(observableRegistry['_callbacks'].length, 2); + assert.strictEqual(observableRegistry['_callbacks'][0].callback, callback1); + assert.strictEqual(observableRegistry['_callbacks'][0].instrument, instrument1); + + assert.strictEqual(observableRegistry['_callbacks'][1].callback, callback2); + assert.strictEqual(observableRegistry['_callbacks'][1].instrument, instrument1); + }); + + it('should not add duplicated callbacks', () => { + observableRegistry.addCallback(callback1, instrument1); + observableRegistry.addCallback(callback1, instrument1); + + assert.strictEqual(observableRegistry['_callbacks'].length, 1); + assert.strictEqual(observableRegistry['_callbacks'][0].callback, callback1); + assert.strictEqual(observableRegistry['_callbacks'][0].instrument, instrument1); + }); + }); + + describe('removeCallback', () => { + it('should remove callback with instrument', () => { + observableRegistry.addCallback(callback1, instrument1); + observableRegistry.addCallback(callback2, instrument1); + observableRegistry.addCallback(callback1, instrument2); + observableRegistry.addCallback(callback2, instrument2); + assert.strictEqual(observableRegistry['_callbacks'].length, 4); + + // remove (callback1, instrument1) + observableRegistry.removeCallback(callback1, instrument1); + observableRegistry.removeCallback(callback1, instrument1); + assert.strictEqual(observableRegistry['_callbacks'].length, 3); + }); + }); + + describe('addBatchCallback', () => { + it('should add callback with associated instruments', () => { + observableRegistry.addBatchCallback(callback1, [instrument1]); + observableRegistry.addBatchCallback(callback2, [instrument1]); + + // duplicated pairs. + observableRegistry.addBatchCallback(callback1, [instrument1, instrument2]); + observableRegistry.addBatchCallback(callback1, [instrument1, instrument2]); + + assert.strictEqual(observableRegistry['_batchCallbacks'].length, 3); + }); + + it('should ignore callback without associated instruments', () => { + observableRegistry.addBatchCallback(callback1, []); + // eslint-disable-next-line no-sparse-arrays + observableRegistry.addBatchCallback(callback1, [1, /* hole */, undefined, 2] as unknown as ObservableInstrument[]); + + assert.strictEqual(observableRegistry['_batchCallbacks'].length, 0); + }); + }); + + describe('removeBatchCallback', () => { + it('should remove callback with associated instruments', () => { + observableRegistry.addBatchCallback(callback1, [instrument1]); + observableRegistry.addBatchCallback(callback2, [instrument1]); + observableRegistry.addBatchCallback(callback1, [instrument1, instrument2]); + assert.strictEqual(observableRegistry['_batchCallbacks'].length, 3); + + observableRegistry.removeBatchCallback(callback1, [instrument1]); + assert.strictEqual(observableRegistry['_batchCallbacks'].length, 2); + + // remove twice + observableRegistry.removeBatchCallback(callback1, [instrument1, instrument2]); + observableRegistry.removeBatchCallback(callback1, [instrument1, instrument2]); + assert.strictEqual(observableRegistry['_batchCallbacks'].length, 1); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index 9a044dcc0e..c889d6723a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -14,7 +14,13 @@ * limitations under the License. */ -import { MetricAttributes, ValueType, ObservableCallback } from '@opentelemetry/api-metrics'; +import * as api from '@opentelemetry/api'; +import { + BatchObservableCallback, + MetricAttributes, + ObservableCallback, + ValueType, +} from '@opentelemetry/api-metrics'; import { InstrumentationScope } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; import * as assert from 'assert'; @@ -25,12 +31,18 @@ import { DataPointType, ScopeMetrics } from '../src/export/MetricData'; -import { Measurement } from '../src/Measurement'; import { isNotNullish } from '../src/utils'; import { HrTime } from '@opentelemetry/api'; import { Histogram } from '../src/aggregator/types'; import { AggregationTemporality } from '../src/export/AggregationTemporality'; +export type Measurement = { + value: number; + // TODO: use common attributes + attributes: MetricAttributes + context?: api.Context; +}; + export const defaultResource = new Resource({ resourceKey: 'my-resource', }); @@ -147,3 +159,16 @@ export class ObservableCallbackDelegate { }; } } + +export class BatchObservableCallbackDelegate { + private _delegate?: BatchObservableCallback; + setDelegate(delegate: BatchObservableCallback) { + this._delegate = delegate; + } + + getCallback(): BatchObservableCallback { + return observableResult => { + return this._delegate?.(observableResult); + }; + } +}