From cf1413636485eb271ac21f0df63bec7ec771ee8e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 8 Jan 2022 17:21:06 +0530 Subject: [PATCH 1/7] chore: rename metrics exporter, change export return type, and make shutdown abstract --- .../src/export/MetricExporter.ts | 34 ++++++++++--------- .../export/PeriodicExportingMetricReader.ts | 6 ++-- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts index 697f020b28..34c5c17885 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts @@ -16,40 +16,38 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricData } from './MetricData'; +import { + ExportResult, + ExportResultCode, +} from '@opentelemetry/core'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter -// TODO should this just be an interface and exporters can implement their own shutdown? -export abstract class MetricExporter { +export abstract class PushMetricExporter { protected _shutdown = false; - abstract export(batch: MetricData[]): Promise; + abstract export(batch: MetricData[]): Promise; abstract forceFlush(): Promise; abstract getPreferredAggregationTemporality(): AggregationTemporality; - async shutdown(): Promise { - if (this._shutdown) { - return; - } - - // Setting _shutdown before flushing might prevent some exporters from flushing - // Waiting until flushing is complete might allow another flush to occur during shutdown - const flushPromise = this.forceFlush(); - this._shutdown = true; - await flushPromise; - } + abstract shutdown(): Promise; isShutdown() { return this._shutdown; } } -export class ConsoleMetricExporter extends MetricExporter { +export class ConsoleMetricExporter extends PushMetricExporter { async export(_batch: MetricData[]) { - throw new Error('Method not implemented'); + return new Promise((_, reject) => { + reject({ + code: ExportResultCode.FAILED, + error: new Error('Method not implemented') + }); + }); } getPreferredAggregationTemporality() { @@ -58,4 +56,8 @@ export class ConsoleMetricExporter extends MetricExporter { // nothing to do async forceFlush() {} + + async shutdown() { + this._shutdown = true; + } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index f7e5453076..1fdfe80252 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -16,11 +16,11 @@ import * as api from '@opentelemetry/api'; import { MetricReader } from './MetricReader'; -import { MetricExporter } from './MetricExporter'; +import { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; export type PeriodicExportingMetricReaderOptions = { - exporter: MetricExporter + exporter: PushMetricExporter exportIntervalMillis?: number, exportTimeoutMillis?: number } @@ -32,7 +32,7 @@ export type PeriodicExportingMetricReaderOptions = { export class PeriodicExportingMetricReader extends MetricReader { private _interval?: ReturnType; - private _exporter: MetricExporter; + private _exporter: PushMetricExporter; private readonly _exportInterval: number; From 8dd316dcdb7fde82b8c8e65b26993edc960ec3fe Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 8 Jan 2022 20:35:14 +0530 Subject: [PATCH 2/7] fix: fix tests --- .../PeriodicExportingMetricReader.test.ts | 18 ++++++++++++++---- .../test/state/MetricCollector.test.ts | 14 ++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index ca426e2908..26c240bd76 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -16,28 +16,38 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { MetricExporter } from '../../src'; +import { PushMetricExporter } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { MetricProducer } from '../../src/export/MetricProducer'; import { TimeoutError } from '../../src/utils'; +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; const MAX_32_BIT_INT = 2 ** 31 - 1 -class TestMetricExporter extends MetricExporter { +class TestMetricExporter extends PushMetricExporter { public exportTime = 0; public forceFlushTime = 0; public throwException = false; private _batches: MetricData[][] = []; - async export(batch: MetricData[]): Promise { + async export(batch: MetricData[]): Promise { this._batches.push(batch); if (this.throwException) { throw new Error('Error during export'); } - await new Promise(resolve => setTimeout(resolve, this.exportTime)); + return await new Promise(resolve => setTimeout(() => { + resolve({code: ExportResultCode.SUCCESS}) + }, this.exportTime)); + } + + async shutdown(): Promise { + if (this.isShutdown()) return; + const flushPromise = this.forceFlush(); + this._shutdown = true; + await flushPromise; } async forceFlush(): Promise { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index ce94c080cb..8b11e7bffa 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -18,19 +18,25 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { MetricData, PointDataType } from '../../src/export/MetricData'; -import { MetricExporter } from '../../src/export/MetricExporter'; +import { PushMetricExporter } from '../../src/export/MetricExporter'; import { Meter } from '../../src/Meter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; -class TestMetricExporter extends MetricExporter { +class TestMetricExporter extends PushMetricExporter { metricDataList: MetricData[] = [] - async export(batch: MetricData[]): Promise { + async export(batch: MetricData[]): Promise { this.metricDataList.push(...batch); + return new Promise((resolve, _) => { + resolve({code: ExportResultCode.SUCCESS}) + }); } + async shutdown(): Promise {} + async forceFlush(): Promise {} getPreferredAggregationTemporality(): AggregationTemporality { @@ -63,7 +69,7 @@ describe('MetricCollector', () => { }); describe('collect', () => { - function setupInstruments(exporter: MetricExporter) { + function setupInstruments(exporter: PushMetricExporter) { // TODO(legendecas): setup with MeterProvider when meter identity was settled. const meterProviderSharedState = new MeterProviderSharedState(defaultResource); From 65d16f1dc18f9625f93237e2254bc48fafd3d3c9 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 24 Jan 2022 13:01:55 +0530 Subject: [PATCH 3/7] fix: lint --- .../test/export/PeriodicExportingMetricReader.test.ts | 2 +- .../test/state/MetricCollector.test.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 8e80a20555..fad78e6a3f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -40,7 +40,7 @@ class TestMetricExporter extends PushMetricExporter { throw new Error('Error during export'); } return await new Promise(resolve => setTimeout(() => { - resolve({code: ExportResultCode.SUCCESS}) + resolve({code: ExportResultCode.SUCCESS}); }, this.exportTime)); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 67ab7f7c4b..22c1b94520 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -27,11 +27,11 @@ import { TestMetricReader } from '../export/TestMetricReader'; import { ExportResult, ExportResultCode } from '@opentelemetry/core'; class TestMetricExporter extends PushMetricExporter { - metricDataList: MetricData[] = [] + metricDataList: MetricData[] = []; async export(batch: MetricData[]): Promise { this.metricDataList.push(...batch); return new Promise((resolve, _) => { - resolve({code: ExportResultCode.SUCCESS}) + resolve({code: ExportResultCode.SUCCESS}); }); } From d532421cae8274aebcd0b04ba22bbd5c031103f2 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 18 Feb 2022 01:29:11 +0530 Subject: [PATCH 4/7] chore: address review comments --- .../src/export/MetricExporter.ts | 24 ++++++++----------- .../export/PeriodicExportingMetricReader.ts | 12 +++++++++- .../PeriodicExportingMetricReader.test.ts | 13 +++++----- .../test/state/MetricCollector.test.ts | 8 +++---- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts index 34c5c17885..06d989db2c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts @@ -24,30 +24,26 @@ import { // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter -export abstract class PushMetricExporter { - protected _shutdown = false; +export interface PushMetricExporter { - abstract export(batch: MetricData[]): Promise; + export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void; - abstract forceFlush(): Promise; + forceFlush(): Promise; - abstract getPreferredAggregationTemporality(): AggregationTemporality; + getPreferredAggregationTemporality(): AggregationTemporality; - abstract shutdown(): Promise; + shutdown(): Promise; - isShutdown() { - return this._shutdown; - } } -export class ConsoleMetricExporter extends PushMetricExporter { - async export(_batch: MetricData[]) { - return new Promise((_, reject) => { - reject({ +export class ConsoleMetricExporter implements PushMetricExporter { + protected _shutdown = true; + + async export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) { + return resultCallback({ code: ExportResultCode.FAILED, error: new Error('Method not implemented') }); - }); } getPreferredAggregationTemporality() { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 711b91e8a3..c18c290db7 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -15,6 +15,7 @@ */ import * as api from '@opentelemetry/api'; +import { ExportResultCode, globalErrorHandler } from '@opentelemetry/core'; import { MetricReader } from './MetricReader'; import { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; @@ -62,7 +63,16 @@ export class PeriodicExportingMetricReader extends MetricReader { private async _runOnce(): Promise { const metrics = await this.collect({}); - await this._exporter.export(metrics); + await this._exporter.export(metrics, result => { + if (result.code !== ExportResultCode.SUCCESS) { + globalErrorHandler( + result.error ?? + new Error( + `PeriodicExportingMetricReader: metrics export failed (status ${result})` + ) + ); + } + }); } protected override onInitialized(): void { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index fad78e6a3f..0be27a0707 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -27,25 +27,26 @@ import { assertRejects } from '../test-utils'; const MAX_32_BIT_INT = 2 ** 31 - 1; -class TestMetricExporter extends PushMetricExporter { +class TestMetricExporter implements PushMetricExporter { public exportTime = 0; public forceFlushTime = 0; public throwException = false; private _batches: MetricData[][] = []; + private _shutdown: boolean = false; - async export(batch: MetricData[]): Promise { + async export(batch: MetricData[], resultCallback: (result: ExportResult) => void): Promise { this._batches.push(batch); if (this.throwException) { throw new Error('Error during export'); } - return await new Promise(resolve => setTimeout(() => { - resolve({code: ExportResultCode.SUCCESS}); - }, this.exportTime)); + setTimeout(() => { + resultCallback({code: ExportResultCode.SUCCESS}); + }, this.exportTime); } async shutdown(): Promise { - if (this.isShutdown()) return; + if (this._shutdown) return; const flushPromise = this.forceFlush(); this._shutdown = true; await flushPromise; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 22c1b94520..122bdfd881 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -26,13 +26,11 @@ import { defaultInstrumentationLibrary, defaultResource, assertMetricData, asser import { TestMetricReader } from '../export/TestMetricReader'; import { ExportResult, ExportResultCode } from '@opentelemetry/core'; -class TestMetricExporter extends PushMetricExporter { +class TestMetricExporter implements PushMetricExporter { metricDataList: MetricData[] = []; - async export(batch: MetricData[]): Promise { + async export(batch: MetricData[], resultCallback: (result: ExportResult) => void): Promise { this.metricDataList.push(...batch); - return new Promise((resolve, _) => { - resolve({code: ExportResultCode.SUCCESS}); - }); + resultCallback({code: ExportResultCode.SUCCESS}); } async shutdown(): Promise {} From 59be1fe6a080f27f9392494f50ce544f26de63f4 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 19 Feb 2022 15:44:25 +0530 Subject: [PATCH 5/7] fix: update error message --- .../src/export/PeriodicExportingMetricReader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index c18c290db7..9b792e2a48 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -68,7 +68,7 @@ export class PeriodicExportingMetricReader extends MetricReader { globalErrorHandler( result.error ?? new Error( - `PeriodicExportingMetricReader: metrics export failed (status ${result})` + `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` ) ); } From ceb228298129fe75a07f72c0e672a3a3e4fcf46f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 22 Feb 2022 16:22:49 +0530 Subject: [PATCH 6/7] chore: little refactoring --- .../export/PeriodicExportingMetricReader.ts | 24 ++++++++++------- .../PeriodicExportingMetricReader.test.ts | 27 +++++++++++++++++-- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 9b792e2a48..f5595a743d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -63,15 +63,19 @@ export class PeriodicExportingMetricReader extends MetricReader { private async _runOnce(): Promise { const metrics = await this.collect({}); - await this._exporter.export(metrics, result => { - if (result.code !== ExportResultCode.SUCCESS) { - globalErrorHandler( - result.error ?? - new Error( - `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` - ) - ); - } + return new Promise((resolve, reject) => { + this._exporter.export(metrics, result => { + if (result.code !== ExportResultCode.SUCCESS) { + reject( + result.error ?? + new Error( + `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` + ) + ); + } else { + resolve(); + } + }); }); } @@ -86,7 +90,7 @@ export class PeriodicExportingMetricReader extends MetricReader { return; } - api.diag.error('Unexpected error during export: %s', err); + globalErrorHandler(err); } }, this._exportInterval); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 0be27a0707..586dd379b7 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -31,17 +31,22 @@ class TestMetricExporter implements PushMetricExporter { public exportTime = 0; public forceFlushTime = 0; public throwException = false; + public failureResult = false; private _batches: MetricData[][] = []; private _shutdown: boolean = false; - async export(batch: MetricData[], resultCallback: (result: ExportResult) => void): Promise { + export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void { this._batches.push(batch); if (this.throwException) { throw new Error('Error during export'); } setTimeout(() => { - resultCallback({code: ExportResultCode.SUCCESS}); + if (this.failureResult) { + resultCallback({code: ExportResultCode.FAILED, error: new Error('some error') }); + } else { + resultCallback({code: ExportResultCode.SUCCESS }); + } }, this.exportTime); } @@ -187,6 +192,24 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); + it('should keep running on export failure', async () => { + const exporter = new TestMetricExporter(); + exporter.failureResult = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 30, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + const result = await exporter.waitForNumberOfExports(2); + assert.deepStrictEqual(result, [[], []]); + + exporter.failureResult = false; + await reader.shutdown(); + }); + it('should keep exporting on export timeouts', async () => { const exporter = new TestMetricExporter(); // set time longer than timeout. From e18900fe8ab3d78e5c1f922f12eec3738e4b4554 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 22 Feb 2022 17:16:54 +0530 Subject: [PATCH 7/7] Update experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts Co-authored-by: legendecas --- .../opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts index 06d989db2c..f86f7d84cd 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts @@ -39,7 +39,7 @@ export interface PushMetricExporter { export class ConsoleMetricExporter implements PushMetricExporter { protected _shutdown = true; - async export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) { + export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) { return resultCallback({ code: ExportResultCode.FAILED, error: new Error('Method not implemented')