From 31d3938f8fcdf56debbf8824ccbcbc057d5bd5be Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 24 Nov 2022 09:53:17 +0000 Subject: [PATCH] fix: yield single buffers (#233) * fix: yield single buffers Messages are serialized to multiple buffers, intsead of yield each buffer one by one, create single buffers that contain the whole serialized message. This greatly improves transport performance as writing one big buffer is a lot faster than writing lots of small buffers to network sockets etc. Before: ``` testing 0.40.x-mplex sender 3276811 messages 17 invocations sender 6553636 bufs 17 b 24197 ms 105 MB in 32 B chunks in 24170ms ``` After: ``` testing 0.40.x-mplex sender 3276811 messages 1638408 invocations 1638411 bufs 68 b 8626 ms 105 MB in 32 B chunks in 8611ms ``` * chore: update comment --- src/encode.ts | 24 ++++++++++++------------ src/mplex.ts | 2 +- src/stream.ts | 25 ++++++++----------------- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/encode.ts b/src/encode.ts index 17564e1..f2c726a 100644 --- a/src/encode.ts +++ b/src/encode.ts @@ -1,5 +1,6 @@ import type { Source } from 'it-stream-types' import varint from 'varint' +import { Uint8ArrayList } from 'uint8arraylist' import { allocUnsafe } from './alloc-unsafe.js' import { Message, MessageTypes } from './message-types.js' @@ -15,9 +16,9 @@ class Encoder { } /** - * Encodes the given message and returns it and its header + * Encodes the given message and adds it to the passed list */ - write (msg: Message): Uint8Array[] { + write (msg: Message, list: Uint8ArrayList): void { const pool = this._pool let offset = this._poolOffset @@ -41,16 +42,11 @@ class Encoder { this._poolOffset = offset } + list.append(header) + if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) { - return [ - header, - ...(msg.data instanceof Uint8Array ? [msg.data] : msg.data) - ] + list.append(msg.data) } - - return [ - header - ] } } @@ -61,12 +57,16 @@ const encoder = new Encoder() */ export async function * encode (source: Source) { for await (const msg of source) { + const list = new Uint8ArrayList() + if (Array.isArray(msg)) { for (const m of msg) { - yield * encoder.write(m) + encoder.write(m, list) } } else { - yield * encoder.write(msg) + encoder.write(msg, list) } + + yield list.subarray() } } diff --git a/src/mplex.ts b/src/mplex.ts index 945a64f..1e79cce 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -155,7 +155,7 @@ export class MplexStreamMuxer implements StreamMuxer { _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { const { id, name, type, registry } = options - log('new %s stream %s %s', type, id) + log('new %s stream %s', type, id) if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) { throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS') diff --git a/src/stream.ts b/src/stream.ts index 1fd9e5f..20ab83a 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -171,24 +171,15 @@ export function createStream (options: Options): MplexStream { send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) }) } - const uint8ArrayList = new Uint8ArrayList() - - for await (const data of source) { - if (data.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: data instanceof Uint8ArrayList ? data : new Uint8ArrayList(data) }) - } else { - uint8ArrayList.append(data) - - while (uint8ArrayList.length !== 0) { - // eslint-disable-next-line max-depth - if (uint8ArrayList.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist() }) - uint8ArrayList.consume(uint8ArrayList.length) - break - } - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist(0, maxMsgSize) }) - uint8ArrayList.consume(maxMsgSize) + for await (let data of source) { + while (data.length > 0) { + if (data.length <= maxMsgSize) { + send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data }) + break } + data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data + send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) }) + data.consume(maxMsgSize) } } } catch (err: any) {