Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(instrumentation-pg): not add duplicate listeners to pg pool #2484

Merged
merged 8 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
}

override _updateMetricInstruments() {
const emit = utils.getEmitType();
if (emit === utils.emitType.OLD_SEM_CONV) return;
this._operationDuration = this.meter.createHistogram(
METRIC_DB_CLIENT_OPERATION_DURATION,
{
Expand Down Expand Up @@ -222,6 +224,8 @@
}

private recordOperationDuration(attributes: Attributes, startTime: HrTime) {
const emit = utils.getEmitType();
if (emit === utils.emitType.OLD_SEM_CONV) return;
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
const metricsAttributes: Attributes = {};
const keysToCopy = [
SEMATTRS_DB_SYSTEM,
Expand Down Expand Up @@ -435,6 +439,54 @@
};
}

private _setPoolConnectEventListeners(pgPool: PgPoolExtended) {
const emit = utils.getEmitType();
if (emit === utils.emitType.OLD_SEM_CONV) return;
if (pgPool.eventListenersSet) return;
const poolName = utils.getPoolName(pgPool.options);

pgPool.on('connect', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('acquire', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('remove', () => {
this._connectionsCounter = utils.updateCounter(
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});

pgPool.on('release' as any, () => {
this._connectionsCounter = utils.updateCounter(

Check warning on line 479 in plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts#L479

Added line #L479 was not covered by tests
poolName,
pgPool,
this._connectionsCount,
this._connectionPendingRequests,
this._connectionsCounter
);
});
pgPool.eventListenersSet = true;
}

private _getPoolConnectPatch() {
const plugin = this;
return (originalConnect: typeof pgPoolTypes.prototype.connect) => {
Expand All @@ -449,41 +501,7 @@
attributes: utils.getSemanticAttributesFromPool(this.options),
});

this.on('connect', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('acquire', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('remove', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('release' as any, () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});
plugin._setPoolConnectEventListeners(this);

if (callback) {
const parentSpan = trace.getSpan(context.active());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface PgPoolOptionsParams {

export interface PgPoolExtended extends pgPoolTypes<pgTypes.Client> {
options: PgPoolOptionsParams;
eventListenersSet: boolean; // flag to identify if the event listeners for instrumentation have been set
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
}

export type PgClientConnect = (callback?: Function) => Promise<void> | void;
18 changes: 17 additions & 1 deletion plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@
import { safeExecuteInTheMiddle } from '@opentelemetry/instrumentation';
import { SpanNames } from './enums/SpanNames';

export enum emitType {
NEW_SEM_CONV,
OLD_SEM_CONV,
BOTH_SEM_CONV,
}

export function getEmitType(): emitType {
if (process.env.OTEL_SEMCONV_STABILITY_OPT_IN?.includes('database/dup')) {
return emitType.BOTH_SEM_CONV;

Check warning on line 65 in plugins/node/opentelemetry-instrumentation-pg/src/utils.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/opentelemetry-instrumentation-pg/src/utils.ts#L65

Added line #L65 was not covered by tests
}
if (process.env.OTEL_SEMCONV_STABILITY_OPT_IN?.includes('database')) {
return emitType.NEW_SEM_CONV;
}
return emitType.OLD_SEM_CONV;
}

/**
* Helper function to get a low cardinality span name from whatever info we have
* about the query.
Expand Down Expand Up @@ -282,12 +298,12 @@
}

export function updateCounter(
poolName: string,
pool: PgPoolExtended,
connectionCount: UpDownCounter,
connectionPendingRequests: UpDownCounter,
latestCounter: poolConnectionsCounter
): poolConnectionsCounter {
const poolName = getPoolName(pool.options);
const all = pool.totalCount;
const pending = pool.waitingCount;
const idle = pool.idleCount;
Expand Down
Loading