Skip to content

Commit

Permalink
feat(views): update implementation current proposed spec.
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc committed Feb 16, 2022
1 parent fe5e485 commit acdd6de
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { View } from './view/View';
/**
* Supported types of metric instruments.
*/
export enum InstrumentType {
export enum InstrumentType {
COUNTER = 'COUNTER',
HISTOGRAM = 'HISTOGRAM',
UP_DOWN_COUNTER = 'UP_DOWN_COUNTER',
Expand Down Expand Up @@ -58,30 +58,8 @@ export function createInstrumentDescriptorWithView(view: View, instrument: Instr
}

export function isDescriptorCompatibleWith(descriptor: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
// description is ignored as it is not semantic in nature.
return descriptor.name === otherDescriptor.name
&& descriptor.unit === otherDescriptor.unit
&& descriptor.type === otherDescriptor.type
&& descriptor.valueType === otherDescriptor.valueType;
}

export function getDescriptorIncompatibilityDetails(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
let incompatibility = '';
if (existing.unit !== otherDescriptor.unit) {
incompatibility += `\t- Unit '${existing.unit}' does not match '${otherDescriptor.description}'\n`;
}
if (existing.type !== otherDescriptor.type) {
incompatibility += `\t- Type '${existing.type}' does not match '${otherDescriptor.type}'\n`;
}
if (existing.valueType !== otherDescriptor.valueType) {
incompatibility += `\t- Value Type '${existing.valueType}' does not match '${otherDescriptor.valueType}'\n`;
}

return incompatibility;
}

export function isDescriptorAsync(descriptor: InstrumentDescriptor) {
return (descriptor.type === InstrumentType.OBSERVABLE_GAUGE ||
descriptor.type === InstrumentType.OBSERVABLE_UP_DOWN_COUNTER ||
descriptor.type === InstrumentType.OBSERVABLE_COUNTER);
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export class Meter implements metrics.Meter {
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.getStorages()).map(metricStorage => {
const result = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -36,24 +39,21 @@ import { AttributeHashMap } from './HashMap';
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

getInstrumentDescriptor(): InstrumentDescriptor {
return this._instrumentDescriptor;
}

private _record(measurements: AttributeHashMap<number>) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@ import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { createInstrumentDescriptor, InstrumentDescriptor } from '../InstrumentDescriptor';

/**
* Internal interface.
*
* Represents a storage from which we can collect metrics.
*/
export interface MetricStorage {
export abstract class MetricStorage {
constructor(protected _instrumentDescriptor: InstrumentDescriptor) {
}

/**
* Collects the metrics from this storage.
*
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
collect(
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
Expand All @@ -43,5 +46,18 @@ export interface MetricStorage {
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;

getInstrumentDescriptor(): InstrumentDescriptor;
getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
}

updateDescription(description: string): void{
this._instrumentDescriptor = createInstrumentDescriptor(
this._instrumentDescriptor.name,
this._instrumentDescriptor.type,
{
description: description,
valueType: this._instrumentDescriptor.valueType,
unit: this._instrumentDescriptor.unit
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,79 @@
*/

import { MetricStorage } from './MetricStorage';
import {
getDescriptorIncompatibilityDetails, isDescriptorAsync,
isDescriptorCompatibleWith
} from '../InstrumentDescriptor';
import { isDescriptorCompatibleWith } from '../InstrumentDescriptor';
import * as api from '@opentelemetry/api';
import { Maybe } from '../utils';
import { getConflictResolutionRecipe, getIncompatibilityDetails } from '../view/RegistrationConflicts';

/**
* Internal class for storing @{LinkMetricStorage}
* Internal class for storing {@link MetricStorage}
*/
export class MetricStorageRegistry {
private readonly _metricStorageRegistry = new Map<string, MetricStorage>();
private readonly _metricStorageRegistry = new Map<string, MetricStorage[]>();

getStorages(): IterableIterator<MetricStorage> {
return this._metricStorageRegistry.values();
getStorages(): MetricStorage[] {
const storages = [];
for (const metricStorages of Array.from(this._metricStorageRegistry.values())) {
storages.push(...metricStorages);
}

return storages;
}

register<T extends MetricStorage>(storage: T): Maybe<T> {
const expectedDescriptor = storage.getInstrumentDescriptor();
const existingStorages = this._metricStorageRegistry.get(expectedDescriptor.name);

// create and register a new one if it does not exist yet.
if (!this._metricStorageRegistry.has(expectedDescriptor.name)) {
this._metricStorageRegistry.set(expectedDescriptor.name, storage);
// Add storage if it does not exist.
if (existingStorages === undefined) {
this._metricStorageRegistry.set(expectedDescriptor.name, [storage]);
return storage;
}

// We already checked for existence with has()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const existingStorage = this._metricStorageRegistry.get(expectedDescriptor.name)!;
const existingDescriptor = existingStorage.getInstrumentDescriptor();
const compatibleStorages = [];

for (const existingStorage of existingStorages) {
const existingDescriptor = existingStorage.getInstrumentDescriptor();

// Return storage if it is compatible.
if (isDescriptorCompatibleWith(existingDescriptor, expectedDescriptor)) {
// Is compatible, but views for async instruments cannot be registered twice.
if (isDescriptorAsync(existingDescriptor)) {
api.diag.warn(`A view for an async instrument with the name '${expectedDescriptor.name}' has already been registered.`);
return undefined;
if (isDescriptorCompatibleWith(existingDescriptor, expectedDescriptor)) {
// Use the longer description if it does not match.
if (existingDescriptor.description !== expectedDescriptor.description) {
if (expectedDescriptor.description.length > existingDescriptor.description.length) {
existingStorage.updateDescription(expectedDescriptor.description);
}

api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered, but has a different description and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'The longer description will be used.\nTo resolve the conflict:',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
// Storage is fully compatible.
compatibleStorages.push(existingStorage as T);
} else {
// The implementation SHOULD warn about duplicate instrument registration
// conflicts after applying View configuration.
api.diag.warn('A view or instrument with the name ',
expectedDescriptor.name,
' has already been registered and is incompatible with another registered view.\n',
'Details:\n',
getIncompatibilityDetails(existingDescriptor, expectedDescriptor),
'To resolve the conflict:\n',
getConflictResolutionRecipe(existingDescriptor, expectedDescriptor));
}
}

return (existingStorage as T);
// When one compatible storage is already present, another compatible one will not be pushed.
// Therefore this will never be > 1
if (compatibleStorages.length > 0) {
return compatibleStorages[0];
}

// Throw an error if it is not compatible.
throw new Error('A view or instrument with the name'
+ expectedDescriptor.name
+ 'has already been registered and is incompatible with another registered view.\n'
+ 'Details:\n'
+ getDescriptorIncompatibilityDetails(existingDescriptor, expectedDescriptor));
// None of the storages were compatible, add the current one to the list.
existingStorages.push(storage);
return storage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -35,23 +38,20 @@ import { MetricCollectorHandle } from './MetricCollector';
*
* Stores and aggregates {@link MetricData} for synchronous instruments.
*/
export class SyncMetricStorage<T extends Maybe<Accumulation>> implements WritableMetricStorage, MetricStorage {
export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements WritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
private _instrumentDescriptor: InstrumentDescriptor,
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

getInstrumentDescriptor(): InstrumentDescriptor {
return this._instrumentDescriptor;
}

record(value: number, attributes: Attributes, context: Context) {
attributes = this._attributesProcessor.process(attributes, context);
this._deltaMetricStorage.record(value, attributes, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { InstrumentSelectorCriteria } from './InstrumentSelector';
import { InstrumentDescriptor } from '../InstrumentDescriptor';

export function getIncompatibilityDetails(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
let incompatibility = '';
if (existing.unit !== otherDescriptor.unit) {
incompatibility += `\t- Unit '${existing.unit}' does not match '${otherDescriptor.unit}'\n`;
}
if (existing.type !== otherDescriptor.type) {
incompatibility += `\t- Type '${existing.type}' does not match '${otherDescriptor.type}'\n`;
}
if (existing.valueType !== otherDescriptor.valueType) {
incompatibility += `\t- Value Type '${existing.valueType}' does not match '${otherDescriptor.valueType}'\n`;
}
if (existing.description !== otherDescriptor.description) {
incompatibility += `\t- Description '${existing.description}' does not match '${otherDescriptor.description}'\n`;
}

return incompatibility;
}

export function getValueTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use valueType '${existing.valueType}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getUnitConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
return `\t- use unit '${existing.unit}' on instrument creation or use an instrument name other than '${otherDescriptor.name}'`;
}

export function getTypeConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor) {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'`;
}

export function getDescriptionResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
const selector: InstrumentSelectorCriteria = {
name: otherDescriptor.name,
type: otherDescriptor.type
};

const selectorString = JSON.stringify(selector);

return `\t- create a new view with a name other than '${existing.name}' and InstrumentSelector '${selectorString}'
\t- OR - create a new view with the name ${existing.name} and description '${existing.description}' and InstrumentSelector ${selectorString}
\t- OR - create a new view with the name ${otherDescriptor.name} and description '${existing.description}' and InstrumentSelector ${selectorString}`;
}

export function getConflictResolutionRecipe(existing: InstrumentDescriptor, otherDescriptor: InstrumentDescriptor): string {
// Conflicts that cannot be solved via views.
if (existing.valueType !== otherDescriptor.valueType) {
return getValueTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.unit !== otherDescriptor.unit) {
return getUnitConflictResolutionRecipe(existing, otherDescriptor);
}

// Conflicts that can be solved via views.
if (existing.type !== otherDescriptor.type) {
// this will automatically solve possible description conflicts.
return getTypeConflictResolutionRecipe(existing, otherDescriptor);
}

if (existing.description !== otherDescriptor.description) {
return getDescriptionResolutionRecipe(existing, otherDescriptor);
}

return '';
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ describe('Meter', () => {
const counter = meter.createCounter('foobar');
assert(counter instanceof Counter);
});

it('should allow duplicate registration', () => {
const meter = new Meter(new MeterProviderSharedState(defaultResource), defaultInstrumentationLibrary);
const counter = meter.createCounter('foobar');
const duplicateCounter = meter.createUpDownCounter('foobar');
assert(counter instanceof Counter);
assert(duplicateCounter instanceof Counter);
});
});

describe('createUpDownCounter', () => {
Expand Down
Loading

0 comments on commit acdd6de

Please sign in to comment.