Skip to content

Commit

Permalink
feat(views): handle view conflicts. (#2734)
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc authored Mar 4, 2022
1 parent a901732 commit 5b83c18
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { View } from './view/View';
/**
* Supported types of metric instruments.
*/
export enum InstrumentType {
export enum InstrumentType {
COUNTER = 'COUNTER',
HISTOGRAM = 'HISTOGRAM',
UP_DOWN_COUNTER = 'UP_DOWN_COUNTER',
Expand Down Expand Up @@ -56,3 +56,10 @@ export function createInstrumentDescriptorWithView(view: View, instrument: Instr
valueType: instrument.valueType,
};
}

export function isDescriptorCompatibleWith(descriptor: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return descriptor.name === otherDescriptor.name
&& descriptor.unit === otherDescriptor.unit
&& descriptor.type === otherDescriptor.type
&& descriptor.valueType === otherDescriptor.valueType;
}
26 changes: 12 additions & 14 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import { Counter, Histogram, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { WritableMetricStorage } from './state/WritableMetricStorage';
import { MetricStorageRegistry } from './state/MetricStorageRegistry';

/**
* This class implements the {@link metrics.Meter} interface.
*/
export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, MetricStorage>();
private _metricStorageRegistry = new MetricStorageRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
this._meterProviderSharedState.meters.push(this);
Expand Down Expand Up @@ -101,26 +102,23 @@ export class Meter implements metrics.Meter {
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
});
if (storages.length === 1) {
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
.filter(isNotNullish);

if (storages.length === 1) {
return storages[0];
}

// This will be a no-op WritableMetricStorage when length is null.
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const storage = AsyncMetricStorage.create(view, descriptor, callback);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
});
}

Expand All @@ -131,7 +129,7 @@ export class Meter implements metrics.Meter {
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
const result = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -36,16 +39,17 @@ import { AttributeHashMap } from './HashMap';
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,44 @@ import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { createInstrumentDescriptor, InstrumentDescriptor } from '../InstrumentDescriptor';

/**
* Internal interface.
*
* Represents a storage from which we can collect metrics.
*/
export interface MetricStorage {
export abstract class MetricStorage {
constructor(protected _instrumentDescriptor: InstrumentDescriptor) {
}

/**
* Collects the metrics from this storage.
*
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
collect(
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;

getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
}

updateDescription(description: string): void{
this._instrumentDescriptor = createInstrumentDescriptor(
this._instrumentDescriptor.name,
this._instrumentDescriptor.type,
{
description: description,
valueType: this._instrumentDescriptor.valueType,
unit: this._instrumentDescriptor.unit
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MetricStorage } from './MetricStorage';
import { isDescriptorCompatibleWith } from '../InstrumentDescriptor';
import * as api from '@opentelemetry/api';
import { Maybe } from '../utils';
import { getConflictResolutionRecipe, getIncompatibilityDetails } from '../view/RegistrationConflicts';

/**
* Internal class for storing {@link MetricStorage}
*/
export class MetricStorageRegistry {
private readonly _metricStorageRegistry = new Map<string, MetricStorage[]>();

getStorages(): MetricStorage[] {
let storages: MetricStorage[] = [];
for (const metricStorages of this._metricStorageRegistry.values()) {
storages = storages.concat(metricStorages);
}

return storages;
}

register<T extends MetricStorage>(storage: T): Maybe<T> {
const expectedDescriptor = storage.getInstrumentDescriptor();
const existingStorages = this._metricStorageRegistry.get(expectedDescriptor.name);

// Add storage if it does not exist.
if (existingStorages === undefined) {
this._metricStorageRegistry.set(expectedDescriptor.name, [storage]);
return storage;
}

let compatibleStorage = null;

for (const existingStorage of existingStorages) {
const existingDescriptor = existingStorage.getInstrumentDescriptor();

if (isDescriptorCompatibleWith(existingDescriptor, expectedDescriptor)) {
// Use the longer description if it does not match.
if (existingDescriptor.description !== expectedDescriptor.description) {
if (expectedDescriptor.description.length > existingDescriptor.description.length) {
existingStorage.updateDescription(expectedDescriptor.description);
}

api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered, but has a different description and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'The longer description will be used.\nTo resolve the conflict:',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
// Storage is fully compatible. There will never be more than one pre-existing fully compatible storage.
compatibleStorage = existingStorage as T;
} else {
// The implementation SHOULD warn about duplicate instrument registration
// conflicts after applying View configuration.
api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'To resolve the conflict:\n',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
}

if (compatibleStorage != null) {
return compatibleStorage;
}

// None of the storages were compatible, add the current one to the list.
existingStorages.push(storage);
return storage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -35,15 +38,16 @@ import { MetricCollectorHandle } from './MetricCollector';
*
* Stores and aggregates {@link MetricData} for synchronous instruments.
*/
export class SyncMetricStorage<T extends Maybe<Accumulation>> implements WritableMetricStorage, MetricStorage {
export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements WritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { InstrumentSelectorCriteria } from './InstrumentSelector';
import { InstrumentDescriptor } from '../InstrumentDescriptor';

export function getIncompatibilityDetails(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
let incompatibility = '';
if (existing.unit !== otherDescriptor.unit) {
incompatibility += `\t- Unit '${existing.unit}' does not match '${otherDescriptor.unit}'\n`;
}
if (existing.type !== otherDescriptor.type) {
incompatibility += `\t- Type '${existing.type}' does not match '${otherDescriptor.type}'\n`;
}
if (existing.valueType !== otherDescriptor.valueType) {
incompatibility += `\t- Value Type '${existing.valueType}' does not match '${otherDescriptor.valueType}'\n`;
}
if (existing.description !== otherDescriptor.description) {
incompatibility += `\t- Description '${existing.description}' does not match '${otherDescriptor.description}'\n`;
}

return incompatibility;
}

export function getValueTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use valueType '${existing.valueType}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getUnitConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use unit '${existing.unit}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'`;
}

export function getDescriptionResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'
\t- OR - create a new view with the name ${existing.name} and description '${existing.description}' and InstrumentSelector ${selectorString}
\t- OR - create a new view with the name ${otherDescriptor.name} and description '${existing.description}' and InstrumentSelector ${selectorString}`;
}

export function getConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
// Conflicts that cannot be solved via views.
if (existing.valueType !== otherDescriptor.valueType) {
return getValueTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.unit !== otherDescriptor.unit) {
return getUnitConflictResolutionRecipe(existing, otherDescriptor);
}

// Conflicts that can be solved via views.
if (existing.type !== otherDescriptor.type) {
// this will automatically solve possible description conflicts.
return getTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.description !== otherDescriptor.description) {
return getDescriptionResolutionRecipe(existing, otherDescriptor);
}

return '';
}
Loading

0 comments on commit 5b83c18

Please sign in to comment.