Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(views): handle view conflicts. #2734

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,35 @@ export function createInstrumentDescriptorWithView(view: View, instrument: Instr
valueType: instrument.valueType,
};
}

export function isDescriptorCompatibleWith(descriptor: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return descriptor.name === otherDescriptor.name
&& descriptor.description === otherDescriptor.description
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
&& descriptor.unit === otherDescriptor.unit
&& descriptor.type === otherDescriptor.type
&& descriptor.valueType === otherDescriptor.valueType;
}

export function getDescriptorIncompatibilityDetails(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
let incompatibility = '';
if (existing.description !== otherDescriptor.description) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

incompatibility += `\n- Description '${existing.description}' does not match '${otherDescriptor.description}'`;
}
if (existing.unit !== otherDescriptor.unit) {
incompatibility += `\n- Unit '${existing.unit}' does not match '${otherDescriptor.description}'`;
}
if (existing.type !== otherDescriptor.type) {
incompatibility += `\n- Type '${existing.type}' does not match '${otherDescriptor.type}'`;
}
if (existing.valueType !== otherDescriptor.valueType) {
incompatibility += `\n- Value Type '${existing.valueType}' does not match '${otherDescriptor.valueType}'`;
}

return incompatibility;
}

export function isDescriptorAsync(descriptor: InstrumentDescriptor) {
return (descriptor.type === InstrumentType.OBSERVABLE_GAUGE ||
descriptor.type === InstrumentType.OBSERVABLE_UP_DOWN_COUNTER ||
descriptor.type === InstrumentType.OBSERVABLE_COUNTER);
}
32 changes: 16 additions & 16 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@

import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor } from './InstrumentDescriptor';
import {
createInstrumentDescriptor, InstrumentDescriptor
} from './InstrumentDescriptor';
import { Counter, Histogram, InstrumentType, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { HrTime } from '@opentelemetry/api';
import { WritableMetricStorage } from './state/WritableMetricStorage';
import { MetricStorageRegistry } from './state/MetricStorageRegistry';

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#meter

export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, MetricStorage>();
private _metricStorageRegistry = new MetricStorageRegistry();

// instrumentation library required by spec to be on meter
// spec requires provider config changes to apply to previously created meters, achieved by holding a reference to the provider
Expand Down Expand Up @@ -89,31 +92,28 @@ export class Meter implements metrics.Meter {
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
});
if (storages.length === 1) {
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
.filter(isNotNullish);

if (storages.length === 1) {
return storages[0];
}

// This will be a no-op WritableMetricStorage when length is null.
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const storage = AsyncMetricStorage.create(view, descriptor, callback);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
});
}

