Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(views): handle view conflicts. #2734

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { View } from './view/View';
/**
* Supported types of metric instruments.
*/
export enum InstrumentType {
export enum InstrumentType {
COUNTER = 'COUNTER',
HISTOGRAM = 'HISTOGRAM',
UP_DOWN_COUNTER = 'UP_DOWN_COUNTER',
Expand Down Expand Up @@ -56,3 +56,10 @@ export function createInstrumentDescriptorWithView(view: View, instrument: Instr
valueType: instrument.valueType,
};
}

export function isDescriptorCompatibleWith(descriptor: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return descriptor.name === otherDescriptor.name
&& descriptor.unit === otherDescriptor.unit
&& descriptor.type === otherDescriptor.type
&& descriptor.valueType === otherDescriptor.valueType;
}
26 changes: 12 additions & 14 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import { Counter, Histogram, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { WritableMetricStorage } from './state/WritableMetricStorage';
import { MetricStorageRegistry } from './state/MetricStorageRegistry';

/**
* This class implements the {@link metrics.Meter} interface.
*/
export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, MetricStorage>();
private _metricStorageRegistry = new MetricStorageRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
this._meterProviderSharedState.meters.push(this);
Expand Down Expand Up @@ -101,26 +102,23 @@ export class Meter implements metrics.Meter {
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
});
if (storages.length === 1) {
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
.filter(isNotNullish);

if (storages.length === 1) {
return storages[0];
}

// This will be a no-op WritableMetricStorage when length is null.
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const storage = AsyncMetricStorage.create(view, descriptor, callback);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
});
}

Expand All @@ -131,7 +129,7 @@ export class Meter implements metrics.Meter {
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
const result = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -36,16 +39,17 @@ import { AttributeHashMap } from './HashMap';
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,44 @@ import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { createInstrumentDescriptor, InstrumentDescriptor } from '../InstrumentDescriptor';

/**
* Internal interface.
*
* Represents a storage from which we can collect metrics.
*/
export interface MetricStorage {
export abstract class MetricStorage {
constructor(protected _instrumentDescriptor: InstrumentDescriptor) {
}

/**
* Collects the metrics from this storage.
*
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
collect(
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;

getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
}

updateDescription(description: string): void{
this._instrumentDescriptor = createInstrumentDescriptor(
this._instrumentDescriptor.name,
this._instrumentDescriptor.type,
{
description: description,
valueType: this._instrumentDescriptor.valueType,
unit: this._instrumentDescriptor.unit
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MetricStorage } from './MetricStorage';
import { isDescriptorCompatibleWith } from '../InstrumentDescriptor';
import * as api from '@opentelemetry/api';
import { Maybe } from '../utils';
import { getConflictResolutionRecipe, getIncompatibilityDetails } from '../view/RegistrationConflicts';

/**
* Internal class for storing {@link MetricStorage}
*/
export class MetricStorageRegistry {
private readonly _metricStorageRegistry = new Map<string, MetricStorage[]>();

getStorages(): MetricStorage[] {
let storages: MetricStorage[] = [];
for (const metricStorages of this._metricStorageRegistry.values()) {
storages = storages.concat(metricStorages);
}

return storages;
}

register<T extends MetricStorage>(storage: T): Maybe<T> {
const expectedDescriptor = storage.getInstrumentDescriptor();
const existingStorages = this._metricStorageRegistry.get(expectedDescriptor.name);

// Add storage if it does not exist.
if (existingStorages === undefined) {
this._metricStorageRegistry.set(expectedDescriptor.name, [storage]);
return storage;
}

let compatibleStorage = null;

for (const existingStorage of existingStorages) {
const existingDescriptor = existingStorage.getInstrumentDescriptor();

if (isDescriptorCompatibleWith(existingDescriptor, expectedDescriptor)) {
// Use the longer description if it does not match.
if (existingDescriptor.description !== expectedDescriptor.description) {
if (expectedDescriptor.description.length > existingDescriptor.description.length) {
existingStorage.updateDescription(expectedDescriptor.description);
}

api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered, but has a different description and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'The longer description will be used.\nTo resolve the conflict:',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
// Storage is fully compatible. There will never be more than one pre-existing fully compatible storage.
compatibleStorage = existingStorage as T;
} else {
// The implementation SHOULD warn about duplicate instrument registration
// conflicts after applying View configuration.
api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'To resolve the conflict:\n',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
}

if (compatibleStorage != null) {
return compatibleStorage;
}

// None of the storages were compatible, add the current one to the list.
existingStorages.push(storage);
return storage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -35,15 +38,16 @@ import { MetricCollectorHandle } from './MetricCollector';
*
* Stores and aggregates {@link MetricData} for synchronous instruments.
*/
export class SyncMetricStorage<T extends Maybe<Accumulation>> implements WritableMetricStorage, MetricStorage {
export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements WritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { InstrumentSelectorCriteria } from './InstrumentSelector';
import { InstrumentDescriptor } from '../InstrumentDescriptor';

export function getIncompatibilityDetails(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
let incompatibility = '';
if (existing.unit !== otherDescriptor.unit) {
incompatibility += `\t- Unit '${existing.unit}' does not match '${otherDescriptor.unit}'\n`;
}
if (existing.type !== otherDescriptor.type) {
incompatibility += `\t- Type '${existing.type}' does not match '${otherDescriptor.type}'\n`;
}
if (existing.valueType !== otherDescriptor.valueType) {
incompatibility += `\t- Value Type '${existing.valueType}' does not match '${otherDescriptor.valueType}'\n`;
}
if (existing.description !== otherDescriptor.description) {
incompatibility += `\t- Description '${existing.description}' does not match '${otherDescriptor.description}'\n`;
}

return incompatibility;
}

export function getValueTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use valueType '${existing.valueType}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getUnitConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use unit '${existing.unit}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'`;
}

export function getDescriptionResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'
\t- OR - create a new view with the name ${existing.name} and description '${existing.description}' and InstrumentSelector ${selectorString}
\t- OR - create a new view with the name ${otherDescriptor.name} and description '${existing.description}' and InstrumentSelector ${selectorString}`;
}

export function getConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
// Conflicts that cannot be solved via views.
if (existing.valueType !== otherDescriptor.valueType) {
return getValueTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.unit !== otherDescriptor.unit) {
return getUnitConflictResolutionRecipe(existing, otherDescriptor);
}

// Conflicts that can be solved via views.
if (existing.type !== otherDescriptor.type) {
// this will automatically solve possible description conflicts.
return getTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.description !== otherDescriptor.description) {
return getDescriptionResolutionRecipe(existing, otherDescriptor);
}

return '';
}
Loading