Skip to content

Commit

Permalink
feat: Improve to be able to send messages back and forth
Browse files Browse the repository at this point in the history
- Doesn't work because Deno doesn't support MessageChannel yet (denoland/deno#6691) - which is something Comlink wants.
  • Loading branch information
KallynGowdy committed Jul 15, 2020
1 parent bc0db08 commit 312c1db
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 44 deletions.
9 changes: 7 additions & 2 deletions src/aux-vm-deno/typings/deno.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ declare var Deno: {

interface Reader {
read(p: Uint8Array): Promise<number | null>;
readSync(p: Uint8Array): number | null;
}

interface Writer {
write(p: Uint8Array): Promise<number>;
writeSync(p: Uint8Array): number;
}

interface Connection extends Reader, Writer {}

declare class DenoBuffer {
declare class DenoBuffer implements Reader, Writer {
write(p: Uint8Array): Promise<number>;
writeSync(p: Uint8Array): number;
read(p: Uint8Array): Promise<number>;
readSync(p: Uint8Array): number;
readonly length: number;
readonly capacity: number;
bytes(options?: { copy: boolean }): Uint8Array;
truncate(num: number): void;
grow(num: number): void;
empty(): boolean;
readSync(p: Uint8Array): number | null;
}
84 changes: 54 additions & 30 deletions src/aux-vm-deno/vm/DenoEntry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ import { MessageChannelImpl } from './MessageChannel';

const port = parseInt(Deno.args[0]);

console.log('[DenoEntry] Listening on port', port);

init();

async function init() {
const channel = await tcpMessageChannel();

console.log('[DenoEntry] Listening for messages...');
expose(DenoAuxChannel, <any>channel.port2);

channel.port2.postMessage({
type: 'init',
});
}

async function tcpMessageChannel() {
Expand All @@ -24,6 +30,7 @@ async function tcpMessageChannel() {

// @ts-ignore
channel.port1.addEventListener('message', e => {
console.log('[DenoEntry] Sending Message');
const json = JSON.stringify(e.data);

// Messages to stdout all follow the same format:
Expand All @@ -32,50 +39,67 @@ async function tcpMessageChannel() {
// - According to MDN UTF-8 never has more than string.length * 3 bytes (https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder/encodeInto)
// - Using a 32-bit number means we can't have messages larger than ~4GiB
const byteBuffer = new Uint8Array(4 + json.length * 3);
const view = new DataView(byteBuffer.buffer);
const view = new DataView(byteBuffer.buffer, byteBuffer.byteOffset);

// Encode the JSON as UTF-8
// Skip the first 4 bytes
const result = encoder.encodeInto(json, byteBuffer.subarray(4));
view.setUint32(0, result.written);
view.setUint32(0, result.written, true);

conn.write(byteBuffer.subarray(0, result.written));
console.log(`[DenoEntry] Writing ${result.written} bytes`);
conn.write(byteBuffer.subarray(0, result.written + 4));
});

readMessages();

return channel;

async function readMessages() {
const iter = Deno.iter(conn, {
bufSize: 512 * 512,
});
let messageBuffer = new Deno.Buffer();
let messageSize = -1;
for await (const chunk of iter) {
messageBuffer.readSync(chunk);
if (messageSize < 0 && messageBuffer.length >= 4) {
const view = new DataView(
messageBuffer.bytes({ copy: false }),
0,
4
try {
const iter = Deno.iter(conn, {
bufSize: 512 * 512,
});
let messageBuffer = new Deno.Buffer();
let messageSize = -1;
console.log('[DenoEntry] Reading messages...');
for await (const chunk of iter) {
console.log(
'[DenoEntry] Got Data',
chunk.byteLength,
chunk.length
);
messageSize = view.getUint32(0);
messageBuffer.truncate(4);
messageBuffer.grow(
Math.max(0, messageSize - messageBuffer.capacity)
);
}
if (messageSize >= 0 && messageBuffer.length >= messageSize) {
const messageBytes = messageBuffer
.bytes({ copy: false })
.subarray(0, messageSize);
messageBuffer.truncate(messageSize);
messageSize = -1;
const json = decoder.decode(messageBytes);
const message = JSON.parse(json);
channel.port1.postMessage(message);
messageBuffer.writeSync(chunk);
console.log('[DenoEntry] Read data', messageBuffer.length);
if (messageSize < 0 && messageBuffer.length >= 4) {
const bytes = new Uint8Array(4);
messageBuffer.readSync(bytes);
const view = new DataView(
bytes.buffer,
bytes.byteOffset,
4
);
messageSize = view.getUint32(0, true);
console.log('[DenoEntry] Got Length', messageSize);
messageBuffer.grow(
Math.max(0, messageSize - messageBuffer.capacity)
);
console.log(
'[DenoEntry] Buffer length',
messageBuffer.length
);
}
if (messageSize >= 0 && messageBuffer.length >= messageSize) {
const messageBytes = new Uint8Array(messageSize);
messageBuffer.readSync(messageBytes);
messageSize = -1;
const json = decoder.decode(messageBytes);
const message = JSON.parse(json);
console.log('[DenoEntry] Got Message');
channel.port1.postMessage(message);
}
}
} catch (err) {
console.error('[DenoEntry]', err);
}
}
}
61 changes: 49 additions & 12 deletions src/aux-vm-deno/vm/DenoVM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
import childProcess, { ChildProcess } from 'child_process';
import { Server, AddressInfo } from 'net';
import { MessageChannel } from 'worker_threads';
import { MessageChannelImpl } from './MessageChannel';
import { MessageChannelImpl, MessageEvent } from './MessageChannel';

/**
* Defines an interface for an AUX that is run inside a virtual machine.
Expand Down Expand Up @@ -101,44 +101,81 @@ export class DenoVM implements AuxVM {
this._server = new Server(conn => {
this._channel.port2.addEventListener('message', e => {
try {
console.log('[DenoVM] Sending message');
const json = JSON.stringify(e.data);
// Messages to stdout all follow the same format:
// 4 bytes (32-bit number) for the length of the message
// N bytes for the message JSON as UTF-8
// - According to MDN UTF-8 never has more than string.length * 3 bytes (https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder/encodeInto)
// - Using a 32-bit number means we can't have messages larger than ~4GiB
const byteBuffer = new Uint8Array(4 + json.length * 3);
const view = new DataView(byteBuffer.buffer);
const view = new DataView(
byteBuffer.buffer,
byteBuffer.byteOffset
);

// Encode the JSON as UTF-8
// Skip the first 4 bytes
const result = encoder.encodeInto(
json,
byteBuffer.subarray(4)
);
view.setUint32(0, result.written);
conn.write(byteBuffer.subarray(0, result.written));
view.setUint32(0, result.written, true);
console.log(
'[DenoVM] Writing ' + result.written + ' bytes'
);
conn.write(byteBuffer.subarray(0, result.written + 4));
} catch (err) {
console.error('[DenoVM]', err);
}
});

let messageBuffer = Buffer.alloc(0);
let messageSize = -1;
conn.on('data', (data: Buffer) => {
try {
console.log('[DenoVM] Got data');
// TODO: Fix to properly handle different buffer sizes
const uint32 = new Uint32Array(data);
const numBytes = uint32[0];
const messageBytes = data.subarray(4, numBytes + 4);
const json = decoder.decode(messageBytes);
const message = JSON.parse(json);
this._channel.port2.postMessage(message);
messageBuffer = Buffer.concat([messageBuffer, data]);
if (messageSize < 0 && messageBuffer.byteLength >= 4) {
const view = new DataView(
messageBuffer.buffer,
messageBuffer.byteOffset,
4
);
messageSize = view.getUint32(0, true);
messageBuffer = messageBuffer.slice(4);
}
if (
messageSize >= 0 &&
messageBuffer.byteLength >= messageSize
) {
// TODO: Fix to properly handle different buffer sizes
const view = messageBuffer.subarray(0, messageSize);
const json = decoder.decode(view);
const message = JSON.parse(json);
let buf = messageBuffer;
messageBuffer = Buffer.alloc(
buf.byteLength - view.byteLength
);
buf.copy(messageBuffer);
messageSize = -1;
this._channel.port2.postMessage(message);
}
} catch (err) {
console.error('[DenoVM]', err);
}
});

resolveConnection();
const listener = (e: MessageEvent) => {
if (e && e.data && e.data.type === 'init') {
this._channel.port1.removeEventListener(
'message',
listener
);
resolveConnection();
}
};
this._channel.port1.addEventListener('message', listener);
});

this._server.listen(() => {
Expand Down

0 comments on commit 312c1db

Please sign in to comment.