Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to sync zlib with 512k chunks, adjustable compression level #174

Merged
merged 2 commits into from
Feb 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Returns a `Client` instance and connects to the server.
| skipPing | *optional* | Whether pinging the server to check its version should be skipped. |
| conLog | *optional* | Where to log connection information (server join, kick messages to). Defaults to console.log, set to `null` to not log anywhere. |
| useNativeRaknet | *optional* | Whether to use the C++ version of RakNet. Set to false to use JS. |
| authTitle | *optional* | The client ID to sign in as, defaults to Minecraft for Nintendo Switch. Set false to sign in through Azure. See prismarine-auth |
| deviceType | *optional* | The device type to sign in as, defaults to "Nintendo". See prismarine-auth |
| compressionLevel | *optional* | What zlib compression level to use, default to **7** |
| batchingInterval | *optional* | How frequently, in milliseconds to flush and write the packet queue (default: 20ms) |

The following events are emitted by the client:
* 'status' - When the client's login sequence status has changed
Expand All @@ -30,6 +30,8 @@ The following events are emitted by the client:
* 'kick' - The server has kicked the client
* 'close' - The server has closed the connection
* 'error' - An recoverable exception has happened. Not catching will throw an exception
* 'connect_allowed' - Emitted after the client has pinged the server and gets version information.
* 'heartbeat' - Emitted after two successful tick_sync (keepalive) packets have been sent bidirectionally

## be.createServer(options) : Server

Expand Down
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ declare module "bedrock-protocol" {
useNativeRaknet?: boolean,
// If using JS implementation of RakNet, should we use workers? (This only affects the client)
useRaknetWorker?: boolean
// Compression level for zlib, default to 7
compressionLevel?: number
// How frequently the packet queue should be flushed in milliseconds, defaults to 20ms
batchingInterval?: number
}

export interface ClientOptions extends Options {
Expand Down
8 changes: 7 additions & 1 deletion src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Client extends Connection {
if (this.options.protocolVersion < Options.MIN_VERSION) {
throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`)
}
this.compressionLevel = this.options.compressionLevel || 7
}

get entityId () {
Expand Down Expand Up @@ -140,7 +141,12 @@ class Client extends Connection {
if (this.status === ClientStatus.Initializing && this.options.autoInitPlayer === true) {
if (statusPacket.status === 'player_spawn') {
this.status = ClientStatus.Initialized
this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId })
if (!this.entityId) {
// We need to wait for start_game in the rare event we get a player_spawn before start_game race condition
this.on('start_game', () => this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId }))
} else {
this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId })
}
this.emit('spawn')
}
}
Expand Down
45 changes: 24 additions & 21 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Connection extends EventEmitter {
write (name, params) {
this.outLog?.(name, params)
if (name === 'start_game') this.updateItemPalette(params.itemstates)
const batch = new Framer()
const batch = new Framer(this.compressionLevel)
const packet = this.serializer.createPacketBuffer({ name, params })
batch.addEncodedPacket(packet)

Expand All @@ -84,29 +84,33 @@ class Connection extends EventEmitter {
this.sendIds.push(name)
}

_tick () {
if (this.sendQ.length) {
const batch = new Framer(this.compressionLevel)
batch.addEncodedPackets(this.sendQ)
this.sendQ = []
this.sendIds = []
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
} else {
this.sendDecryptedBatch(batch)
}
}
}

onTick = this._tick.bind(this)

startQueue () {
this.sendQ = []
this.loop = setInterval(() => {
if (this.sendQ.length) {
const batch = new Framer()
batch.addEncodedPackets(this.sendQ)
this.sendQ = []
this.sendIds = []
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
} else {
this.sendDecryptedBatch(batch)
}
}
}, 20)
this.loop = setInterval(this.onTick, this.options.batchingInterval || 20)
}

/**
* Sends a MCPE packet buffer
*/
sendBuffer (buffer, immediate = false) {
if (immediate) {
const batch = new Framer()
const batch = new Framer(this.compressionLevel)
batch.addEncodedPacket(buffer)
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
Expand All @@ -121,7 +125,7 @@ class Connection extends EventEmitter {

sendDecryptedBatch (batch) {
// send to raknet
batch.encode(buf => this.sendMCPE(buf, true))
this.sendMCPE(batch.encode(), true)
}

sendEncryptedBatch (batch) {
Expand Down Expand Up @@ -158,11 +162,10 @@ class Connection extends EventEmitter {
if (this.encryptionEnabled) {
this.decrypt(buffer.slice(1))
} else {
Framer.decode(buffer, packets => {
for (const packet of packets) {
this.readPacket(packet)
}
})
const packets = Framer.decode(buffer)
for (const packet of packets) {
this.readPacket(packet)
}
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/createClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function createClient (options) {
const ad = advertisement.fromServerName(data)
client.options.version = options.version ?? (Versions[ad.version] ? ad.version : CURRENT_VERSION)
if (client.conLog) client.conLog(`Connecting to server ${ad.motd} (${ad.name}), version ${ad.version}`, client.options.version !== ad.version ? ` (as ${client.options.version})` : '')
client.emit('connect_allowed')
connect(client)
}, client)
}
Expand Down Expand Up @@ -46,15 +47,17 @@ function connect (client) {
sleep(500).then(() => client.queue('request_chunk_radius', { chunk_radius: client.viewDistance || 10 }))
})

const KEEPALIVE_INTERVAL = 10 // Send tick sync packets every 10 ticks
// Send tick sync packets every 10 ticks
const keepAliveInterval = 10
const keepAliveIntervalBig = BigInt(keepAliveInterval)
let keepalive
client.tick = 0n
client.once('spawn', () => {
keepalive = setInterval(() => {
// Client fills out the request_time and the server does response_time in its reply.
client.queue('tick_sync', { request_time: client.tick, response_time: 0n })
client.tick += BigInt(KEEPALIVE_INTERVAL)
}, 50 * KEEPALIVE_INTERVAL)
client.tick += keepAliveIntervalBig
}, 50 * keepAliveInterval)

client.on('tick_sync', async packet => {
client.emit('heartbeat', packet.response_time)
Expand Down
1 change: 1 addition & 0 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Server extends EventEmitter {
if (this.options.protocolVersion < Options.MIN_VERSION) {
throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`)
}
this.compressionLevel = this.options.compressionLevel || 7
}

