From 421a6964cc874ea22fc962592150b9a05ddb6554 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 5 Aug 2024 16:05:12 -0500 Subject: [PATCH 1/9] feat: add WebSockets metrics --- packages/transport-websockets/src/index.ts | 39 +++++++++++++++++-- packages/transport-websockets/src/listener.ts | 12 +++++- .../src/socket-to-conn.ts | 17 +++++++- 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 49a6426d72..5400b47b7b 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -67,7 +67,7 @@ import { isBrowser, isWebWorker } from 'wherearewe' import * as filters from './filters.js' import { createListener } from './listener.js' import { socketToMaConn } from './socket-to-conn.js' -import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents } from '@libp2p/interface' +import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' @@ -82,6 +82,13 @@ export interface WebSocketsInit extends AbortOptions, WebSocketOptions { export interface WebSocketsComponents { logger: ComponentLogger + metrics?: Metrics +} + +export interface WebSocketsMetrics { + dialerEvents: CounterGroup + listenerEvents: CounterGroup + component: Metrics } export type WebSocketsDialEvents = @@ -92,11 +99,26 @@ class WebSockets implements Transport { private readonly log: Logger private readonly init?: WebSocketsInit private readonly logger: ComponentLogger + private readonly metrics?: WebSocketsMetrics constructor (components: WebSocketsComponents, init?: WebSocketsInit) { this.log = components.logger.forComponent('libp2p:websockets') this.logger = components.logger this.init = init + + if (components.metrics != null) { + this.metrics = { + component: components.metrics, + dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', { + label: 'event', + help: 'Total count of WebSockets dialer events by type' + }), + listenerEvents: components.metrics.registerCounterGroup('libp2p_websockets_listener_events_total', { + label: 'event', + help: 'Total count of WebSockets listener events by type' + }) + } + } } readonly [transportSymbol] = true @@ -113,11 +135,14 @@ class WebSockets implements Transport { const socket = await this._connect(ma, options) const maConn = socketToMaConn(socket, ma, { - logger: this.logger + logger: this.logger, + metrics: this.metrics?.dialerEvents }) this.log('new outbound connection %s', maConn.remoteAddr) + this.metrics?.dialerEvents.increment({ upgrade_start: true }) const conn = await options.upgrader.upgradeOutbound(maConn, options) + this.metrics?.dialerEvents.increment({ upgrade_success: true }) this.log('outbound connection %s upgraded', maConn.remoteAddr) return conn } @@ -136,22 +161,29 @@ class WebSockets implements Transport { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED') this.log.error('connection error:', err) + this.metrics?.dialerEvents.increment({ socket_open_error: true }) errorPromise.reject(err) }) try { options.onProgress?.(new CustomProgressEvent('websockets:open-connection')) + this.metrics?.dialerEvents.increment({ socket_open_start: true }) await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal) } catch (err: any) { + if (options.signal?.aborted === true) { + this.metrics?.dialerEvents.increment({ socket_open_abort: true }) + } rawSocket.close() .catch(err => { this.log.error('error closing raw socket', err) + this.metrics?.dialerEvents.increment({ socket_close_error: true }) }) throw err } this.log('connected %s', ma) + this.metrics?.dialerEvents.increment({ socket_open_success: true }) return rawSocket } @@ -162,7 +194,8 @@ class WebSockets implements Transport { */ createListener (options: CreateListenerOptions): Listener { return createListener({ - logger: this.logger + logger: this.logger, + metrics: this.metrics?.listenerEvents }, { ...this.init, ...options diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 20c2d7b50f..5941ef404b 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -4,7 +4,7 @@ import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multi import { multiaddr, protocols } from '@multiformats/multiaddr' import { createServer } from 'it-ws/server' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface' +import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' @@ -12,6 +12,7 @@ import type { WebSocketServer } from 'it-ws/server' export interface WebSocketListenerComponents { logger: ComponentLogger + metrics?: CounterGroup } export interface WebSocketListenerInit extends CreateListenerOptions { @@ -28,6 +29,7 @@ class WebSocketListener extends TypedEventEmitter implements Lis super() this.log = components.logger.forComponent('libp2p:websockets:listener') + const metrics = components.metrics // Keep track of open connections to destroy when the listener is closed this.connections = new Set() @@ -36,8 +38,10 @@ class WebSocketListener extends TypedEventEmitter implements Lis this.server = createServer({ ...init, onConnection: (stream: DuplexWebSocket) => { + metrics?.increment({ socket_open_start: true }) const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: components.logger + logger: components.logger, + metrics }) this.log('new inbound connection %s', maConn.remoteAddr) @@ -48,9 +52,11 @@ class WebSocketListener extends TypedEventEmitter implements Lis }) try { + metrics?.increment({ upgrade_start: true }) void init.upgrader.upgradeInbound(maConn) .then((conn) => { this.log('inbound connection %s upgraded', maConn.remoteAddr) + metrics?.increment({ upgrade_success: true }) if (init?.handler != null) { init?.handler(conn) @@ -62,11 +68,13 @@ class WebSocketListener extends TypedEventEmitter implements Lis }) .catch(async err => { this.log.error('inbound connection failed to upgrade', err) + metrics?.increment({ upgrade_error: true }) await maConn.close().catch(err => { this.log.error('inbound connection failed to close after upgrade failed', err) }) }) + metrics?.increment({ socket_open_success: true }) } catch (err) { this.log.error('inbound connection failed to upgrade', err) maConn.close().catch(err => { diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 1cb3278ca7..2255bcf2d1 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -1,18 +1,22 @@ import { CodeError } from '@libp2p/interface' import { CLOSE_TIMEOUT } from './constants.js' -import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' export interface SocketToConnOptions { localAddr?: Multiaddr logger: ComponentLogger + metrics?: CounterGroup } // Convert a stream into a MultiaddrConnection // https://github.com/libp2p/interface-transport#multiaddrconnection export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection { const log = options.logger.forComponent('libp2p:websockets:maconn') + const metrics = options.metrics + + metrics?.increment({ maconn_open_start: true }) const maConn: MultiaddrConnection = { log, @@ -30,8 +34,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, })()) } catch (err: any) { if (err.type !== 'aborted') { + metrics?.increment({ maconn_sink_error: true }) log.error(err) } + metrics?.increment({ maconn_sink_abort: true }) } }, @@ -43,6 +49,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, async close (options: AbortOptions = {}) { const start = Date.now() + metrics?.increment({ maconn_close_start: true }) if (options.signal == null) { const signal = AbortSignal.timeout(CLOSE_TIMEOUT) @@ -57,6 +64,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, const { host, port } = maConn.remoteAddr.toOptions() log('timeout closing stream to %s:%s after %dms, destroying it manually', host, port, Date.now() - start) + metrics?.increment({ maconn_close_abort: true }) this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT')) } @@ -65,8 +73,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, try { await stream.close() + metrics?.increment({ maconn_close_success: true }) } catch (err: any) { log.error('error closing WebSocket gracefully', err) + metrics?.increment({ maconn_close_error: true }) this.abort(err) } finally { options.signal?.removeEventListener('abort', listener) @@ -79,6 +89,8 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, log('timeout closing stream to %s:%s due to error', host, port, err) + metrics?.increment({ maconn_abort: true }) + stream.destroy() maConn.timeline.close = Date.now() } @@ -91,7 +103,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, if (maConn.timeline.close == null) { maConn.timeline.close = Date.now() } + metrics?.increment({ socket_close_success: true }) }, { once: true }) + metrics?.increment({ maconn_open_success: true }) + return maConn } From 05bb4dd5e711dbce28eb5c57daba2515bfec5a5a Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 13 Aug 2024 14:04:03 -0500 Subject: [PATCH 2/9] chore: remove unnecessary 'start' events remove: * upgrade_start * socket_open_start * maconn_open_start * maconn_close_start add: * socket_open_error --- packages/transport-websockets/src/index.ts | 4 ++-- packages/transport-websockets/src/listener.ts | 2 -- packages/transport-websockets/src/socket-to-conn.ts | 3 --- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 5400b47b7b..8543f4bd94 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -140,7 +140,6 @@ class WebSockets implements Transport { }) this.log('new outbound connection %s', maConn.remoteAddr) - this.metrics?.dialerEvents.increment({ upgrade_start: true }) const conn = await options.upgrader.upgradeOutbound(maConn, options) this.metrics?.dialerEvents.increment({ upgrade_success: true }) this.log('outbound connection %s upgraded', maConn.remoteAddr) @@ -167,11 +166,12 @@ class WebSockets implements Transport { try { options.onProgress?.(new CustomProgressEvent('websockets:open-connection')) - this.metrics?.dialerEvents.increment({ socket_open_start: true }) await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal) } catch (err: any) { if (options.signal?.aborted === true) { this.metrics?.dialerEvents.increment({ socket_open_abort: true }) + } else { + this.metrics?.dialerEvents.increment({ socket_open_error: true }) } rawSocket.close() .catch(err => { diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 5941ef404b..2f395443c6 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -38,7 +38,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis this.server = createServer({ ...init, onConnection: (stream: DuplexWebSocket) => { - metrics?.increment({ socket_open_start: true }) const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { logger: components.logger, metrics @@ -52,7 +51,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis }) try { - metrics?.increment({ upgrade_start: true }) void init.upgrader.upgradeInbound(maConn) .then((conn) => { this.log('inbound connection %s upgraded', maConn.remoteAddr) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 2255bcf2d1..64b4478fa6 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -16,8 +16,6 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, const log = options.logger.forComponent('libp2p:websockets:maconn') const metrics = options.metrics - metrics?.increment({ maconn_open_start: true }) - const maConn: MultiaddrConnection = { log, @@ -49,7 +47,6 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, async close (options: AbortOptions = {}) { const start = Date.now() - metrics?.increment({ maconn_close_start: true }) if (options.signal == null) { const signal = AbortSignal.timeout(CLOSE_TIMEOUT) From 24341d1a54692a954c3b50e3d2ae0b5b55a1aded Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 13 Aug 2024 14:46:51 -0500 Subject: [PATCH 3/9] feat: add metricPrefix for WS listener added metricPrefix to `socket-to-conn.ts` and set in `listener.ts` remove: * maconn_abort - would have resulted in duplicate of `maconn_close_abort` * maconn_open_success - No logic actually happens during maconn creation.. should be build/compile/run error and not a metric. add: N/A change: * socket_close_success -> maconn_socket_close_success --- packages/transport-websockets/src/listener.ts | 5 ++++- .../transport-websockets/src/socket-to-conn.ts | 18 ++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 2f395443c6..e5ad1fe283 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -38,9 +38,12 @@ class WebSocketListener extends TypedEventEmitter implements Lis this.server = createServer({ ...init, onConnection: (stream: DuplexWebSocket) => { + const listeningAddrDetails = this.listeningMultiaddr?.toOptions() + const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { logger: components.logger, - metrics + metrics, + metricPrefix: `${listeningAddrDetails?.transport}_${listeningAddrDetails?.host}:${listeningAddrDetails?.port} ` }) this.log('new inbound connection %s', maConn.remoteAddr) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 64b4478fa6..877b2751b2 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -8,6 +8,7 @@ export interface SocketToConnOptions { localAddr?: Multiaddr logger: ComponentLogger metrics?: CounterGroup + metricPrefix?: string } // Convert a stream into a MultiaddrConnection @@ -15,6 +16,7 @@ export interface SocketToConnOptions { export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection { const log = options.logger.forComponent('libp2p:websockets:maconn') const metrics = options.metrics + const metricPrefix = options.metricPrefix ?? '' const maConn: MultiaddrConnection = { log, @@ -32,10 +34,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, })()) } catch (err: any) { if (err.type !== 'aborted') { - metrics?.increment({ maconn_sink_error: true }) + metrics?.increment({ [`${metricPrefix}maconn_sink_error`]: true }) log.error(err) } - metrics?.increment({ maconn_sink_abort: true }) + metrics?.increment({ [`${metricPrefix}maconn_sink_abort`]: true }) } }, @@ -61,7 +63,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, const { host, port } = maConn.remoteAddr.toOptions() log('timeout closing stream to %s:%s after %dms, destroying it manually', host, port, Date.now() - start) - metrics?.increment({ maconn_close_abort: true }) + metrics?.increment({ [`${metricPrefix}maconn_close_abort`]: true }) this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT')) } @@ -70,10 +72,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, try { await stream.close() - metrics?.increment({ maconn_close_success: true }) + metrics?.increment({ [`${metricPrefix}maconn_close_success`]: true }) } catch (err: any) { log.error('error closing WebSocket gracefully', err) - metrics?.increment({ maconn_close_error: true }) + metrics?.increment({ [`${metricPrefix}maconn_close_error`]: true }) this.abort(err) } finally { options.signal?.removeEventListener('abort', listener) @@ -86,8 +88,6 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, log('timeout closing stream to %s:%s due to error', host, port, err) - metrics?.increment({ maconn_abort: true }) - stream.destroy() maConn.timeline.close = Date.now() } @@ -100,10 +100,8 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, if (maConn.timeline.close == null) { maConn.timeline.close = Date.now() } - metrics?.increment({ socket_close_success: true }) + metrics?.increment({ [`${metricPrefix}maconn_socket_close_success`]: true }) }, { once: true }) - metrics?.increment({ maconn_open_success: true }) - return maConn } From ba44f7a9141e655923519b4256c9b22373ba198c Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:10:12 -0500 Subject: [PATCH 4/9] chore: remove unused property --- packages/transport-websockets/src/index.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 8543f4bd94..8c7725a945 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -88,7 +88,6 @@ export interface WebSocketsComponents { export interface WebSocketsMetrics { dialerEvents: CounterGroup listenerEvents: CounterGroup - component: Metrics } export type WebSocketsDialEvents = @@ -108,7 +107,6 @@ class WebSockets implements Transport { if (components.metrics != null) { this.metrics = { - component: components.metrics, dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', { label: 'event', help: 'Total count of WebSockets dialer events by type' From b69a8616c285a1c8e45acf633463cb223718a8a7 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:40:22 -0500 Subject: [PATCH 5/9] fix: align more closely to tcp metrics --- packages/transport-websockets/src/index.ts | 19 ++-- packages/transport-websockets/src/listener.ts | 87 ++++++++++++++++--- .../src/socket-to-conn.ts | 14 +-- 3 files changed, 92 insertions(+), 28 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 8c7725a945..804333e871 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -87,7 +87,6 @@ export interface WebSocketsComponents { export interface WebSocketsMetrics { dialerEvents: CounterGroup - listenerEvents: CounterGroup } export type WebSocketsDialEvents = @@ -99,10 +98,12 @@ class WebSockets implements Transport { private readonly init?: WebSocketsInit private readonly logger: ComponentLogger private readonly metrics?: WebSocketsMetrics + private readonly components: WebSocketsComponents constructor (components: WebSocketsComponents, init?: WebSocketsInit) { this.log = components.logger.forComponent('libp2p:websockets') this.logger = components.logger + this.components = components this.init = init if (components.metrics != null) { @@ -110,10 +111,6 @@ class WebSockets implements Transport { dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', { label: 'event', help: 'Total count of WebSockets dialer events by type' - }), - listenerEvents: components.metrics.registerCounterGroup('libp2p_websockets_listener_events_total', { - label: 'event', - help: 'Total count of WebSockets listener events by type' }) } } @@ -139,7 +136,6 @@ class WebSockets implements Transport { this.log('new outbound connection %s', maConn.remoteAddr) const conn = await options.upgrader.upgradeOutbound(maConn, options) - this.metrics?.dialerEvents.increment({ upgrade_success: true }) this.log('outbound connection %s upgraded', maConn.remoteAddr) return conn } @@ -158,7 +154,7 @@ class WebSockets implements Transport { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED') this.log.error('connection error:', err) - this.metrics?.dialerEvents.increment({ socket_open_error: true }) + this.metrics?.dialerEvents.increment({ error: true }) errorPromise.reject(err) }) @@ -167,21 +163,20 @@ class WebSockets implements Transport { await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal) } catch (err: any) { if (options.signal?.aborted === true) { - this.metrics?.dialerEvents.increment({ socket_open_abort: true }) + this.metrics?.dialerEvents.increment({ abort: true }) } else { - this.metrics?.dialerEvents.increment({ socket_open_error: true }) + this.metrics?.dialerEvents.increment({ error: true }) } rawSocket.close() .catch(err => { this.log.error('error closing raw socket', err) - this.metrics?.dialerEvents.increment({ socket_close_error: true }) }) throw err } this.log('connected %s', ma) - this.metrics?.dialerEvents.increment({ socket_open_success: true }) + this.metrics?.dialerEvents.increment({ connect: true }) return rawSocket } @@ -193,7 +188,7 @@ class WebSockets implements Transport { createListener (options: CreateListenerOptions): Listener { return createListener({ logger: this.logger, - metrics: this.metrics?.listenerEvents + metrics: this.components.metrics }, { ...this.init, ...options diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index e5ad1fe283..192dbc63b3 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -4,7 +4,7 @@ import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multi import { multiaddr, protocols } from '@multiformats/multiaddr' import { createServer } from 'it-ws/server' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup } from '@libp2p/interface' +import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' @@ -12,18 +12,41 @@ import type { WebSocketServer } from 'it-ws/server' export interface WebSocketListenerComponents { logger: ComponentLogger - metrics?: CounterGroup + metrics?: Metrics } export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server } +export interface WebSocketListenerMetrics { + status: MetricGroup + errors: CounterGroup + events: CounterGroup +} + +enum WebSocketListenerStatusCode { + /** + * When server object is initialized but we don't know the listening address + * yet or the server object is stopped manually, can be resumed only by + * calling listen() + **/ + INACTIVE = 0, + ACTIVE = 1 +} + +type Status = { code: WebSocketListenerStatusCode.INACTIVE } | { + code: Exclude +} + class WebSocketListener extends TypedEventEmitter implements Listener { private readonly connections: Set private listeningMultiaddr?: Multiaddr private readonly server: WebSocketServer private readonly log: Logger + private metrics?: WebSocketListenerMetrics + private addr: string + private status: Status = { code: WebSocketListenerStatusCode.INACTIVE } constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) { super() @@ -35,15 +58,15 @@ class WebSocketListener extends TypedEventEmitter implements Lis const self = this // eslint-disable-line @typescript-eslint/no-this-alias + this.addr = 'unknown' + this.server = createServer({ ...init, onConnection: (stream: DuplexWebSocket) => { - const listeningAddrDetails = this.listeningMultiaddr?.toOptions() - const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { logger: components.logger, - metrics, - metricPrefix: `${listeningAddrDetails?.transport}_${listeningAddrDetails?.host}:${listeningAddrDetails?.port} ` + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` }) this.log('new inbound connection %s', maConn.remoteAddr) @@ -57,7 +80,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis void init.upgrader.upgradeInbound(maConn) .then((conn) => { this.log('inbound connection %s upgraded', maConn.remoteAddr) - metrics?.increment({ upgrade_success: true }) if (init?.handler != null) { init?.handler(conn) @@ -69,31 +91,68 @@ class WebSocketListener extends TypedEventEmitter implements Lis }) .catch(async err => { this.log.error('inbound connection failed to upgrade', err) - metrics?.increment({ upgrade_error: true }) + this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) await maConn.close().catch(err => { this.log.error('inbound connection failed to close after upgrade failed', err) }) }) - metrics?.increment({ socket_open_success: true }) } catch (err) { this.log.error('inbound connection failed to upgrade', err) maConn.close().catch(err => { this.log.error('inbound connection failed to close after upgrade failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) }) } } }) this.server.on('listening', () => { + if (metrics != null) { + const { host, port } = this.listeningMultiaddr?.toOptions() ?? {} + this.addr = `${host}:${port}` + + metrics.registerMetricGroup('libp2p_websockets_inbound_connections_total', { + label: 'address', + help: 'Current active connections in WebSocket listener', + calculate: () => { + return { + [this.addr]: this.connections.size + } + } + }) + + this.metrics = { + status: metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { + label: 'address', + help: 'Current status of the WebSocket listener socket' + }), + errors: metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { + label: 'address', + help: 'Total count of WebSocket listener errors by type' + }), + events: metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { + label: 'address', + help: 'Total count of WebSocket listener events by type' + }) + } + + this.metrics?.status.update({ + [this.addr]: WebSocketListenerStatusCode.ACTIVE + }) + } this.dispatchEvent(new CustomEvent('listening')) }) this.server.on('error', (err: Error) => { + this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true }) this.dispatchEvent(new CustomEvent('error', { detail: err })) }) this.server.on('close', () => { + this.metrics?.status.update({ + [this.addr]: this.status.code + }) this.dispatchEvent(new CustomEvent('close')) }) } @@ -114,7 +173,15 @@ class WebSocketListener extends TypedEventEmitter implements Lis async listen (ma: Multiaddr): Promise { this.listeningMultiaddr = ma - await this.server.listen(ma.toOptions()) + try { + await this.server.listen(ma.toOptions()) + this.status = { + code: WebSocketListenerStatusCode.ACTIVE + } + } catch (err) { + this.status = { code: WebSocketListenerStatusCode.INACTIVE } + throw err + } } getAddrs (): Multiaddr[] { diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 877b2751b2..79e4b656f3 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -32,12 +32,14 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, } } })()) + metrics?.increment({ [`${metricPrefix}sink_success`]: true }) } catch (err: any) { if (err.type !== 'aborted') { - metrics?.increment({ [`${metricPrefix}maconn_sink_error`]: true }) log.error(err) + metrics?.increment({ [`${metricPrefix}sink_error`]: true }) + } else { + metrics?.increment({ [`${metricPrefix}sink_abort`]: true }) } - metrics?.increment({ [`${metricPrefix}maconn_sink_abort`]: true }) } }, @@ -63,7 +65,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, const { host, port } = maConn.remoteAddr.toOptions() log('timeout closing stream to %s:%s after %dms, destroying it manually', host, port, Date.now() - start) - metrics?.increment({ [`${metricPrefix}maconn_close_abort`]: true }) + metrics?.increment({ [`${metricPrefix}close_abort`]: true }) this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT')) } @@ -72,10 +74,10 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, try { await stream.close() - metrics?.increment({ [`${metricPrefix}maconn_close_success`]: true }) + metrics?.increment({ [`${metricPrefix}close_success`]: true }) } catch (err: any) { log.error('error closing WebSocket gracefully', err) - metrics?.increment({ [`${metricPrefix}maconn_close_error`]: true }) + metrics?.increment({ [`${metricPrefix}close_error`]: true }) this.abort(err) } finally { options.signal?.removeEventListener('abort', listener) @@ -94,13 +96,13 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, } stream.socket.addEventListener('close', () => { + metrics?.increment({ [`${metricPrefix}close`]: true }) // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close // timeline if (maConn.timeline.close == null) { maConn.timeline.close = Date.now() } - metrics?.increment({ [`${metricPrefix}maconn_socket_close_success`]: true }) }, { once: true }) return maConn From 5c8871a2f22e30853b86f1cf98ed510958efd0a5 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Wed, 14 Aug 2024 15:01:31 -0500 Subject: [PATCH 6/9] chore: remove sink metrics --- packages/transport-websockets/src/socket-to-conn.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 79e4b656f3..781c4f3add 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -32,13 +32,9 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, } } })()) - metrics?.increment({ [`${metricPrefix}sink_success`]: true }) } catch (err: any) { if (err.type !== 'aborted') { log.error(err) - metrics?.increment({ [`${metricPrefix}sink_error`]: true }) - } else { - metrics?.increment({ [`${metricPrefix}sink_abort`]: true }) } } }, From fe808b04ca89cb48db48c51caf0793a4c879360b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 15 Aug 2024 08:22:22 +0100 Subject: [PATCH 7/9] chore: revert status --- packages/transport-websockets/src/listener.ts | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 192dbc63b3..84905da253 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -1,5 +1,5 @@ import os from 'os' -import { TypedEventEmitter, CustomEvent } from '@libp2p/interface' +import { TypedEventEmitter } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' import { multiaddr, protocols } from '@multiformats/multiaddr' import { createServer } from 'it-ws/server' @@ -25,20 +25,6 @@ export interface WebSocketListenerMetrics { events: CounterGroup } -enum WebSocketListenerStatusCode { - /** - * When server object is initialized but we don't know the listening address - * yet or the server object is stopped manually, can be resumed only by - * calling listen() - **/ - INACTIVE = 0, - ACTIVE = 1 -} - -type Status = { code: WebSocketListenerStatusCode.INACTIVE } | { - code: Exclude -} - class WebSocketListener extends TypedEventEmitter implements Listener { private readonly connections: Set private listeningMultiaddr?: Multiaddr @@ -46,7 +32,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis private readonly log: Logger private metrics?: WebSocketListenerMetrics private addr: string - private status: Status = { code: WebSocketListenerStatusCode.INACTIVE } constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) { super() @@ -136,10 +121,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis help: 'Total count of WebSocket listener events by type' }) } - - this.metrics?.status.update({ - [this.addr]: WebSocketListenerStatusCode.ACTIVE - }) } this.dispatchEvent(new CustomEvent('listening')) }) @@ -150,9 +131,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis })) }) this.server.on('close', () => { - this.metrics?.status.update({ - [this.addr]: this.status.code - }) this.dispatchEvent(new CustomEvent('close')) }) } @@ -173,15 +151,7 @@ class WebSocketListener extends TypedEventEmitter implements Lis async listen (ma: Multiaddr): Promise { this.listeningMultiaddr = ma - try { - await this.server.listen(ma.toOptions()) - this.status = { - code: WebSocketListenerStatusCode.ACTIVE - } - } catch (err) { - this.status = { code: WebSocketListenerStatusCode.INACTIVE } - throw err - } + await this.server.listen(ma.toOptions()) } getAddrs (): Multiaddr[] { From ec974b3195c6f6651d5f33488f1c02021d7f587b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 15 Aug 2024 09:14:29 +0100 Subject: [PATCH 8/9] chore: log on event --- packages/transport-websockets/src/socket-to-conn.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 781c4f3add..eb98160c65 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -61,7 +61,6 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, const { host, port } = maConn.remoteAddr.toOptions() log('timeout closing stream to %s:%s after %dms, destroying it manually', host, port, Date.now() - start) - metrics?.increment({ [`${metricPrefix}close_abort`]: true }) this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT')) } @@ -70,10 +69,8 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, try { await stream.close() - metrics?.increment({ [`${metricPrefix}close_success`]: true }) } catch (err: any) { log.error('error closing WebSocket gracefully', err) - metrics?.increment({ [`${metricPrefix}close_error`]: true }) this.abort(err) } finally { options.signal?.removeEventListener('abort', listener) @@ -88,11 +85,18 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, stream.destroy() maConn.timeline.close = Date.now() + + // ws WebSocket.terminate does not accept an Error arg to emit an 'error' + // event on destroy like other node streams so we can't update a metric + // with an event listener + // https://github.com/websockets/ws/issues/1752#issuecomment-622380981 + metrics?.increment({ [`${metricPrefix}error`]: true }) } } stream.socket.addEventListener('close', () => { metrics?.increment({ [`${metricPrefix}close`]: true }) + // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close // timeline From 8524f803e303a73c08eac9c0a9f614ab93a2356a Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 15 Aug 2024 09:22:45 +0100 Subject: [PATCH 9/9] chore: do not double-track error --- packages/transport-websockets/src/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 804333e871..18cf573c9a 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -164,9 +164,8 @@ class WebSockets implements Transport { } catch (err: any) { if (options.signal?.aborted === true) { this.metrics?.dialerEvents.increment({ abort: true }) - } else { - this.metrics?.dialerEvents.increment({ error: true }) } + rawSocket.close() .catch(err => { this.log.error('error closing raw socket', err)