diff --git a/src/encode.ts b/src/encode.ts index 17564e1..f2c726a 100644 --- a/src/encode.ts +++ b/src/encode.ts @@ -1,5 +1,6 @@ import type { Source } from 'it-stream-types' import varint from 'varint' +import { Uint8ArrayList } from 'uint8arraylist' import { allocUnsafe } from './alloc-unsafe.js' import { Message, MessageTypes } from './message-types.js' @@ -15,9 +16,9 @@ class Encoder { } /** - * Encodes the given message and returns it and its header + * Encodes the given message and adds it to the passed list */ - write (msg: Message): Uint8Array[] { + write (msg: Message, list: Uint8ArrayList): void { const pool = this._pool let offset = this._poolOffset @@ -41,16 +42,11 @@ class Encoder { this._poolOffset = offset } + list.append(header) + if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) { - return [ - header, - ...(msg.data instanceof Uint8Array ? [msg.data] : msg.data) - ] + list.append(msg.data) } - - return [ - header - ] } } @@ -61,12 +57,16 @@ const encoder = new Encoder() */ export async function * encode (source: Source) { for await (const msg of source) { + const list = new Uint8ArrayList() + if (Array.isArray(msg)) { for (const m of msg) { - yield * encoder.write(m) + encoder.write(m, list) } } else { - yield * encoder.write(msg) + encoder.write(msg, list) } + + yield list.subarray() } } diff --git a/src/mplex.ts b/src/mplex.ts index 945a64f..1e79cce 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -155,7 +155,7 @@ export class MplexStreamMuxer implements StreamMuxer { _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { const { id, name, type, registry } = options - log('new %s stream %s %s', type, id) + log('new %s stream %s', type, id) if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) { throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS') diff --git a/src/stream.ts b/src/stream.ts index 1fd9e5f..20ab83a 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -171,24 +171,15 @@ export function createStream (options: Options): MplexStream { send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) }) } - const uint8ArrayList = new Uint8ArrayList() - - for await (const data of source) { - if (data.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: data instanceof Uint8ArrayList ? data : new Uint8ArrayList(data) }) - } else { - uint8ArrayList.append(data) - - while (uint8ArrayList.length !== 0) { - // eslint-disable-next-line max-depth - if (uint8ArrayList.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist() }) - uint8ArrayList.consume(uint8ArrayList.length) - break - } - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist(0, maxMsgSize) }) - uint8ArrayList.consume(maxMsgSize) + for await (let data of source) { + while (data.length > 0) { + if (data.length <= maxMsgSize) { + send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data }) + break } + data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data + send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) }) + data.consume(maxMsgSize) } } } catch (err: any) {