onOpenConnection = (conn) => {
Expand Down
1 change: 1 addition & 0 deletions src/serverPlayer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Player extends Connection {
this.deserializer = server.deserializer
this.connection = connection
this.options = server.options
this.compressionLevel = server.compressionLevel

KeyExchange(this, server, server.options)
Login(this, server, server.options)
Expand Down
16 changes: 6 additions & 10 deletions src/transforms/encryption.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ function createEncryptor (client, iv) {
// The send counter is represented as a little-endian 64-bit long and incremented after each packet.

function process (chunk) {
Zlib.deflateRaw(chunk, { level: 7 }, (err, buffer) => {
if (err) throw err
const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)])
client.sendCounter++
client.cipher.write(packet)
})
const buffer = Zlib.deflateRawSync(chunk, { level: client.compressionLevel })
const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)])
client.sendCounter++
client.cipher.write(packet)
}

client.cipher.on('data', client.onEncryptedPacket)
Expand Down Expand Up @@ -72,10 +70,8 @@ function createDecryptor (client, iv) {
return
}

Zlib.inflateRaw(chunk, { chunkSize: 1024 * 1024 * 2 }, (err, buffer) => {
if (err) throw err
client.onDecryptedPacket(buffer)
})
const buffer = Zlib.inflateRawSync(chunk, { chunkSize: 512000 })
client.onDecryptedPacket(buffer)
}

client.decipher.on('data', verify)
Expand Down
30 changes: 13 additions & 17 deletions src/transforms/framer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,30 @@ const zlib = require('zlib')

// Concatenates packets into one batch packet, and adds length prefixs.
class Framer {
constructor () {
constructor (compressionLevel) {
// Encoding
this.packets = []
this.compressionLevel = 7
this.compressionLevel = compressionLevel
}

static decode (buf, cb) {
static decode (buf) {
// Read header
if (buf[0] !== 0xfe) throw Error('bad batch packet header ' + buf[0])
const buffer = buf.slice(1)

// Decode the payload
zlib.inflateRaw(buffer, { chunkSize: 1024 * 1024 * 2 }, (err, inflated) => {
if (err) { // Try to decode without compression
Framer.getPackets(buffer)
return
}
cb(Framer.getPackets(inflated))
})
// Decode the payload with 512kb buffer
try {
const inflated = zlib.inflateRawSync(buffer, { chunkSize: 512000 })
return Framer.getPackets(inflated)
} catch (e) { // Try to decode without compression
return Framer.getPackets(buffer)
}
}

encode (cb) {
encode () {
const buf = Buffer.concat(this.packets)
zlib.deflateRaw(buf, { level: this.compressionLevel }, (err, def) => {
if (err) throw err
const ret = Buffer.concat([Buffer.from([0xfe]), def])
cb(ret)
})
const def = zlib.deflateRawSync(buf, { level: this.compressionLevel })
return Buffer.concat([Buffer.from([0xfe]), def])
}

addEncodedPacket (chunk) {
Expand Down