Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WebSockets metrics #2649

Merged
merged 12 commits into from
Aug 15, 2024
39 changes: 36 additions & 3 deletions packages/transport-websockets/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import * as filters from './filters.js'
import { createListener } from './listener.js'
import { socketToMaConn } from './socket-to-conn.js'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents } from '@libp2p/interface'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
Expand All @@ -82,6 +82,13 @@

export interface WebSocketsComponents {
logger: ComponentLogger
metrics?: Metrics
}

export interface WebSocketsMetrics {
dialerEvents: CounterGroup
listenerEvents: CounterGroup
component: Metrics
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
}

export type WebSocketsDialEvents =
Expand All @@ -92,11 +99,26 @@
private readonly log: Logger
private readonly init?: WebSocketsInit
private readonly logger: ComponentLogger
private readonly metrics?: WebSocketsMetrics

constructor (components: WebSocketsComponents, init?: WebSocketsInit) {
this.log = components.logger.forComponent('libp2p:websockets')
this.logger = components.logger
this.init = init

if (components.metrics != null) {
this.metrics = {
component: components.metrics,
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', {
label: 'event',
help: 'Total count of WebSockets dialer events by type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_websockets_listener_events_total', {
label: 'event',
help: 'Total count of WebSockets listener events by type'
})
}
}

Check warning on line 121 in packages/transport-websockets/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/index.ts#L110-L121

Added lines #L110 - L121 were not covered by tests
}

readonly [transportSymbol] = true
Expand All @@ -113,11 +135,14 @@

const socket = await this._connect(ma, options)
const maConn = socketToMaConn(socket, ma, {
logger: this.logger
logger: this.logger,
metrics: this.metrics?.dialerEvents
})
this.log('new outbound connection %s', maConn.remoteAddr)

this.metrics?.dialerEvents.increment({ upgrade_start: true })
const conn = await options.upgrader.upgradeOutbound(maConn, options)
this.metrics?.dialerEvents.increment({ upgrade_success: true })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need an upgrade_error somewhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense.

this.log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
Expand All @@ -136,22 +161,29 @@
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED')
this.log.error('connection error:', err)
this.metrics?.dialerEvents.increment({ socket_open_error: true })
errorPromise.reject(err)
})

try {
options.onProgress?.(new CustomProgressEvent('websockets:open-connection'))
this.metrics?.dialerEvents.increment({ socket_open_start: true })
await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal)
} catch (err: any) {
if (options.signal?.aborted === true) {
this.metrics?.dialerEvents.increment({ socket_open_abort: true })
}
rawSocket.close()
.catch(err => {
this.log.error('error closing raw socket', err)
this.metrics?.dialerEvents.increment({ socket_close_error: true })

Check warning on line 179 in packages/transport-websockets/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/index.ts#L179

Added line #L179 was not covered by tests
})

throw err
}

this.log('connected %s', ma)
this.metrics?.dialerEvents.increment({ socket_open_success: true })
return rawSocket
}

Expand All @@ -162,7 +194,8 @@
*/
createListener (options: CreateListenerOptions): Listener {
return createListener({
logger: this.logger
logger: this.logger,
metrics: this.metrics?.listenerEvents
}, {
...this.init,
...options
Expand Down
12 changes: 10 additions & 2 deletions packages/transport-websockets/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { createServer } from 'it-ws/server'
import { socketToMaConn } from './socket-to-conn.js'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { WebSocketServer } from 'it-ws/server'

export interface WebSocketListenerComponents {
logger: ComponentLogger
metrics?: CounterGroup
}

export interface WebSocketListenerInit extends CreateListenerOptions {
Expand All @@ -28,6 +29,7 @@
super()

this.log = components.logger.forComponent('libp2p:websockets:listener')
const metrics = components.metrics
// Keep track of open connections to destroy when the listener is closed
this.connections = new Set<DuplexWebSocket>()

Expand All @@ -36,8 +38,10 @@
this.server = createServer({
...init,
onConnection: (stream: DuplexWebSocket) => {
metrics?.increment({ socket_open_start: true })
const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), {
logger: components.logger
logger: components.logger,
metrics
})
this.log('new inbound connection %s', maConn.remoteAddr)

Expand All @@ -48,9 +52,11 @@
})

try {
metrics?.increment({ upgrade_start: true })
void init.upgrader.upgradeInbound(maConn)
.then((conn) => {
this.log('inbound connection %s upgraded', maConn.remoteAddr)
metrics?.increment({ upgrade_success: true })
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

if (init?.handler != null) {
init?.handler(conn)
Expand All @@ -62,11 +68,13 @@
})
.catch(async err => {
this.log.error('inbound connection failed to upgrade', err)
metrics?.increment({ upgrade_error: true })

Check warning on line 71 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L71

Added line #L71 was not covered by tests

await maConn.close().catch(err => {
this.log.error('inbound connection failed to close after upgrade failed', err)
})
})
metrics?.increment({ socket_open_success: true })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could fire after the .catch handler. should remove

} catch (err) {
this.log.error('inbound connection failed to upgrade', err)
maConn.close().catch(err => {
Expand Down
17 changes: 16 additions & 1 deletion packages/transport-websockets/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import { CodeError } from '@libp2p/interface'
import { CLOSE_TIMEOUT } from './constants.js'
import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/duplex'

export interface SocketToConnOptions {
localAddr?: Multiaddr
logger: ComponentLogger
metrics?: CounterGroup
}

// Convert a stream into a MultiaddrConnection
// https:/libp2p/interface-transport#multiaddrconnection
export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection {
const log = options.logger.forComponent('libp2p:websockets:maconn')
const metrics = options.metrics

metrics?.increment({ maconn_open_start: true })

const maConn: MultiaddrConnection = {
log,
Expand All @@ -30,8 +34,10 @@
})())
} catch (err: any) {
if (err.type !== 'aborted') {
metrics?.increment({ maconn_sink_error: true })

Check warning on line 37 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L37

Added line #L37 was not covered by tests
log.error(err)
}
metrics?.increment({ maconn_sink_abort: true })

Check warning on line 40 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L40

Added line #L40 was not covered by tests
}
},

Expand All @@ -43,6 +49,7 @@

async close (options: AbortOptions = {}) {
const start = Date.now()
metrics?.increment({ maconn_close_start: true })

if (options.signal == null) {
const signal = AbortSignal.timeout(CLOSE_TIMEOUT)
Expand All @@ -57,6 +64,7 @@
const { host, port } = maConn.remoteAddr.toOptions()
log('timeout closing stream to %s:%s after %dms, destroying it manually',
host, port, Date.now() - start)
metrics?.increment({ maconn_close_abort: true })

Check warning on line 67 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L67

Added line #L67 was not covered by tests

this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
Expand All @@ -65,8 +73,10 @@

try {
await stream.close()
metrics?.increment({ maconn_close_success: true })
} catch (err: any) {
log.error('error closing WebSocket gracefully', err)
metrics?.increment({ maconn_close_error: true })

Check warning on line 79 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L79

Added line #L79 was not covered by tests
this.abort(err)
} finally {
options.signal?.removeEventListener('abort', listener)
Expand All @@ -79,6 +89,8 @@
log('timeout closing stream to %s:%s due to error',
host, port, err)

metrics?.increment({ maconn_abort: true })

Check warning on line 93 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L92-L93

Added lines #L92 - L93 were not covered by tests
stream.destroy()
maConn.timeline.close = Date.now()
}
Expand All @@ -91,7 +103,10 @@
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
metrics?.increment({ socket_close_success: true })
}, { once: true })

metrics?.increment({ maconn_open_success: true })

return maConn
}
Loading