Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change metrics export data model to match OTLP protos #2809

Merged
merged 8 commits into from
Mar 6, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { MetricData } from './export/MetricData';
import { InstrumentationLibraryMetrics } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
Expand Down Expand Up @@ -130,16 +130,18 @@ export class Meter implements metrics.Meter {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
const metricData = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));
return result.filter(isNotNullish);

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricData.filter(isNotNullish),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand All @@ -43,8 +41,6 @@ export class DropAggregator implements Aggregator<undefined> {
}

toMetricData(
_resource: Resource,
_instrumentationLibrary: InstrumentationLibrary,
_instrumentDescriptor: InstrumentDescriptor,
_accumulationByAttributes: AccumulationRecord<undefined>[],
_startTime: HrTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import {
Histogram,
} from './types';
import { HistogramMetricData, PointDataType } from '../export/MetricData';
import { Resource } from '@opentelemetry/resources';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { HrTime } from '@opentelemetry/api';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -138,15 +136,11 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
metricDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<HistogramMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor: metricDescriptor,
pointDataType: PointDataType.HISTOGRAM,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

import { LastValue, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types';
import { HrTime } from '@opentelemetry/api';
import { hrTime, hrTimeToMicroseconds, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { hrTime, hrTimeToMicroseconds } from '@opentelemetry/core';
import { PointDataType, SingularMetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -67,15 +66,11 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<LastValueAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor,
pointDataType: PointDataType.SINGULAR,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import { Sum, AggregatorKind, Aggregator, Accumulation, AccumulationRecord } from './types';
import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { PointDataType, SingularMetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -57,15 +55,11 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
}

toMetricData(
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<SumAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
resource,
instrumentationLibrary,
instrumentDescriptor,
pointDataType: PointDataType.SINGULAR,
pointData: accumulationByAttributes.map(([attributes, accumulation]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import { HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { Maybe } from '../utils';
Expand Down Expand Up @@ -116,9 +114,7 @@ export interface Aggregator<T> {
* @param endTime the end time of the metric data.
* @return the {@link MetricData} that this {@link Aggregator} will produce.
*/
toMetricData(resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
instrumentDescriptor: InstrumentDescriptor,
toMetricData(instrumentDescriptor: InstrumentDescriptor,
accumulationByAttributes: AccumulationRecord<T>[],
startTime: HrTime,
endTime: HrTime): Maybe<MetricData>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,45 @@ import { Histogram } from '../aggregator/types';
* Basic metric data fields.
*/
export interface BaseMetricData {
/**
* Resource associated with metric telemetry.
*/
readonly resource: Resource;
/**
* InstrumentationLibrary which created the metric instrument.
*/
readonly instrumentationLibrary: InstrumentationLibrary;
/**
* InstrumentDescriptor which describes the metric instrument.
*/
readonly instrumentDescriptor: InstrumentDescriptor;
/**
* PointDataType of the metric instrument.
*/
readonly pointDataType: PointDataType,
readonly pointDataType: PointDataType;
}

/**
* Represents a metric data aggregated by either a LastValueAggregation or
* SumAggregation.
*/
export interface SingularMetricData extends BaseMetricData {
readonly pointDataType: PointDataType.SINGULAR,
readonly pointData: PointData<number>[],
readonly pointDataType: PointDataType.SINGULAR;
readonly pointData: PointData<number>[];
}

/**
* Represents a metric data aggregated by a HistogramAggregation.
*/
export interface HistogramMetricData extends BaseMetricData {
readonly pointDataType: PointDataType.HISTOGRAM,
readonly pointData: PointData<Histogram>[],
readonly pointDataType: PointDataType.HISTOGRAM;
readonly pointData: PointData<Histogram>[];
}

/**
* Represents an aggregated metric data.
*/
export type MetricData = SingularMetricData | HistogramMetricData;

export interface InstrumentationLibraryMetrics {
instrumentationLibrary: InstrumentationLibrary;
metrics: MetricData[];
}

export interface ResourceMetrics {
resource: Resource;
instrumentationLibraryMetrics: InstrumentationLibraryMetrics[];
}

/**
* The aggregated point data type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { AggregationTemporality } from './AggregationTemporality';
import { MetricData } from './MetricData';
import { ResourceMetrics } from './MetricData';
import {
ExportResult,
ExportResultCode,
Expand All @@ -26,7 +26,7 @@ import {

export interface PushMetricExporter {

export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void;
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void;

forceFlush(): Promise<void>;

Expand All @@ -39,7 +39,7 @@ export interface PushMetricExporter {
export class ConsoleMetricExporter implements PushMetricExporter {
protected _shutdown = true;

export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) {
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void) {
return resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Method not implemented')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

import { MetricData } from './MetricData';
import { ResourceMetrics } from './MetricData';

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<MetricData[]>;
collect(): Promise<ResourceMetrics>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';


export type ReaderOptions = {
timeoutMillis?: number
Expand Down Expand Up @@ -91,15 +92,15 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<MetricData[]> {
async collect(options?: ReaderCollectionOptions): Promise<Maybe<ResourceMetrics>> {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return [];
return undefined;
}

// No timeout if timeoutMillis is undefined or null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ export class PeriodicExportingMetricReader extends MetricReader {

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});

return new Promise((resolve, reject) => {
if (metrics === undefined) {
resolve();
seemk marked this conversation as resolved.
Show resolved Hide resolved
return;
}

this._exporter.export(metrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
Expand Down Expand Up @@ -68,8 +66,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
async collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
Expand All @@ -83,8 +79,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
resource,
instrumentationLibrary,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { MetricData } from '../export/MetricData';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { MeterProviderSharedState } from './MeterProviderSharedState';
Expand All @@ -32,12 +32,15 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<MetricData[]> {
async collect(): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const results = await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime)));
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime)))).filter(({ metrics }) => metrics.length > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd find we don't need to do this filtering in the SDK. Like, say, with the DELTA aggregation temporality, the metrics for a meter may contain no items if in a period of time there is no metric event reported. It may be confusing that the InstrumentationLibrary sometimes disappears.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you are suggesting to export empty InstrumentationLibraryMetrics in that case and leave it up to the exporter and/or backend to sort it out?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the filtering


return results.reduce((cumulation, current) => cumulation.concat(current), []);
return {
resource: this._sharedState.resource,
instrumentationLibraryMetrics,
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
Expand All @@ -36,8 +34,6 @@ export interface MetricStorage {
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
Expand Down Expand Up @@ -62,8 +60,6 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> implements Writabl
async collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
Expand All @@ -72,8 +68,6 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> implements Writabl
return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
resource,
instrumentationLibrary,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
Expand Down
Loading