async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
const result = await Promise.all(Array.from(this._metricStorageRegistry.getStorages()).map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

getInstrumentDescriptor(): InstrumentDescriptor {
return this._instrumentDescriptor;
}

private _record(measurements: AttributeHashMap<number>) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { InstrumentDescriptor } from '../InstrumentDescriptor';

/**
* Internal interface.
Expand All @@ -41,4 +42,6 @@ export interface MetricStorage {
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;

getInstrumentDescriptor(): InstrumentDescriptor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MetricStorage } from './MetricStorage';
import {
getDescriptorIncompatibilityDetails, isDescriptorAsync,
isDescriptorCompatibleWith
} from '../InstrumentDescriptor';
import * as api from '@opentelemetry/api';
import { Maybe } from '../utils';

/**
* Internal class for storing @{LinkMetricStorage}
*/
export class MetricStorageRegistry {
private readonly _metricStorageRegistry = new Map<string, MetricStorage>();

getStorages(): IterableIterator<MetricStorage> {
return this._metricStorageRegistry.values();
}

register<T extends MetricStorage>(storage: T): Maybe<T> {
const expectedDescriptor = storage.getInstrumentDescriptor();

// create and register a new one if it does not exist yet.
if (!this._metricStorageRegistry.has(expectedDescriptor.name)) {
this._metricStorageRegistry.set(expectedDescriptor.name, storage);
return storage;
}

// We already checked for existence with has()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const existingStorage = this._metricStorageRegistry.get(expectedDescriptor.name)!;
const existingDescriptor = existingStorage.getInstrumentDescriptor();

// Return storage if it is compatible.
if (isDescriptorCompatibleWith(existingDescriptor, expectedDescriptor)) {
// Is compatible, but views for async instruments cannot be registered twice.
if(isDescriptorAsync(existingDescriptor)) {
api.diag.warn(`A view for an async instrument with the name '${expectedDescriptor.name}' has already been registered.`);
return undefined;
}

return (existingStorage as T);
}

// Warn and return undefined if it is incompatible.
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
api.diag.warn(`A view or instrument with the name '${expectedDescriptor.name}' has already been registered and is incompatible with another registered view.\nDetails:\n`,
getDescriptorIncompatibilityDetails(existingDescriptor, expectedDescriptor));
return undefined;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> implements Writabl
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

getInstrumentDescriptor(): InstrumentDescriptor {
return this._instrumentDescriptor;
}

record(value: number, attributes: Attributes, context: Context) {
attributes = this._attributesProcessor.process(attributes, context);
this._deltaMetricStorage.record(value, attributes, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MetricStorageRegistry } from '../../src/state/MetricStorageRegistry';
import { InstrumentType } from '../../src/Instruments';
import { ValueType } from '@opentelemetry/api-metrics-wip';
import { MetricStorage } from '../../src/state/MetricStorage';
import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { InstrumentDescriptor } from '../../src/InstrumentDescriptor';
import { Maybe } from '../../src/utils';
import * as assert from 'assert';


class TestMetricStorage implements MetricStorage {
constructor(readonly _descriptor: InstrumentDescriptor) {
}

collect(collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
return Promise.resolve(undefined);
}

getInstrumentDescriptor(): InstrumentDescriptor {
return this._descriptor;
}
}

describe('MetricStorageRegistry', () => {
describe('register', () => {
it('should register MetricStorage if it does not exist', () => {
const registry = new MetricStorageRegistry();
const storage = new TestMetricStorage({
name: 'instrument',
type: InstrumentType.COUNTER,
description: 'description',
unit: '1',
valueType: ValueType.DOUBLE
});

const registeredStorage = registry.register(storage);
const registeredStorages = Array.from(registry.getStorages());

// returned the same storage
assert.strictEqual(registeredStorage, storage);
// registered the actual storage
assert.deepStrictEqual([storage], registeredStorages);
});

it('should not register when incompatible instrument is already registered', () => {
const registry = new MetricStorageRegistry();
const storage = new TestMetricStorage({
name: 'instrument',
type: InstrumentType.COUNTER,
description: 'description',
unit: '1',
valueType: ValueType.DOUBLE
});

const otherStorage = new TestMetricStorage({
name: 'instrument',
type: InstrumentType.UP_DOWN_COUNTER,
description: 'description',
unit: '1',
valueType: ValueType.DOUBLE
});

registry.register(storage);
const failedRegisteredStorage = registry.register(otherStorage);
const registeredStorages = Array.from(registry.getStorages());

// returned undefined
assert.strictEqual(failedRegisteredStorage, undefined);
// registered the actual storage, but not more than that.
assert.deepStrictEqual([storage], registeredStorages);
});

it('should not register when compatible async instrument is already registered', () => {
const registry = new MetricStorageRegistry();
const descriptor = {
name: 'instrument',
type: InstrumentType.OBSERVABLE_COUNTER,
description: 'description',
unit: '1',
valueType: ValueType.DOUBLE
};

const storage = new TestMetricStorage(descriptor);
const otherStorage = new TestMetricStorage(descriptor);

registry.register(storage);
const failedRegisteredStorage = registry.register(otherStorage);
const registeredStorages = Array.from(registry.getStorages());

// returned undefined
assert.strictEqual(failedRegisteredStorage, undefined);
// registered the actual storage, but not more than that.
assert.deepStrictEqual([storage], registeredStorages);
});

it('should return the existing instrument if a compatible sync instrument is already registered', () => {
const registry = new MetricStorageRegistry();
const descriptor = {
name: 'instrument',
type: InstrumentType.COUNTER,
description: 'description',
unit: '1',
valueType: ValueType.DOUBLE
};

const storage = new TestMetricStorage(descriptor);
const otherStorage = new TestMetricStorage(descriptor);

registry.register(storage);
const previouslyRegisteredStorage = registry.register(otherStorage);
const registeredStorages = Array.from(registry.getStorages());

// returned undefined
assert.strictEqual(previouslyRegisteredStorage, storage);
// registered the actual storage, but not more than that.
assert.deepStrictEqual([storage], registeredStorages);
});
});
});