diff --git a/docs/API.md b/docs/API.md index d6e23ed1..6401d3b1 100644 --- a/docs/API.md +++ b/docs/API.md @@ -20,8 +20,8 @@ Returns a `Client` instance and connects to the server. | skipPing | *optional* | Whether pinging the server to check its version should be skipped. | | conLog | *optional* | Where to log connection information (server join, kick messages to). Defaults to console.log, set to `null` to not log anywhere. | | useNativeRaknet | *optional* | Whether to use the C++ version of RakNet. Set to false to use JS. | -| authTitle | *optional* | The client ID to sign in as, defaults to Minecraft for Nintendo Switch. Set false to sign in through Azure. See prismarine-auth | -| deviceType | *optional* | The device type to sign in as, defaults to "Nintendo". See prismarine-auth | +| compressionLevel | *optional* | What zlib compression level to use, default to **7** | +| batchingInterval | *optional* | How frequently, in milliseconds to flush and write the packet queue (default: 20ms) | The following events are emitted by the client: * 'status' - When the client's login sequence status has changed @@ -30,6 +30,8 @@ The following events are emitted by the client: * 'kick' - The server has kicked the client * 'close' - The server has closed the connection * 'error' - An recoverable exception has happened. Not catching will throw an exception +* 'connect_allowed' - Emitted after the client has pinged the server and gets version information. +* 'heartbeat' - Emitted after two successful tick_sync (keepalive) packets have been sent bidirectionally ## be.createServer(options) : Server diff --git a/index.d.ts b/index.d.ts index a14500df..6e9964f9 100644 --- a/index.d.ts +++ b/index.d.ts @@ -21,6 +21,10 @@ declare module "bedrock-protocol" { useNativeRaknet?: boolean, // If using JS implementation of RakNet, should we use workers? (This only affects the client) useRaknetWorker?: boolean + // Compression level for zlib, default to 7 + compressionLevel?: number + // How frequently the packet queue should be flushed in milliseconds, defaults to 20ms + batchingInterval?: number } export interface ClientOptions extends Options { diff --git a/src/client.js b/src/client.js index 76e4bcb6..f4034638 100644 --- a/src/client.js +++ b/src/client.js @@ -68,6 +68,7 @@ class Client extends Connection { if (this.options.protocolVersion < Options.MIN_VERSION) { throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`) } + this.compressionLevel = this.options.compressionLevel || 7 } get entityId () { @@ -140,7 +141,12 @@ class Client extends Connection { if (this.status === ClientStatus.Initializing && this.options.autoInitPlayer === true) { if (statusPacket.status === 'player_spawn') { this.status = ClientStatus.Initialized - this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId }) + if (!this.entityId) { + // We need to wait for start_game in the rare event we get a player_spawn before start_game race condition + this.on('start_game', () => this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId })) + } else { + this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId }) + } this.emit('spawn') } } diff --git a/src/connection.js b/src/connection.js index ce15507e..ddd60b1e 100644 --- a/src/connection.js +++ b/src/connection.js @@ -60,7 +60,7 @@ class Connection extends EventEmitter { write (name, params) { this.outLog?.(name, params) if (name === 'start_game') this.updateItemPalette(params.itemstates) - const batch = new Framer() + const batch = new Framer(this.compressionLevel) const packet = this.serializer.createPacketBuffer({ name, params }) batch.addEncodedPacket(packet) @@ -84,21 +84,25 @@ class Connection extends EventEmitter { this.sendIds.push(name) } + _tick () { + if (this.sendQ.length) { + const batch = new Framer(this.compressionLevel) + batch.addEncodedPackets(this.sendQ) + this.sendQ = [] + this.sendIds = [] + if (this.encryptionEnabled) { + this.sendEncryptedBatch(batch) + } else { + this.sendDecryptedBatch(batch) + } + } + } + + onTick = this._tick.bind(this) + startQueue () { this.sendQ = [] - this.loop = setInterval(() => { - if (this.sendQ.length) { - const batch = new Framer() - batch.addEncodedPackets(this.sendQ) - this.sendQ = [] - this.sendIds = [] - if (this.encryptionEnabled) { - this.sendEncryptedBatch(batch) - } else { - this.sendDecryptedBatch(batch) - } - } - }, 20) + this.loop = setInterval(this.onTick, this.options.batchingInterval || 20) } /** @@ -106,7 +110,7 @@ class Connection extends EventEmitter { */ sendBuffer (buffer, immediate = false) { if (immediate) { - const batch = new Framer() + const batch = new Framer(this.compressionLevel) batch.addEncodedPacket(buffer) if (this.encryptionEnabled) { this.sendEncryptedBatch(batch) @@ -121,7 +125,7 @@ class Connection extends EventEmitter { sendDecryptedBatch (batch) { // send to raknet - batch.encode(buf => this.sendMCPE(buf, true)) + this.sendMCPE(batch.encode(), true) } sendEncryptedBatch (batch) { @@ -158,11 +162,10 @@ class Connection extends EventEmitter { if (this.encryptionEnabled) { this.decrypt(buffer.slice(1)) } else { - Framer.decode(buffer, packets => { - for (const packet of packets) { - this.readPacket(packet) - } - }) + const packets = Framer.decode(buffer) + for (const packet of packets) { + this.readPacket(packet) + } } } } diff --git a/src/createClient.js b/src/createClient.js index 59b6ceba..7aff0889 100644 --- a/src/createClient.js +++ b/src/createClient.js @@ -17,6 +17,7 @@ function createClient (options) { const ad = advertisement.fromServerName(data) client.options.version = options.version ?? (Versions[ad.version] ? ad.version : CURRENT_VERSION) if (client.conLog) client.conLog(`Connecting to server ${ad.motd} (${ad.name}), version ${ad.version}`, client.options.version !== ad.version ? ` (as ${client.options.version})` : '') + client.emit('connect_allowed') connect(client) }, client) } @@ -46,15 +47,17 @@ function connect (client) { sleep(500).then(() => client.queue('request_chunk_radius', { chunk_radius: client.viewDistance || 10 })) }) - const KEEPALIVE_INTERVAL = 10 // Send tick sync packets every 10 ticks + // Send tick sync packets every 10 ticks + const keepAliveInterval = 10 + const keepAliveIntervalBig = BigInt(keepAliveInterval) let keepalive client.tick = 0n client.once('spawn', () => { keepalive = setInterval(() => { // Client fills out the request_time and the server does response_time in its reply. client.queue('tick_sync', { request_time: client.tick, response_time: 0n }) - client.tick += BigInt(KEEPALIVE_INTERVAL) - }, 50 * KEEPALIVE_INTERVAL) + client.tick += keepAliveIntervalBig + }, 50 * keepAliveInterval) client.on('tick_sync', async packet => { client.emit('heartbeat', packet.response_time) diff --git a/src/server.js b/src/server.js index 3192889e..8a2d723f 100644 --- a/src/server.js +++ b/src/server.js @@ -33,6 +33,7 @@ class Server extends EventEmitter { if (this.options.protocolVersion < Options.MIN_VERSION) { throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`) } + this.compressionLevel = this.options.compressionLevel || 7 } onOpenConnection = (conn) => { diff --git a/src/serverPlayer.js b/src/serverPlayer.js index 967a9f86..906f3f2e 100644 --- a/src/serverPlayer.js +++ b/src/serverPlayer.js @@ -15,6 +15,7 @@ class Player extends Connection { this.deserializer = server.deserializer this.connection = connection this.options = server.options + this.compressionLevel = server.compressionLevel KeyExchange(this, server, server.options) Login(this, server, server.options) diff --git a/src/transforms/encryption.js b/src/transforms/encryption.js index 31f9ae23..b02351b5 100644 --- a/src/transforms/encryption.js +++ b/src/transforms/encryption.js @@ -36,12 +36,10 @@ function createEncryptor (client, iv) { // The send counter is represented as a little-endian 64-bit long and incremented after each packet. function process (chunk) { - Zlib.deflateRaw(chunk, { level: 7 }, (err, buffer) => { - if (err) throw err - const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)]) - client.sendCounter++ - client.cipher.write(packet) - }) + const buffer = Zlib.deflateRawSync(chunk, { level: client.compressionLevel }) + const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)]) + client.sendCounter++ + client.cipher.write(packet) } client.cipher.on('data', client.onEncryptedPacket) @@ -72,10 +70,8 @@ function createDecryptor (client, iv) { return } - Zlib.inflateRaw(chunk, { chunkSize: 1024 * 1024 * 2 }, (err, buffer) => { - if (err) throw err - client.onDecryptedPacket(buffer) - }) + const buffer = Zlib.inflateRawSync(chunk, { chunkSize: 512000 }) + client.onDecryptedPacket(buffer) } client.decipher.on('data', verify) diff --git a/src/transforms/framer.js b/src/transforms/framer.js index 77f41e93..c31b8d85 100644 --- a/src/transforms/framer.js +++ b/src/transforms/framer.js @@ -3,34 +3,30 @@ const zlib = require('zlib') // Concatenates packets into one batch packet, and adds length prefixs. class Framer { - constructor () { + constructor (compressionLevel) { // Encoding this.packets = [] - this.compressionLevel = 7 + this.compressionLevel = compressionLevel } - static decode (buf, cb) { + static decode (buf) { // Read header if (buf[0] !== 0xfe) throw Error('bad batch packet header ' + buf[0]) const buffer = buf.slice(1) - // Decode the payload - zlib.inflateRaw(buffer, { chunkSize: 1024 * 1024 * 2 }, (err, inflated) => { - if (err) { // Try to decode without compression - Framer.getPackets(buffer) - return - } - cb(Framer.getPackets(inflated)) - }) + // Decode the payload with 512kb buffer + try { + const inflated = zlib.inflateRawSync(buffer, { chunkSize: 512000 }) + return Framer.getPackets(inflated) + } catch (e) { // Try to decode without compression + return Framer.getPackets(buffer) + } } - encode (cb) { + encode () { const buf = Buffer.concat(this.packets) - zlib.deflateRaw(buf, { level: this.compressionLevel }, (err, def) => { - if (err) throw err - const ret = Buffer.concat([Buffer.from([0xfe]), def]) - cb(ret) - }) + const def = zlib.deflateRawSync(buf, { level: this.compressionLevel }) + return Buffer.concat([Buffer.from([0xfe]), def]) } addEncodedPacket (chunk) {