From 5282e9f069fd2b11ec9b1bc8f8d83e8f0a83fba4 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 28 Oct 2021 16:54:14 +0800 Subject: [PATCH] feat: aggregation support --- .../src/types/Metric.ts | 6 +- .../src/InstrumentDescriptor.ts | 14 +- .../src/Instruments.ts | 2 +- .../src/Measurement.ts | 2 +- .../src/Meter.ts | 30 ++- .../src/MeterProvider.ts | 2 +- .../src/aggregator/Drop.ts | 57 ++++++ .../src/aggregator/Histogram.ts | 158 +++++++++++++++ .../src/aggregator/LastValue.ts | 78 ++++++++ .../src/aggregator/Sum.ts | 78 ++++++++ .../noop.test.ts => src/aggregator/index.ts} | 8 +- .../src/aggregator/types.ts | 70 +++++++ .../src/export/CollectorInfo.ts | 32 +++ .../src/export/MetricData.ts | 94 +++++++++ .../src/state/DeltaMetricStorage.ts | 57 ++++++ .../src/state/MeterProviderSharedState.ts | 5 + .../src/state/MetricStorage.ts | 39 ++++ .../src/state/MultiWritableMetricStorage.ts | 2 +- .../src/state/SyncMetricStorage.ts | 75 +++++++ .../src/state/TemporalMetricStorage.ts | 128 ++++++++++++ .../src/state/WritableMetricStorage.ts | 8 +- .../src/utils.ts | 41 ++++ .../src/view/Aggregation.ts | 92 ++++++++- .../src/view/AttributesProcessor.ts | 2 +- .../src/view/View.ts | 4 +- .../test/aggregator/Drop.test.ts | 70 +++++++ .../test/aggregator/Histogram.test.ts | 185 ++++++++++++++++++ .../test/aggregator/LastValue.test.ts | 148 ++++++++++++++ .../test/aggregator/Sum.test.ts | 156 +++++++++++++++ .../test/state/DeltaMetricStorage.test.ts | 34 ++++ .../state/MultiWritableMetricStorage.test.ts | 66 +++++++ .../test/state/SyncMetricStorage.test.ts | 98 ++++++++++ .../test/state/TemporalMetricStorage.test.ts | 60 ++++++ .../test/util.ts | 64 ++++++ .../test/view/Aggregation.test.ts | 69 +++++++ .../test/view/View.test.ts | 2 +- .../test/view/ViewRegistry.test.ts | 19 +- 37 files changed, 2006 insertions(+), 49 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts rename experimental/packages/opentelemetry-sdk-metrics-base/{test/noop.test.ts => src/aggregator/index.ts} (80%) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/CollectorInfo.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricStorage.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricStorage.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricStorage.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricStorage.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/view/Aggregation.test.ts diff --git a/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts b/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts index 3ba86d3e874..b9c3fed1d70 100644 --- a/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts +++ b/experimental/packages/opentelemetry-api-metrics/src/types/Metric.ts @@ -71,9 +71,9 @@ export enum ValueType { /** The kind of aggregator. */ export enum AggregationTemporality { - AGGREGATION_TEMPORALITY_UNSPECIFIED, - AGGREGATION_TEMPORALITY_DELTA, - AGGREGATION_TEMPORALITY_CUMULATIVE, + UNSPECIFIED, + DELTA, + CUMULATIVE, } /** diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts index 452356e3cc3..8aa57c9f68d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts @@ -14,8 +14,10 @@ * limitations under the License. */ -import { MetricOptions, ValueType } from '@opentelemetry/api-metrics'; +import { MetricOptions, ValueType } from '@opentelemetry/api-metrics-wip'; import { InstrumentType } from './Instruments'; +import { View } from './view/View'; + export interface InstrumentDescriptor { readonly name: string; @@ -34,3 +36,13 @@ export function createInstrumentDescriptor(name: string, type: InstrumentType, o valueType: options?.valueType ?? ValueType.DOUBLE, }; } + +export function createInstrumentDescriptorWithView(view: View, instrument: InstrumentDescriptor): InstrumentDescriptor { + return { + name: view.name ?? instrument.name, + description: view.description ?? instrument.description, + type: instrument.type, + unit: instrument.unit, + valueType: instrument.valueType, + }; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts index 3953c9ecff0..decf7047945 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts @@ -15,7 +15,7 @@ */ import * as api from '@opentelemetry/api'; -import * as metrics from '@opentelemetry/api-metrics'; +import * as metrics from '@opentelemetry/api-metrics-wip'; import { InstrumentDescriptor } from './InstrumentDescriptor'; import { WritableMetricStorage } from './state/WritableMetricStorage'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts index 215426f9d76..4ea34a6b6bd 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Measurement.ts @@ -15,7 +15,7 @@ */ import * as api from '@opentelemetry/api' -import { Attributes } from '@opentelemetry/api-metrics' +import { Attributes } from '@opentelemetry/api-metrics-wip' // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#measurement diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts index 1fa967e6a48..1ab638c48e7 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts @@ -14,18 +14,22 @@ * limitations under the License. */ -import * as metrics from '@opentelemetry/api-metrics'; -import { InstrumentationLibrary } from '@opentelemetry/core'; +import * as metrics from '@opentelemetry/api-metrics-wip'; +import { hrTime, InstrumentationLibrary } from '@opentelemetry/core'; import { createInstrumentDescriptor, InstrumentDescriptor } from './InstrumentDescriptor'; import { Counter, Histogram, InstrumentType, UpDownCounter } from './Instruments'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; import { MultiMetricStorage } from './state/MultiWritableMetricStorage'; -import { NoopWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage'; +import { SyncMetricStorage } from './state/SyncMetricStorage'; +import { MetricStorage } from './state/MetricStorage'; +import { CollectorInfo } from './export/CollectorInfo'; +import { MetricData } from './export/MetricData'; +import { isNotNullish } from './utils'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#meter export class Meter implements metrics.Meter { - private _metricStorageRegistry = new Map(); + private _metricStorageRegistry = new Map(); // instrumentation library required by spec to be on meter // spec requires provider config changes to apply to previously created meters, achieved by holding a reference to the provider @@ -68,9 +72,8 @@ export class Meter implements metrics.Meter { private _registerMetricStorage(descriptor: InstrumentDescriptor) { const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); - const storages = views.map(_view => { - // TODO: create actual metric storages. - const storage = new NoopWritableMetricStorage(); + const storages = views.map(view => { + const storage = SyncMetricStorage.create(view, descriptor); // TODO: handle conflicts this._metricStorageRegistry.set(descriptor.name, storage); return storage; @@ -80,4 +83,17 @@ export class Meter implements metrics.Meter { } return new MultiMetricStorage(storages); } + + async collectAll(collectorInfo: CollectorInfo): Promise { + const collectionTime = hrTime(); + const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => { + return metricStorage.collectAndReset( + collectorInfo, + this._meterProviderSharedState.resource, + this._instrumentationLibrary, + this._meterProviderSharedState.sdkStartTime, + collectionTime); + })); + return result.filter(isNotNullish); + } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index 501c9bf2393..28b66806baf 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -15,7 +15,7 @@ */ import * as api from '@opentelemetry/api'; -import * as metrics from '@opentelemetry/api-metrics'; +import * as metrics from '@opentelemetry/api-metrics-wip'; import { Resource } from '@opentelemetry/resources'; import { Meter } from './Meter'; import { MetricExporter } from './MetricExporter'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts new file mode 100644 index 00000000000..5b3fe507e8d --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { MetricData } from '../export/MetricData'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { Maybe } from '../utils'; +import { + AggregatorKind, + Aggregator, + AccumulationRecord, +} from './types'; + +/** Basic aggregator for None which keeps no recorded value. */ +export class DropAggregator implements Aggregator { + kind: AggregatorKind.NONE = AggregatorKind.NONE; + + createAccumulation() { + return undefined; + } + + merge(_previous: undefined, _delta: undefined) { + return undefined; + } + + diff(_previous: undefined, _current: undefined) { + return undefined; + } + + toMetricData( + _resource: Resource, + _instrumentationLibrary: InstrumentationLibrary, + _instrumentDescriptor: InstrumentDescriptor, + _accumulationByAttributes: AccumulationRecord[], + _temporality: AggregationTemporality, + _sdkStartTime: HrTime, + _lastCollectionTime: HrTime, + _collectionTime: HrTime): Maybe { + return undefined; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts new file mode 100644 index 00000000000..a3155c98ed1 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts @@ -0,0 +1,158 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + Accumulation, + AccumulationRecord, + Aggregator, + AggregatorKind, +} from './types'; +import { Histogram, HistogramMetricData, PointDataType } from '../export/MetricData'; +import { Resource } from '@opentelemetry/resources'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { HrTime } from '@opentelemetry/api'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { Maybe } from '../utils'; + +function createNewEmptyCheckpoint(boundaries: number[]): Histogram { + return { + buckets: { + boundaries, + counts: boundaries.map(() => 0).concat([0]), + }, + sum: 0, + count: 0, + }; +} + +export class HistogramAccumulation implements Accumulation { + constructor(private readonly _boundaries: number[], private _current: Histogram = createNewEmptyCheckpoint(_boundaries)) {} + + record(value: number): void { + this._current.count += 1; + this._current.sum += value; + + for (let i = 0; i < this._boundaries.length; i++) { + if (value < this._boundaries[i]) { + this._current.buckets.counts[i] += 1; + return; + } + } + // value is above all observed boundaries + this._current.buckets.counts[this._boundaries.length] += 1; + } + + toPoint(): Histogram { + return this._current; + } +} + +/** + * Basic aggregator which observes events and counts them in pre-defined buckets + * and provides the total sum and count of all observations. + */ +export class HistogramAggregator implements Aggregator { + public kind: AggregatorKind.HISTOGRAM = AggregatorKind.HISTOGRAM; + private readonly _boundaries: number[]; + + constructor(boundaries: number[]) { + if (boundaries === undefined || boundaries.length === 0) { + throw new Error('HistogramAggregator should be created with boundaries.'); + } + // we need to an ordered set to be able to correctly compute count for each + // boundary since we'll iterate on each in order. + this._boundaries = boundaries.sort((a, b) => a - b); + } + + createAccumulation() { + return new HistogramAccumulation(this._boundaries); + } + + /** + * Return the result of the merge of two histogram accumulations. As long as one Aggregator + * instance produces all Accumulations with constant boundaries we don't need to worry about + * merging accumulations with different boundaries. + */ + merge(previous: HistogramAccumulation, delta: HistogramAccumulation): HistogramAccumulation { + const previousPoint = previous.toPoint(); + const deltaPoint = delta.toPoint(); + + const previousCounts = previousPoint.buckets.counts; + const deltaCounts = deltaPoint.buckets.counts; + + const mergedCounts = new Array(previousCounts.length); + for (let idx = 0; idx < previousCounts.length; idx++) { + mergedCounts[idx] = previousCounts[idx] + deltaCounts[idx]; + } + + return new HistogramAccumulation(previousPoint.buckets.boundaries, { + buckets: { + boundaries: previousPoint.buckets.boundaries, + counts: mergedCounts, + }, + count: previousPoint.count + deltaPoint.count, + sum: previousPoint.sum + deltaPoint.sum, + }); + } + + diff(previous: HistogramAccumulation, current: HistogramAccumulation): HistogramAccumulation { + const previousPoint = previous.toPoint(); + const currentPoint = current.toPoint(); + + const previousCounts = previousPoint.buckets.counts; + const currentCounts = currentPoint.buckets.counts; + + const diffedCounts = new Array(previousCounts.length); + for (let idx = 0; idx < previousCounts.length; idx++) { + diffedCounts[idx] = currentCounts[idx] - previousCounts[idx]; + } + + return new HistogramAccumulation(previousPoint.buckets.boundaries, { + buckets: { + boundaries: previousPoint.buckets.boundaries, + counts: diffedCounts, + }, + count: currentPoint.count - previousPoint.count, + sum: currentPoint.sum - previousPoint.sum, + }); + } + + toMetricData( + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + metricDescriptor: InstrumentDescriptor, + accumulationByAttributes: AccumulationRecord[], + temporality: AggregationTemporality, + sdkStartTime: HrTime, + lastCollectionTime: HrTime, + collectionTime: HrTime): Maybe { + return { + resource, + instrumentationLibrary, + instrumentDescriptor: metricDescriptor, + pointDataType: PointDataType.HISTOGRAM, + pointData: accumulationByAttributes.map(([attributes, accumulation]) => { + return { + attributes, + startTime: temporality === AggregationTemporality.CUMULATIVE ? sdkStartTime : lastCollectionTime, + endTime: collectionTime, + point: accumulation.toPoint(), + } + }) + } + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts new file mode 100644 index 00000000000..03716db0302 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Sum, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types'; +import { HrTime } from '@opentelemetry/api'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { PointDataType, SingularMetricData } from '../export/MetricData'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { Maybe } from '../utils'; + +export class LastValueAccumulation implements Accumulation { + constructor(private _current: number = 0) {} + + record(value: number): void { + this._current = value; + } + + toPoint(): Sum { + return this._current; + } +} + +/** Basic aggregator which calculates a Sum from individual measurements. */ +export class LastValueAggregator implements Aggregator { + public kind: AggregatorKind.LAST_VALUE = AggregatorKind.LAST_VALUE; + + createAccumulation() { + return new LastValueAccumulation(); + } + + merge(_previous: LastValueAccumulation, delta: LastValueAccumulation): LastValueAccumulation { + return new LastValueAccumulation(delta.toPoint()); + } + + diff(_previous: LastValueAccumulation, current: LastValueAccumulation): LastValueAccumulation { + return new LastValueAccumulation(current.toPoint()); + } + + toMetricData( + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + instrumentDescriptor: InstrumentDescriptor, + accumulationByAttributes: AccumulationRecord[], + temporality: AggregationTemporality, + sdkStartTime: HrTime, + lastCollectionTime: HrTime, + collectionTime: HrTime): Maybe { + return { + resource, + instrumentationLibrary, + instrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: accumulationByAttributes.map(([attributes, accumulation]) => { + return { + attributes, + startTime: temporality === AggregationTemporality.CUMULATIVE ? sdkStartTime : lastCollectionTime, + endTime: collectionTime, + point: accumulation.toPoint(), + } + }) + } + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts new file mode 100644 index 00000000000..19aea88acab --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Sum, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types'; +import { HrTime } from '@opentelemetry/api'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { PointDataType, SingularMetricData } from '../export/MetricData'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { Maybe } from '../utils'; + +export class SumAccumulation implements Accumulation { + constructor(private _current: number = 0) {} + + record(value: number): void { + this._current += value; + } + + toPoint(): Sum { + return this._current; + } +} + +/** Basic aggregator which calculates a Sum from individual measurements. */ +export class SumAggregator implements Aggregator { + public kind: AggregatorKind.SUM = AggregatorKind.SUM; + + createAccumulation() { + return new SumAccumulation(); + } + + merge(previous: SumAccumulation, delta: SumAccumulation): SumAccumulation { + return new SumAccumulation(previous.toPoint() + delta.toPoint()); + } + + diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation { + return new SumAccumulation(current.toPoint() - previous.toPoint()); + } + + toMetricData( + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + instrumentDescriptor: InstrumentDescriptor, + accumulationByAttributes: AccumulationRecord[], + temporality: AggregationTemporality, + sdkStartTime: HrTime, + lastCollectionTime: HrTime, + collectionTime: HrTime): Maybe { + return { + resource, + instrumentationLibrary, + instrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: accumulationByAttributes.map(([attributes, accumulation]) => { + return { + attributes, + startTime: temporality === AggregationTemporality.CUMULATIVE ? sdkStartTime : lastCollectionTime, + endTime: collectionTime, + point: accumulation.toPoint(), + } + }) + } + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/noop.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/index.ts similarity index 80% rename from experimental/packages/opentelemetry-sdk-metrics-base/test/noop.test.ts rename to experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/index.ts index 76c007ebc06..8267e5d8d32 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/noop.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/index.ts @@ -14,6 +14,8 @@ * limitations under the License. */ -describe('nothing', () => { - it.skip('tests nothing'); -}); +export * from './Histogram'; +export * from './LastValue'; +export * from './Drop'; +export * from './Sum'; +export { Aggregator } from './types'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts new file mode 100644 index 00000000000..ec786f2d513 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality, Attributes } from '@opentelemetry/api-metrics-wip'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { Histogram, MetricData } from '../export/MetricData'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { Maybe } from '../utils'; + +/** The kind of aggregator. */ +export enum AggregatorKind { + NONE, + SUM, + LAST_VALUE, + HISTOGRAM, +} + +/** Sum returns an aggregated sum. */ +export type Sum = number; + +/** LastValue returns last value. */ +export type LastValue = number; + +export type PointValueType = Sum | LastValue | Histogram; + +export interface Accumulation { + // TODO: attributes and context for `ExemplarReservoir.offer`. + record(value: number): void; +} + +export type AccumulationRecord = [Attributes, T]; + +/** + * Base interface for aggregators. Aggregators are responsible for holding + * aggregated values and taking a snapshot of these values upon export. + */ +export interface Aggregator { + /** The kind of the aggregator. */ + kind: AggregatorKind; + + createAccumulation(): T; + + merge(previous: T, delta: T): T; + + diff(previous: T, current: T): T; + + toMetricData(resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + instrumentDescriptor: InstrumentDescriptor, + accumulationByAttributes: AccumulationRecord[], + temporality: AggregationTemporality, + sdkStartTime: HrTime, + lastCollectionTime: HrTime, + collectionTime: HrTime): Maybe; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/CollectorInfo.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/CollectorInfo.ts new file mode 100644 index 00000000000..781e2a6be6b --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/CollectorInfo.ts @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; + +/** + * A opaque handle for a collection-pipeline of metrics. + * + * This provides an efficient means of leasing and tracking metric readers. + */ +export type CollectorHandle = symbol; + +/** + * An internal record about a {@link MetricReader} used when collecting metrics. + */ +export interface CollectorInfo { + collectorHandle: CollectorHandle; + aggregationTemporality: AggregationTemporality; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts new file mode 100644 index 00000000000..c36f3b012a4 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts @@ -0,0 +1,94 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { + Attributes, +} from '@opentelemetry/api-metrics-wip'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; + +export interface Histogram { + /** + * Buckets are implemented using two different arrays: + * - boundaries: contains every finite bucket boundary, which are inclusive lower bounds + * - counts: contains event counts for each bucket + * + * Note that we'll always have n+1 buckets, where n is the number of boundaries. + * This is because we need to count events that are below the lowest boundary. + * + * Example: if we measure the values: [5, 30, 5, 40, 5, 15, 15, 15, 25] + * with the boundaries [ 10, 20, 30 ], we will have the following state: + * + * buckets: { + * boundaries: [10, 20, 30], + * counts: [3, 3, 1, 2], + * } + */ + buckets: { + boundaries: number[]; + counts: number[]; + }; + sum: number; + count: number; +} + +export interface BaseMetricData { + readonly resource: Resource; + readonly instrumentationLibrary: InstrumentationLibrary; + readonly instrumentDescriptor: InstrumentDescriptor; + readonly pointDataType: PointDataType, +} + +export interface SingularMetricData extends BaseMetricData { + readonly pointDataType: PointDataType.SINGULAR, + readonly pointData: PointData[], +} + +export interface HistogramMetricData extends BaseMetricData { + readonly pointDataType: PointDataType.HISTOGRAM, + readonly pointData: PointData[], +} + +export type MetricData = SingularMetricData | HistogramMetricData; + +export enum PointDataType { + SINGULAR, + HISTOGRAM, + EXPONENTIAL_HISTOGRAM, +} + +export interface PointData { + /** + * The start epoch timestamp of the PointData, usually the time when + * the metric was created or an aggregation was enabled. + */ + readonly startTime: HrTime; + /** + * The end epoch timestamp when data were collected, usually it represents the moment + * when `Meter.collectAndReset` was called. + */ + readonly endTime: HrTime; + /** + * The attributes associated with this PointData. + */ + readonly attributes: Attributes; + /** + * The data {@link PointData}s for this metric, or empty {@code Collection} if no points. + */ + readonly point: T; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricStorage.ts new file mode 100644 index 00000000000..3947c3cca5b --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricStorage.ts @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Context } from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { hashAttributes, Maybe } from '../utils'; +import { Accumulation, AccumulationRecord, Aggregator } from '../aggregator/types'; + +/** + * Allows synchronous collection of metrics. + * + * This storage should allow allocation of new aggregation cells for metrics and unique reporting + * of per-collection delta accumulations. + */ +export class DeltaMetricStorage> { + private _activeCollectionStorage = new Map>(); + + constructor(private _aggregator: Aggregator) {} + + /** Bind an efficient storage handle for a set of attributes. */ + private bind(attributes: Attributes) { + const hash = hashAttributes(attributes); + let accumulationRecord: AccumulationRecord; + if (this._activeCollectionStorage.has(hash)) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + accumulationRecord = this._activeCollectionStorage.get(hash)!; + } else { + accumulationRecord = [attributes, this._aggregator.createAccumulation()] as AccumulationRecord; + this._activeCollectionStorage.set(hash, accumulationRecord); + } + return accumulationRecord[1]; + } + + record(value: number, attributes: Attributes, _context: Context) { + const accumulation = this.bind(attributes); + accumulation?.record(value); + } + + collect() { + const unreportedDelta = this._activeCollectionStorage; + this._activeCollectionStorage = new Map(); + return unreportedDelta; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts index 57a2b022b0e..fe8e287a23f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import { hrTime } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; import { ViewRegistry } from '../view/ViewRegistry'; @@ -22,6 +23,10 @@ import { ViewRegistry } from '../view/ViewRegistry'; */ export class MeterProviderSharedState { viewRegistry = new ViewRegistry(); + // TODO: we should probably Object.freeze here but, + // return type Object.freeze(hrTime()) is `readonly [number, number]` which + // is not assignable to HrTime. + readonly sdkStartTime = hrTime(); constructor(public resource: Resource) {} } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts new file mode 100644 index 00000000000..acabf9e582a --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { CollectorInfo } from '../export/CollectorInfo'; +import { MetricData } from '../export/MetricData'; +import { Maybe } from '../utils'; + +export interface MetricStorage { + /** + * Collects the metrics from this storage and resets for the next + * collection period. + * + * Note: This is a stateful operation and will reset any interval-related + * state for the CollectorInfo. + */ + collectAndReset( + collectorInfo: CollectorInfo, + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Promise>; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts index 43c064828fd..3dc76048dc9 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts @@ -15,7 +15,7 @@ */ import { Context } from '@opentelemetry/api'; -import { Attributes } from '@opentelemetry/api-metrics'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; import { WritableMetricStorage } from './WritableMetricStorage'; export class MultiMetricStorage implements WritableMetricStorage { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts new file mode 100644 index 00000000000..382f6c19c98 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Context, HrTime } from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { WritableMetricStorage } from './WritableMetricStorage'; +import { Accumulation, Aggregator } from '../aggregator/types'; +import { View } from '../view/View'; +import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor'; +import { AttributesProcessor } from '../view/AttributesProcessor'; +import { MetricStorage } from './MetricStorage'; +import { CollectorInfo } from '../export/CollectorInfo'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { MetricData } from '../export/MetricData'; +import { DeltaMetricStorage } from './DeltaMetricStorage'; +import { TemporalMetricStorage } from './TemporalMetricStorage'; +import { Maybe } from '../utils'; + +/** + * Stores and aggregate {@link MetricData} for synchronous instruments. + */ +export class SyncMetricStorage> implements WritableMetricStorage, MetricStorage { + private _deltaMetricStorage: DeltaMetricStorage; + private _temporalMetricStorage: TemporalMetricStorage; + + constructor(private _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, private _attributesProcessor: AttributesProcessor) { + this._deltaMetricStorage = new DeltaMetricStorage(aggregator); + this._temporalMetricStorage = new TemporalMetricStorage(aggregator); + } + + record(value: number, attributes: Attributes, context: Context) { + attributes = this._attributesProcessor.process(attributes, context); + this._deltaMetricStorage.record(value, attributes, context); + } + + /** + * Collects the metrics from this storage and resets for the next + * collection period. + * + * Note: This is a stateful operation and will reset any interval-related + * state for the CollectorInfo. + */ + async collectAndReset( + collectorInfo: CollectorInfo, + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Promise> { + const accumulations = this._deltaMetricStorage.collect(); + + return this._temporalMetricStorage.buildMetrics(collectorInfo, resource, instrumentationLibrary, this._instrumentDescriptor, accumulations, sdkStartTime, collectionTime); + } + + static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage> { + instrument = createInstrumentDescriptorWithView(view, instrument); + const aggregator = view.aggregation.createAggregator(instrument); + const storage = new SyncMetricStorage(instrument, aggregator, view.attributesProcessor); + return storage; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricStorage.ts new file mode 100644 index 00000000000..28b0cfc062c --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricStorage.ts @@ -0,0 +1,128 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { Accumulation, AccumulationRecord, Aggregator } from '../aggregator/types'; +import { CollectorHandle, CollectorInfo } from '../export/CollectorInfo'; +import { MetricData } from '../export/MetricData'; +import { Resource } from '@opentelemetry/resources'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { Maybe } from '../utils'; + +/** + * Remembers what was presented to a specific exporter. + */ +interface LastReportedHistory { + /** + * The last accumulation of metric data. + */ + accumulations: Map>; + /** + * The timestamp the data was reported. + */ + collectionTime: HrTime; +} + +/** + * Stores last reported time and (optional) accumulation for metrics. + * + * Allows synchronous collection of metrics and reports delta values isolated by collection handle. + */ +export class TemporalMetricStorage> { + private _reportHistory = new Map>(); + + constructor(private _aggregator: Aggregator) {} + + /** + * Builds the {@link MetricData} streams to report against a specific metric reader. + * @param collectorInfo The information of the metric reader. + * @param resource The resource to attach these metrics against. + * @param instrumentationLibrary The instrumentation library that generated these metrics. + * @param instrumentDescriptor The instrumentation descriptor that these metrics generated with. + * @param currentAccumulation The current accumulation of metric data from instruments. + * @param sdkStartTime The sdk start timestamp. + * @param collectionTime The current collection timestamp. + * @returns The {@link MetricData} points or {@code null}. + */ + buildMetrics( + collectorInfo: CollectorInfo, + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + instrumentDescriptor: InstrumentDescriptor, + currentAccumulation: Map>, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Maybe { + const { aggregationTemporality, collectorHandle } = collectorInfo; + // In case it's our first collection, default to start timestamp (see below for explanation). + let lastCollectionTime = sdkStartTime; + + let result = currentAccumulation; + // Check our last report time. + if (this._reportHistory.has(collectorHandle)) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const last = this._reportHistory.get(collectorHandle)!; + lastCollectionTime = last.collectionTime; + // Use aggregation temporality + instrument to determine if we do a merge or a diff of + // previous. We have the following four scenarios: + // 1. Cumulative Aggregation (temporality) + Delta recording (sync instrument). + // Here we merge with our last record to get a cumulative aggregation. + // 2. Cumulative Aggregation + Cumulative recording - do nothing + // 3. Delta Aggregation + Delta recording - do nothing. + // 4. Delta Aggregation + Cumulative recording (async instrument) - do nothing + if (aggregationTemporality === AggregationTemporality.CUMULATIVE) { + // We need to make sure the current delta recording gets merged into the previous cumulative + // for the next cumulative measurement. + TemporalMetricStorage.mergeInPlace(last.accumulations, currentAccumulation, this._aggregator); + result = last.accumulations; + } + } + + // Update last reported (cumulative) accumulation. + this._reportHistory.set(collectorHandle, { + accumulations: result, + collectionTime, + }); + + // Metric data time span is determined in Aggregator.toMetricData with aggregation temporality: + // 1. Cumulative Aggregation time span: (sdkStartTime, collectionTime] + // 2. Delta Aggregation time span: (lastCollectionTime, collectionTime] + return this._aggregator.toMetricData( + resource, + instrumentationLibrary, + instrumentDescriptor, + Array.from(result.values()), + aggregationTemporality, + sdkStartTime, + lastCollectionTime, + collectionTime); + } + + static mergeInPlace(last: Map>, current: Map>, aggregator: Aggregator) { + // TODO: Array.from for https://www.typescriptlang.org/tsconfig#downlevelIteration + for (const [key, record] of Array.from(last.entries())) { + if (!current.has(key)) { + last.delete(key); + continue; + } + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const currentRecord = current.get(key)!; + last.set(key, [ record[0], aggregator.merge(record[1], currentRecord[1])]); + } + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts index 147cc216546..4233dedd243 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts @@ -15,9 +15,15 @@ */ import { Context } from '@opentelemetry/api'; -import { Attributes } from '@opentelemetry/api-metrics'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +/** + * Internal interface. + * + * Stores {@link MetricData} and allows synchronous writes of measurements. + */ export interface WritableMetricStorage { + /** Records a measurement. */ record(value: number, attributes: Attributes, context: Context): void; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts new file mode 100644 index 00000000000..1433d2b5663 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Attributes } from '@opentelemetry/api-metrics-wip'; + +export type Maybe = T | undefined; + + +/** + * Converting the unordered attributes into unique identifier string. + * @param attributes user provided unordered Attributes. + */ +export function hashAttributes(attributes: Attributes): string { + let keys = Object.keys(attributes); + if (keys.length === 0) return ''; + + keys = keys.sort(); + return keys.reduce((result, key) => { + if (result.length > 2) { + result += ','; + } + return (result += key + ':' + attributes[key]); + }, '|#'); +} + +export function isNotNullish(item: Maybe): item is T { + return item !== undefined && item !== null; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts index 41bb6ec73c9..110b21fbf01 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts @@ -14,7 +14,11 @@ * limitations under the License. */ +import { Aggregator, SumAggregator, DropAggregator, LastValueAggregator, HistogramAggregator } from '../aggregator'; +import { Accumulation } from '../aggregator/types'; import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { InstrumentType } from '../Instruments'; +import { Maybe } from '../utils'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#aggregation @@ -24,19 +28,91 @@ import { InstrumentDescriptor } from '../InstrumentDescriptor'; * Aggregation provides a set of built-in aggregations via static methods. */ export abstract class Aggregation { - // TODO: define the actual aggregator classes - abstract createAggregator(instrument: InstrumentDescriptor): unknown; + abstract createAggregator(instrument: InstrumentDescriptor): Aggregator>; - static None(): Aggregation { - return NONE_AGGREGATION; + static Drop(): Aggregation { + return DROP_AGGREGATION; + } + + static Sum(): Aggregation { + return SUM_AGGREGATION; + } + + static LastValue(): Aggregation { + return LAST_VALUE_AGGREGATION; + } + + static Histogram(): Aggregation { + return HISTOGRAM_AGGREGATION; + } + + static Default(): Aggregation { + return DEFAULT_AGGREGATION; } } -export class NoneAggregation extends Aggregation { +export class DropAggregation extends Aggregation { + static kDefault = new DropAggregator(); createAggregator(_instrument: InstrumentDescriptor) { - // TODO: define aggregator type - return; + return DropAggregation.kDefault; + } +} + +export class SumAggregation extends Aggregation { + static kDefault = new SumAggregator(); + createAggregator(_instrument: InstrumentDescriptor) { + return SumAggregation.kDefault; + } +} + +export class LastValueAggregation extends Aggregation { + static kDefault = new LastValueAggregator(); + createAggregator(_instrument: InstrumentDescriptor) { + return LastValueAggregation.kDefault; + } +} + +export class HistogramAggregation extends Aggregation { + static kDefault = new HistogramAggregator([0, 5, 10, 25, 50, 75, 100, 250, 500, 1000]); + createAggregator(_instrument: InstrumentDescriptor) { + return HistogramAggregation.kDefault; + } +} + +export class ExplicitBucketHistogramAggregation extends Aggregation { + constructor(private _boundaries: number[]) { + super(); + } + + createAggregator(_instrument: InstrumentDescriptor) { + return new HistogramAggregator(this._boundaries); + } +} + +// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#default-aggregation +export class DefaultAggregation extends Aggregation { + createAggregator(instrument: InstrumentDescriptor): Aggregator> { + // cast to unknown to disable complaints on the (unreachable) fallback. + switch (instrument.type as unknown) { + case InstrumentType.COUNTER: + case InstrumentType.UP_DOWN_COUNTER: + case InstrumentType.OBSERVABLE_COUNTER: + case InstrumentType.OBSERVABLE_UP_DOWN_COUNTER: { + return SumAggregation.kDefault; + } + case InstrumentType.OBSERVABLE_GAUGE: { + return LastValueAggregation.kDefault; + } + case InstrumentType.HISTOGRAM: { + return HistogramAggregation.kDefault; + } + } + return DropAggregation.kDefault; } } -const NONE_AGGREGATION = new NoneAggregation(); +const DROP_AGGREGATION = new DropAggregation(); +const SUM_AGGREGATION = new SumAggregation(); +const LAST_VALUE_AGGREGATION = new LastValueAggregation(); +const HISTOGRAM_AGGREGATION = new HistogramAggregation(); +const DEFAULT_AGGREGATION = new DefaultAggregation(); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/AttributesProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/AttributesProcessor.ts index 5cdfa2dda44..15b46f53e0d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/AttributesProcessor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/AttributesProcessor.ts @@ -15,7 +15,7 @@ */ import { Context } from '@opentelemetry/api'; -import { Attributes } from '@opentelemetry/api-metrics'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; /** * The {@link AttributesProcessor} is responsible for customizing which diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/View.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/View.ts index 9dfbcb07ed9..179b5807efd 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/View.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/View.ts @@ -60,9 +60,7 @@ export class View { constructor(config?: ViewStreamConfig) { this.name = config?.name; this.description = config?.description; - // TODO: the default aggregation should be Aggregation.Default(). - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#default-aggregation - this.aggregation = config?.aggregation ?? Aggregation.None(); + this.aggregation = config?.aggregation ?? Aggregation.Default(); this.attributesProcessor = config?.attributesProcessor ?? AttributesProcessor.Noop(); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts new file mode 100644 index 00000000000..7ee39f99f0c --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { DropAggregator } from '../../src/aggregator'; +import { defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +describe('DropAggregator', () => { + describe('createAccumulation', () => { + it('no exceptions', () => { + const aggregator = new DropAggregator(); + const accumulation = aggregator.createAccumulation(); + assert.strictEqual(accumulation, undefined); + }); + }); + + describe('merge', () => { + it('no exceptions', () => { + const aggregator = new DropAggregator(); + const prev = aggregator.createAccumulation(); + const delta = aggregator.createAccumulation(); + assert.strictEqual(aggregator.merge(prev, delta), undefined); + }); + }); + + describe('diff', () => { + it('no exceptions', () => { + const aggregator = new DropAggregator(); + const prev = aggregator.createAccumulation(); + const curr = aggregator.createAccumulation(); + assert.strictEqual(aggregator.diff(prev, curr), undefined); + }); + }); + + describe('toMetricData', () => { + it('no exceptions', () => { + const aggregator = new DropAggregator(); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + assert.strictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, undefined]], + AggregationTemporality.DELTA, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), undefined); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts new file mode 100644 index 00000000000..ef24b910fc5 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts @@ -0,0 +1,185 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { HistogramAccumulation, HistogramAggregator } from '../../src/aggregator'; +import { MetricData, PointDataType } from '../../src/export/MetricData'; +import { commonValues, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +describe('HistogramAggregator', () => { + describe('createAccumulation', () => { + it('no exceptions on createAccumulation', () => { + const aggregator = new HistogramAggregator([1, 10, 100]); + const accumulation = aggregator.createAccumulation(); + assert(accumulation instanceof HistogramAccumulation); + }); + }); + + describe('merge', () => { + it('no exceptions', () => { + const aggregator = new HistogramAggregator([1, 10, 100]); + const prev = aggregator.createAccumulation(); + prev.record(0); + prev.record(1); + + const delta = aggregator.createAccumulation(); + delta.record(2); + delta.record(11); + + const expected = aggregator.createAccumulation(); + // replay actions on prev + expected.record(0); + expected.record(1); + // replay actions on delta + expected.record(2); + expected.record(11); + + assert.deepStrictEqual(aggregator.merge(prev, delta), expected); + }); + }); + + describe('diff', () => { + it('no exceptions', () => { + const aggregator = new HistogramAggregator([1, 10, 100]); + const prev = aggregator.createAccumulation(); + prev.record(0); + prev.record(1); + + const curr = aggregator.createAccumulation(); + // replay actions on prev + curr.record(0); + curr.record(1); + // perform new actions + curr.record(2); + curr.record(11); + + const expected = new HistogramAccumulation([1, 10, 100], { + buckets: { + boundaries: [1, 10, 100], + counts: [0, 1, 1, 0], + }, + count: 2, + sum: 13, + }); + + assert.deepStrictEqual(aggregator.diff(prev, curr), expected); + }); + }); + + describe('toMetricData', () => { + it('transform with AggregationTemporality.DELTA', () => { + const aggregator = new HistogramAggregator([1, 10, 100]); + + const accumulation = aggregator.createAccumulation(); + accumulation.record(0); + accumulation.record(1); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.HISTOGRAM, + pointData: [ + { + attributes: {}, + startTime: lastCollectionTime, + endTime: collectionTime, + point: { + buckets: { + boundaries: [1, 10, 100], + counts: [1, 1, 0, 0], + }, + count: 2, + sum: 1, + }, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.DELTA, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + + it('transform with AggregationTemporality.CUMULATIVE', () => { + const aggregator = new HistogramAggregator([1, 10, 100]); + + const accumulation = aggregator.createAccumulation(); + accumulation.record(0); + accumulation.record(1); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.HISTOGRAM, + pointData: [ + { + attributes: {}, + startTime: sdkStartTime, + endTime: collectionTime, + point: { + buckets: { + boundaries: [1, 10, 100], + counts: [1, 1, 0, 0], + }, + count: 2, + sum: 1, + }, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.CUMULATIVE, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + }); +}); + +describe('HistogramAccumulation', () => { + describe('record', () => { + it('no exceptions on record', () => { + const accumulation = new HistogramAccumulation([1, 10, 100]); + + for (const value of commonValues) { + accumulation.record(value); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts new file mode 100644 index 00000000000..9456718ffce --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts @@ -0,0 +1,148 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { LastValueAccumulation, LastValueAggregator } from '../../src/aggregator'; +import { MetricData, PointDataType } from '../../src/export/MetricData'; +import { commonValues, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +describe('LastValueAggregator', () => { + describe('createAccumulation', () => { + it('no exceptions on createAccumulation', () => { + const aggregator = new LastValueAggregator(); + const accumulation = aggregator.createAccumulation(); + assert(accumulation instanceof LastValueAccumulation); + }); + }); + + describe('merge', () => { + it('no exceptions', () => { + const aggregator = new LastValueAggregator(); + const prev = aggregator.createAccumulation(); + prev.record(2); + + const delta = aggregator.createAccumulation(); + delta.record(3); + + assert.deepStrictEqual(aggregator.merge(prev, delta), delta); + }); + }); + + describe('diff', () => { + it('no exceptions', () => { + const aggregator = new LastValueAggregator(); + const prev = aggregator.createAccumulation(); + prev.record(2); + + const curr = aggregator.createAccumulation(); + curr.record(3); + + assert.deepStrictEqual(aggregator.diff(prev, curr), curr); + }); + }); + + describe('toMetricData', () => { + it('transform with AggregationTemporality.DELTA', () => { + const aggregator = new LastValueAggregator(); + + const accumulation = aggregator.createAccumulation(); + accumulation.record(1); + accumulation.record(2); + accumulation.record(1); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: [ + { + attributes: {}, + startTime: lastCollectionTime, + endTime: collectionTime, + point: 1, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.DELTA, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + + it('transform with AggregationTemporality.CUMULATIVE', () => { + const aggregator = new LastValueAggregator(); + + const accumulation = aggregator.createAccumulation(); + accumulation.record(1); + accumulation.record(2); + accumulation.record(1); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: [ + { + attributes: {}, + startTime: sdkStartTime, + endTime: collectionTime, + point: 1, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.CUMULATIVE, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + }); +}); + +describe('LastValueAccumulation', () => { + describe('record', () => { + it('no exceptions on record', () => { + const accumulation = new LastValueAccumulation(); + + for (const value of commonValues) { + accumulation.record(value); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts new file mode 100644 index 00000000000..ea37e425c22 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts @@ -0,0 +1,156 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { SumAccumulation, SumAggregator } from '../../src/aggregator'; +import { MetricData, PointDataType } from '../../src/export/MetricData'; +import { commonValues, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +describe('SumAggregator', () => { + describe('createAccumulation', () => { + it('no exceptions on createAccumulation', () => { + const aggregator = new SumAggregator(); + const accumulation = aggregator.createAccumulation(); + assert(accumulation instanceof SumAccumulation); + }); + }); + + describe('merge', () => { + it('no exceptions', () => { + const aggregator = new SumAggregator(); + const prev = aggregator.createAccumulation(); + prev.record(1); + prev.record(2); + + const delta = aggregator.createAccumulation(); + delta.record(3); + delta.record(4); + + const expected = aggregator.createAccumulation(); + expected.record(1 + 2 + 3 + 4); + assert.deepStrictEqual(aggregator.merge(prev, delta), expected); + }); + }); + + describe('diff', () => { + it('no exceptions', () => { + const aggregator = new SumAggregator(); + const prev = aggregator.createAccumulation(); + prev.record(1); + prev.record(2); + + const curr = aggregator.createAccumulation(); + // replay actions performed on prev + curr.record(1); + curr.record(2); + // perform new actions + curr.record(3); + curr.record(4); + + const expected = aggregator.createAccumulation(); + expected.record(3 + 4); + assert.deepStrictEqual(aggregator.diff(prev, curr), expected); + }); + }); + + describe('toMetricData', () => { + it('transform with AggregationTemporality.DELTA', () => { + const aggregator = new SumAggregator(); + const accumulation = aggregator.createAccumulation(); + accumulation.record(1); + accumulation.record(2); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: [ + { + attributes: {}, + startTime: lastCollectionTime, + endTime: collectionTime, + point: 3, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.DELTA, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + + it('transform with AggregationTemporality.CUMULATIVE', () => { + const aggregator = new SumAggregator(); + const accumulation = aggregator.createAccumulation(); + accumulation.record(1); + accumulation.record(2); + + const sdkStartTime: HrTime = [0, 0]; + const lastCollectionTime: HrTime = [1, 1]; + const collectionTime: HrTime = [2, 2]; + + const expected: MetricData = { + resource: defaultResource, + instrumentationLibrary: defaultInstrumentationLibrary, + instrumentDescriptor: defaultInstrumentDescriptor, + pointDataType: PointDataType.SINGULAR, + pointData: [ + { + attributes: {}, + startTime: sdkStartTime, + endTime: collectionTime, + point: 3, + }, + ], + }; + assert.deepStrictEqual(aggregator.toMetricData( + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + [[{}, accumulation]], + AggregationTemporality.CUMULATIVE, + sdkStartTime, + lastCollectionTime, + collectionTime, + ), expected); + }); + }); +}); + +describe('SumAccumulation', () => { + describe('record', () => { + it('no exceptions on record', () => { + const accumulation = new SumAccumulation(); + + for (const value of commonValues) { + accumulation.record(value); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricStorage.test.ts new file mode 100644 index 00000000000..400b3dc3edc --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricStorage.test.ts @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { SumAggregator } from '../../src/aggregator'; +import { DeltaMetricStorage } from '../../src/state/DeltaMetricStorage'; +import { commonAttributes, commonValues } from '../util'; + +describe('DeltaMetricStorage', () => { + describe('record', () => { + it('no exceptions on record', () => { + const metricStorage = new DeltaMetricStorage(new SumAggregator()); + + for (const value of commonValues) { + for (const attributes of commonAttributes) { + metricStorage.record(value, attributes, api.context.active()); + } + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts new file mode 100644 index 00000000000..64c8daaa6f5 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { Measurement } from '../../src/Measurement'; +import { MultiMetricStorage } from '../../src/state/MultiWritableMetricStorage'; +import { WritableMetricStorage } from '../../src/state/WritableMetricStorage'; +import { assertMeasurementEqual, commonAttributes, commonValues } from '../util'; + +describe('MultiMetricStorage', () => { + describe('record', () => { + it('no exceptions on record', () => { + const metricStorage = new MultiMetricStorage([]); + + for (const value of commonValues) { + for (const attribute of commonAttributes) { + metricStorage.record(value, attribute, api.context.active()); + } + } + }); + + it('record with multiple backing storages', () => { + class TestWritableMetricStorage implements WritableMetricStorage { + records: Measurement[] = []; + record(value: number, attributes: Attributes, context: api.Context): void { + this.records.push({ value, attributes, context }); + } + } + + const backingStorage1 = new TestWritableMetricStorage(); + const backingStorage2 = new TestWritableMetricStorage(); + const metricStorage = new MultiMetricStorage([backingStorage1, backingStorage2]); + + const expectedMeasurements: Measurement[] = []; + for (const value of commonValues) { + for (const attributes of commonAttributes) { + const context = api.context.active() + expectedMeasurements.push({ value, attributes, context }) + metricStorage.record(value, attributes, context); + } + } + + assert.strictEqual(backingStorage1.records.length, expectedMeasurements.length); + assert.strictEqual(backingStorage2.records.length, expectedMeasurements.length); + for (const [idx, expected] of expectedMeasurements.entries()) { + assertMeasurementEqual(backingStorage1.records[idx], expected); + assertMeasurementEqual(backingStorage2.records[idx], expected); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts new file mode 100644 index 00000000000..078f2c26130 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts @@ -0,0 +1,98 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { hrTime } from '@opentelemetry/core'; +import * as assert from 'assert'; + +import { SumAggregator } from '../../src/aggregator'; +import { CollectorInfo } from '../../src/export/CollectorInfo'; +import { SyncMetricStorage } from '../../src/state/SyncMetricStorage'; +import { isNotNullish } from '../../src/utils'; +import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; +import { assertMetricData, commonAttributes, commonValues, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +const collectorInfo1: CollectorInfo = { + aggregationTemporality: AggregationTemporality.DELTA, + collectorHandle: Symbol(), +} + +const sdkStartTime = hrTime(); + +describe('SyncMetricStorage', () => { + describe('record', () => { + it('no exceptions on record', () => { + const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + + for (const value of commonValues) { + for (const attributes of commonAttributes) { + metricStorage.record(value, attributes, api.context.active()); + } + } + }); + }); + + describe('collectAndReset', () => { + it('should collect and reset memos', async () => { + const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + metricStorage.record(1, {}, api.context.active()); + metricStorage.record(2, {}, api.context.active()); + metricStorage.record(3, {}, api.context.active()); + { + const metric = await metricStorage.collectAndReset( + collectorInfo1, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + assert(isNotNullish(metric)); + assertMetricData(metric, defaultResource, defaultInstrumentationLibrary, defaultInstrumentDescriptor); + assert.strictEqual(metric.pointData.length, 1); + assert.deepStrictEqual(metric.pointData[0].attributes, {}); + assert.strictEqual(metric.pointData[0].point, 6); + } + + // The attributes should not be memorized. + { + const metric = await metricStorage.collectAndReset( + collectorInfo1, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + assert(isNotNullish(metric)); + assertMetricData(metric, defaultResource, defaultInstrumentationLibrary, defaultInstrumentDescriptor); + assert.strictEqual(metric.pointData.length, 0); + } + + metricStorage.record(1, {}, api.context.active()); + { + const metric = await metricStorage.collectAndReset( + collectorInfo1, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + assert(isNotNullish(metric)); + assertMetricData(metric, defaultResource, defaultInstrumentationLibrary, defaultInstrumentDescriptor); + assert.strictEqual(metric.pointData.length, 1); + assert.deepStrictEqual(metric.pointData[0].attributes, {}); + assert.strictEqual(metric.pointData[0].point, 1); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricStorage.test.ts new file mode 100644 index 00000000000..990bc3641cc --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricStorage.test.ts @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { AggregationTemporality } from '@opentelemetry/api-metrics-wip'; +import { hrTime } from '@opentelemetry/core'; +import * as assert from 'assert'; +import { SumAggregator } from '../../src/aggregator'; +import { CollectorInfo } from '../../src/export/CollectorInfo'; +import { PointDataType } from '../../src/export/MetricData'; +import { DeltaMetricStorage } from '../../src/state/DeltaMetricStorage'; +import { TemporalMetricStorage } from '../../src/state/TemporalMetricStorage'; +import { isNotNullish } from '../../src/utils'; +import { assertMetricData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +const deltaCollector1: CollectorInfo = { + aggregationTemporality: AggregationTemporality.DELTA, + collectorHandle: Symbol(), +}; +const sdkStartTime = hrTime(); + +describe('TemporalMetricStorage', () => { + describe('buildMetrics', () => { + it('should build metrics', () => { + const aggregator = new SumAggregator(); + const deltaMetricStorage = new DeltaMetricStorage(aggregator); + const temporalMetricStorage = new TemporalMetricStorage(aggregator); + + deltaMetricStorage.record(1, {}, api.context.active()); + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assert(isNotNullish(metric)); + assertMetricData(metric, defaultResource, defaultInstrumentationLibrary, defaultInstrumentDescriptor); + assert.strictEqual(metric.pointDataType, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assert.deepStrictEqual(metric.pointData[0].attributes, {}); + assert.strictEqual(metric.pointData[0].point, 1); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts new file mode 100644 index 00000000000..2145c5003e9 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Attributes, ValueType } from '@opentelemetry/api-metrics-wip'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import * as assert from 'assert'; +import { MetricData } from '../src/export/MetricData'; +import { InstrumentDescriptor } from '../src/InstrumentDescriptor'; +import { InstrumentType } from '../src/Instruments'; +import { Measurement } from '../src/Measurement'; + +export const defaultResource = new Resource({ + resourceKey: 'my-resource', +}); + +export const defaultInstrumentDescriptor: InstrumentDescriptor = { + name: 'default_metric', + description: 'a simple instrument', + type: InstrumentType.COUNTER, + unit: '1', + valueType: ValueType.DOUBLE, +}; + +export const defaultInstrumentationLibrary: InstrumentationLibrary = { + name: 'default', + version: '1.0.0', + schemaUrl: 'https://opentelemetry.io/schemas/1.7.0' +}; + +export const commonValues: number[] = [1, -1, 1.0, Infinity, -Infinity, NaN]; +export const commonAttributes: Attributes[] = [{}, {1: '1'}, {a: '2'}, new (class Foo{ +a = '1' +})]; + +export function assertMetricData(actual: unknown, resource: Resource, instrumentationLibrary: InstrumentationLibrary, instrumentDescriptor: InstrumentDescriptor) { + assert.deepStrictEqual((actual as MetricData).resource, resource); + assert.deepStrictEqual((actual as MetricData).instrumentationLibrary, instrumentationLibrary); + assert.deepStrictEqual((actual as MetricData).instrumentDescriptor, instrumentDescriptor); +} + +export function assertMeasurementEqual(actual: unknown, expected: Measurement): asserts actual is Measurement { + // NOTE: Node.js v8 assert.strictEquals treat two NaN as different values. + if (Number.isNaN(expected.value)) { + assert(Number.isNaN((actual as Measurement).value)); + } else { + assert.strictEqual((actual as Measurement).value, expected.value); + } + assert.deepStrictEqual((actual as Measurement).attributes, expected.attributes); + assert.deepStrictEqual((actual as Measurement).context, expected.context); +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/view/Aggregation.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/Aggregation.test.ts new file mode 100644 index 00000000000..c6ec4503979 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/Aggregation.test.ts @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { Aggregator, HistogramAggregator, LastValueAggregator, SumAggregator } from '../../src/aggregator'; +import { InstrumentDescriptor } from '../../src/InstrumentDescriptor'; +import { InstrumentType } from '../../src/Instruments'; +import { Aggregation, DefaultAggregation, DropAggregation, HistogramAggregation, LastValueAggregation, SumAggregation } from '../../src/view/Aggregation'; +import { defaultInstrumentDescriptor } from '../util'; + +interface AggregationConstructor { + new(...args: any[]): Aggregation; +} + +interface AggregatorConstructor { + new(...args: any[]): Aggregator; +} + +describe('Aggregation', () => { + it('static aggregations', () => { + const staticMembers: [keyof typeof Aggregation, AggregationConstructor][] = [ + ['Drop', DropAggregation], + ['Sum', SumAggregation], + ['LastValue', LastValueAggregation], + ['Histogram', HistogramAggregation], + ['Default', DefaultAggregation], + ]; + + for (const [key, type] of staticMembers) { + const aggregation = (Aggregation[key] as () => Aggregation)(); + assert(aggregation instanceof type); + assert(aggregation.createAggregator(defaultInstrumentDescriptor)); + } + }); +}); + +describe('DefaultAggregation', () => { + describe('createAggregator', () => { + it('should create aggregators for instrument descriptors', () => { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#default-aggregation + const expectations: [InstrumentDescriptor, AggregatorConstructor][] = [ + [{ ...defaultInstrumentDescriptor, type: InstrumentType.COUNTER }, SumAggregator], + [{ ...defaultInstrumentDescriptor, type: InstrumentType.OBSERVABLE_COUNTER }, SumAggregator], + [{ ...defaultInstrumentDescriptor, type: InstrumentType.UP_DOWN_COUNTER }, SumAggregator], + [{ ...defaultInstrumentDescriptor, type: InstrumentType.OBSERVABLE_UP_DOWN_COUNTER }, SumAggregator], + [{ ...defaultInstrumentDescriptor, type: InstrumentType.OBSERVABLE_GAUGE }, LastValueAggregator], + [{ ...defaultInstrumentDescriptor, type: InstrumentType.HISTOGRAM }, HistogramAggregator], + ]; + + const aggregation = new DefaultAggregation(); + for (const [instrumentDescriptor, type] of expectations) { + assert(aggregation.createAggregator(instrumentDescriptor) instanceof type, `${InstrumentType[instrumentDescriptor.type]}`); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/view/View.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/View.test.ts index 11e3658744e..99520d9933f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/view/View.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/View.test.ts @@ -25,7 +25,7 @@ describe('View', () => { const view = new View(); assert.strictEqual(view.name, undefined); assert.strictEqual(view.description, undefined); - assert.strictEqual(view.aggregation, Aggregation.None()); + assert.strictEqual(view.aggregation, Aggregation.Default()); assert.strictEqual(view.attributesProcessor, AttributesProcessor.Noop()); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/view/ViewRegistry.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/ViewRegistry.test.ts index 9aae44dea2e..e7185a18887 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/view/ViewRegistry.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/view/ViewRegistry.test.ts @@ -15,28 +15,13 @@ */ import * as assert from 'assert'; -import { ValueType } from '@opentelemetry/api-metrics'; -import { InstrumentationLibrary } from '@opentelemetry/core'; import { InstrumentType } from '../../src/Instruments'; import { ViewRegistry } from '../../src/view/ViewRegistry'; import { View } from '../../src/view/View'; import { InstrumentSelector } from '../../src/view/InstrumentSelector'; import { MeterSelector } from '../../src/view/MeterSelector'; -import { InstrumentDescriptor } from '../../src/InstrumentDescriptor'; - -const defaultInstrumentDescriptor: InstrumentDescriptor = { - name: '', - description: '', - type: InstrumentType.COUNTER, - unit: '', - valueType: ValueType.DOUBLE, -}; - -const defaultInstrumentationLibrary: InstrumentationLibrary = { - name: 'default', - version: '1.0.0', - schemaUrl: 'https://opentelemetry.io/schemas/1.7.0' -}; +import { defaultInstrumentationLibrary, defaultInstrumentDescriptor } from '../util'; + describe('ViewRegistry', () => { describe('findViews', () => {