Skip to content

Commit

Permalink
feat: aggregation support
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Nov 22, 2021
1 parent 9b5feb2 commit 5282e9f
Show file tree
Hide file tree
Showing 37 changed files with 2,006 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ export enum ValueType {

/** The kind of aggregator. */
export enum AggregationTemporality {
AGGREGATION_TEMPORALITY_UNSPECIFIED,
AGGREGATION_TEMPORALITY_DELTA,
AGGREGATION_TEMPORALITY_CUMULATIVE,
UNSPECIFIED,
DELTA,
CUMULATIVE,
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
* limitations under the License.
*/

import { MetricOptions, ValueType } from '@opentelemetry/api-metrics';
import { MetricOptions, ValueType } from '@opentelemetry/api-metrics-wip';
import { InstrumentType } from './Instruments';
import { View } from './view/View';


export interface InstrumentDescriptor {
readonly name: string;
Expand All @@ -34,3 +36,13 @@ export function createInstrumentDescriptor(name: string, type: InstrumentType, o
valueType: options?.valueType ?? ValueType.DOUBLE,
};
}

export function createInstrumentDescriptorWithView(view: View, instrument: InstrumentDescriptor): InstrumentDescriptor {
return {
name: view.name ?? instrument.name,
description: view.description ?? instrument.description,
type: instrument.type,
unit: instrument.unit,
valueType: instrument.valueType,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentDescriptor } from './InstrumentDescriptor';
import { WritableMetricStorage } from './state/WritableMetricStorage';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import * as api from '@opentelemetry/api'
import { Attributes } from '@opentelemetry/api-metrics'
import { Attributes } from '@opentelemetry/api-metrics-wip'

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#measurement

Expand Down
30 changes: 23 additions & 7 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@
* limitations under the License.
*/

import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor } from './InstrumentDescriptor';
import { Counter, Histogram, InstrumentType, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { NoopWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { CollectorInfo } from './export/CollectorInfo';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#meter

export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, WritableMetricStorage>();
private _metricStorageRegistry = new Map<string, MetricStorage>();

// instrumentation library required by spec to be on meter
// spec requires provider config changes to apply to previously created meters, achieved by holding a reference to the provider
Expand Down Expand Up @@ -68,9 +72,8 @@ export class Meter implements metrics.Meter {

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(_view => {
// TODO: create actual metric storages.
const storage = new NoopWritableMetricStorage();
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
Expand All @@ -80,4 +83,17 @@ export class Meter implements metrics.Meter {
}
return new MultiMetricStorage(storages);
}

async collectAll(collectorInfo: CollectorInfo): Promise<MetricData[]> {
const collectionTime = hrTime();
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collectAndReset(
collectorInfo,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));
return result.filter(isNotNullish);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { Resource } from '@opentelemetry/resources';
import { Meter } from './Meter';
import { MetricExporter } from './MetricExporter';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import { AggregationTemporality } from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
import {
AggregatorKind,
Aggregator,
AccumulationRecord,
} from './types';

/** Basic aggregator for None which keeps no recorded value. */
export class DropAggregator implements Aggregator<undefined> {
kind: AggregatorKind.NONE = AggregatorKind.NONE;

createAccumulation() {
return undefined;
}

merge(_previous: undefined, _delta: undefined) {
return undefined;
}

diff(_previous: undefined, _current: undefined) {
return undefined;
}

toMetricData(
_resource: Resource,
_instrumentationLibrary: InstrumentationLibrary,
_instrumentDescriptor: InstrumentDescriptor,
_accumulationByAttributes: AccumulationRecord<undefined>[],
_temporality: AggregationTemporality,
_sdkStartTime: HrTime,
_lastCollectionTime: HrTime,
_collectionTime: HrTime): Maybe<MetricData> {
return undefined;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
Accumulation,
AccumulationRecord,
Aggregator,
AggregatorKind,
} from './types';
import { Histogram, HistogramMetricData, PointDataType } from '../export/MetricData';
import { Resource } from '@opentelemetry/resources';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { AggregationTemporality } from '@opentelemetry/api-metrics-wip';
import { HrTime } from '@opentelemetry/api';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';

function createNewEmptyCheckpoint(boundaries: number[]): Histogram {
return {
buckets: {
boundaries,
counts: boundaries.map(() => 0).concat([0]),
},
sum: 0,
count: 0,
};
}

export class HistogramAccumulation implements Accumulation {
constructor(private readonly _boundaries: number[], private _current: Histogram = createNewEmptyCheckpoint(_boundaries)) {}

record(value: number): void {
this._current.count += 1;
this._current.sum += value;

for (let i = 0; i < this._boundaries.length; i++) {
if (value < this._boundaries[i]) {
this._current.buckets.counts[i] += 1;
return;
}
}
// value is above all observed boundaries
this._current.buckets.counts[this._boundaries.length] += 1;
}

toPoint(): Histogram {
return this._current;
}
}

/**
* Basic aggregator which observes events and counts them in pre-defined buckets
* and provides the total sum and count of all observations.
*/
export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
public kind: AggregatorKind.HISTOGRAM = AggregatorKind.HISTOGRAM;
private readonly _boundaries: number[];

constructor(boundaries: number[]) {
if (boundaries === undefined || boundaries.length === 0) {
throw new Error('HistogramAggregator should be created with boundaries.');
}
// we need to an ordered set to be able to correctly compute count for each
// boundary since we'll iterate on each in order.
this._boundaries = boundaries.sort((a, b) => a - b);
}

createAccumulation() {
return new HistogramAccumulation(this._boundaries);
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
merge(previous: HistogramAccumulation, delta: HistogramAccumulation): HistogramAccumulation {
const previousPoint = previous.toPoint();
const deltaPoint = delta.toPoint();

const previousCounts = previousPoint.buckets.counts;
const deltaCounts = deltaPoint.buckets.counts;

const mergedCounts = new Array(previousCounts.length);
for (let idx = 0; idx < previousCounts.length; idx++) {
mergedCounts[idx] = previousCounts[idx] + deltaCounts[idx];
}

return new HistogramAccumulation(previousPoint.buckets.boundaries, {
buckets: {
boundaries: previousPoint.buckets.boundaries,
counts: mergedCounts,
},
count: previousPoint.count + deltaPoint.count,
sum: previousPoint.sum + deltaPoint.sum,
});
}

diff(previous: HistogramAccumulation, current: HistogramAccumulation): HistogramAccumulation {
const previousPoint = previous.toPoint();
const currentPoint = current.toPoint();

const previousCounts = previousPoint.buckets.counts;
const currentCounts = currentPoint.buckets.counts;

const diffedCounts = new Array(previousCounts.length);
for (let idx = 0; idx < previousCounts.length; idx++) {
diffedCounts[idx] = currentCounts[idx] - previousCounts[idx];
}

return new HistogramAccumulation(previousPoint.buckets.boundaries, {
buckets: {
boundaries: previousPoint.buckets.boundaries,
counts: diffedCounts,
},
count: currentPoint.count - previousPoint.count,
sum: currentPoint.sum - previousPoint.sum,
});
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
metricDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
temporality: AggregationTemporality,
sdkStartTime: HrTime,
lastCollectionTime: HrTime,
collectionTime: HrTime): Maybe<HistogramMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor: metricDescriptor,
pointDataType: PointDataType.HISTOGRAM,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime: temporality === AggregationTemporality.CUMULATIVE ? sdkStartTime : lastCollectionTime,
endTime: collectionTime,
point: accumulation.toPoint(),
}
})
}
}
}
Loading

0 comments on commit 5282e9f

Please sign in to comment.