Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: yield single buffers (#233)
Browse files Browse the repository at this point in the history
* fix: yield single buffers

Messages are serialized to multiple buffers, intsead of yield each
buffer one by one, create single buffers that contain the whole
serialized message. This greatly improves transport performance as
writing one big buffer is a lot faster than writing lots of small
buffers to network sockets etc.

Before:

```
testing 0.40.x-mplex
sender 3276811 messages 17 invocations
sender 6553636 bufs 17 b 24197 ms
105 MB in 32 B chunks in 24170ms
```

After:

```
testing 0.40.x-mplex
sender 3276811 messages 1638408 invocations
1638411 bufs 68 b 8626 ms
105 MB in 32 B chunks in 8611ms
```

* chore: update comment
  • Loading branch information
achingbrain authored Nov 24, 2022
1 parent 279ad47 commit 31d3938
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 30 deletions.
24 changes: 12 additions & 12 deletions src/encode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Source } from 'it-stream-types'
import varint from 'varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { allocUnsafe } from './alloc-unsafe.js'
import { Message, MessageTypes } from './message-types.js'

Expand All @@ -15,9 +16,9 @@ class Encoder {
}

/**
* Encodes the given message and returns it and its header
* Encodes the given message and adds it to the passed list
*/
write (msg: Message): Uint8Array[] {
write (msg: Message, list: Uint8ArrayList): void {
const pool = this._pool
let offset = this._poolOffset

Expand All @@ -41,16 +42,11 @@ class Encoder {
this._poolOffset = offset
}

list.append(header)

if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) {
return [
header,
...(msg.data instanceof Uint8Array ? [msg.data] : msg.data)
]
list.append(msg.data)
}

return [
header
]
}
}

Expand All @@ -61,12 +57,16 @@ const encoder = new Encoder()
*/
export async function * encode (source: Source<Message | Message[]>) {
for await (const msg of source) {
const list = new Uint8ArrayList()

if (Array.isArray(msg)) {
for (const m of msg) {
yield * encoder.write(m)
encoder.write(m, list)
}
} else {
yield * encoder.write(msg)
encoder.write(msg, list)
}

yield list.subarray()
}
}
2 changes: 1 addition & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export class MplexStreamMuxer implements StreamMuxer {
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const { id, name, type, registry } = options

log('new %s stream %s %s', type, id)
log('new %s stream %s', type, id)

if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) {
throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS')
Expand Down
25 changes: 8 additions & 17 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,15 @@ export function createStream (options: Options): MplexStream {
send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) })
}

const uint8ArrayList = new Uint8ArrayList()

for await (const data of source) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data: data instanceof Uint8ArrayList ? data : new Uint8ArrayList(data) })
} else {
uint8ArrayList.append(data)

while (uint8ArrayList.length !== 0) {
// eslint-disable-next-line max-depth
if (uint8ArrayList.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist() })
uint8ArrayList.consume(uint8ArrayList.length)
break
}
send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist(0, maxMsgSize) })
uint8ArrayList.consume(maxMsgSize)
for await (let data of source) {
while (data.length > 0) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data })
break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err: any) {
Expand Down

0 comments on commit 31d3938

Please sign in to comment.