Skip to content

Commit

Permalink
generator and async generator support
Browse files Browse the repository at this point in the history
fixes developit#38
First thing to note about this is that I broke the microbundle build.
I'll attempt to fix the generator function to not use things microbundle
doesn't expect, but I'm not sure how far I'll get
with that. Which that's also something tying up this pull request:
developit/greenlet#50

One other thing to note about this is that I decided on
returning a promise of the asyncIterator from the generator function.
This is due to the fact that the function from the user only ever gets
evaluated as an executable function on the worker side.
  • Loading branch information
johnsonjo4531 committed Dec 30, 2019
1 parent 540ec5d commit 2e95131
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,30 @@ export default function workerize(code, options) {
URL.revokeObjectURL(url);
term.call(worker);
};
worker.call = (method, params) => new Promise( (resolve, reject) => {
worker.call = (method, params, genStatus=0, genId=undefined) => new Promise( (resolve, reject) => {
let id = `rpc${++counter}`;
callbacks[id] = [resolve, reject];
worker.postMessage({ type: 'RPC', id, method, params });
});
worker.postMessage({ type: 'RPC', id, genId, method, genStatus, params });
}).then(d => !d.hasOwnProperty("genId") ? d : (async function* workerAsyncGenPassthrough () {
const genId = d.genId;
try {
let result = {done: false};
let value;
while (!result.done) {
result = await worker.call(method, [value], 0, genId);
if (result.done) { break; }
value = yield result.value;
}
return result.value;
}
catch (err) {
await worker.call(method, ['' + err], 2, genId);
throw err;
}
finally {
await worker.call(method, [undefined], 1, genId);
}
})());
worker.rpcMethods = {};
setup(worker, worker.rpcMethods, callbacks);
worker.expose = methodName => {
Expand All @@ -51,10 +70,13 @@ export default function workerize(code, options) {
for (i in exports) if (!(i in worker)) worker.expose(i);
return worker;
}

function setup(ctx, rpcMethods, callbacks) {
let gencounter = 0;
let GENS = {};
ctx.addEventListener('message', ({ data }) => {
let id = data.id;
let genId = data.genId;
let genStatus = data.genStatus;
if (data.type!=='RPC' || id==null) return;
if (data.method) {
let method = rpcMethods[data.method];
Expand All @@ -63,15 +85,31 @@ function setup(ctx, rpcMethods, callbacks) {
}
else {
Promise.resolve()
.then( () => method.apply(null, data.params) )
.then( result => { ctx.postMessage({ type: 'RPC', id, result }); })
// Either use a generator or call a method.
.then( () => !GENS[genId] ? method.apply(null, data.params) : GENS[genId][genStatus](data.params[0]))
.then( result => {
if (method.constructor.name === 'AsyncGeneratorFunction' || method.constructor.name === 'GeneratorFunction') {
if (!GENS[genId]) {
GENS[++gencounter] = [result.next.bind(result), result.return.bind(result), result.throw.bind(result)];
// return an initial message of success.
// genId should only be sent to the main thread when initializing the generator
return ctx.postMessage({ type: 'RPC', id, genId: gencounter, result: { value: undefined, done: false } });
}
}
ctx.postMessage({ type: 'RPC', id, result });
if (result.done) {
GENS[genId] = null;
}
})
.catch( err => { ctx.postMessage({ type: 'RPC', id, error: ''+err }); });
}
}
else {
let callback = callbacks[id];
if (callback==null) throw Error(`Unknown callback ${id}`);
delete callbacks[id];
// genId should only be sent to the main thread when initializing the generator
if(data.genId) { data.result.genId = data.genId; }
if (data.error) callback[1](Error(data.error));
else callback[0](data.result);
}
Expand Down

0 comments on commit 2e95131

Please sign in to comment.