Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Generators and Async Generators support #50

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ console.log(await getName('developit'))

[🔄 **Run this example on JSFiddle**](https://jsfiddle.net/developit/mf9fbma5/)

## Generator Example

Greenlet can now work with `Generators` and `AsyncGenerators` and will always return an `AsyncGenerator` in their
place. This means you can fetch small portions of data as you need it.

```js
import greenlet from '../greenlet.js';

let lazyGetRepos = greenlet(async function* (username, returnNumber = 10) {
let url = `https://api.github.com/users/${username}/repos`;
let res = await fetch(url);
let repos = await res.json();
while (repos.length > 0) {
let newReturnNumber = yield repos.splice(0, returnNumber);
if (typeof newReturnNumber !== 'undefined') {
returnNumber = newReturnNumber;
}
}
});

const repoIter = lazyGetRepos('developit', 5);
// you could call these over any amount of time...
console.log(await repoIter.next()); // {value: Array(5), done: false}
console.log(await repoIter.next()); // {value: Array(5), done: false}
console.log(await repoIter.next(10)); // {value: Array(10), done: false}
// when your done clean up the asyncIterator
console.log(await repoIter.return()); // {value: undefined, done: true}

// or use for await of syntax to iterate through all values;
const repoIter2 = lazyGetRepos('developit', 5);

for await (const repos of repoIter2) {
console.log(items);
}
// no need to clean up if you have exhausted the iterator.
```


## Transferable ready

Expand Down
119 changes: 89 additions & 30 deletions greenlet.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,59 @@
/** Move an async function into its own thread.
* @param {Function} asyncFunction An (async) function to run in a Worker.
* @param {Function} asyncFunction An (async) or async generator function to run in a Worker.
* @param {{useTransferables?: boolean}} options
* useTransferables defaults to true.
* @public
*/
export default function greenlet(asyncFunction) {
export default function greenlet(asyncFunction, options = {}) {
const defaults = {
useTransferables: true
};
const { useTransferables } = { ...defaults, ...options };
// A simple counter is used to generate worker-global unique ID's for RPC:
let currentId = 0;
let promiseIds = 0;

// A simple counter is use to generate worker-global generator ID's for RPC:
let genIds = 0;

// Outward-facing promises store their "controllers" (`[request, reject]`) here:
const promises = {};

// Use a data URI for the worker's src. It inlines the target function and an RPC handler:
const script = '$$='+asyncFunction+';onmessage='+(e => {
/* global $$ */

// Invoking within then() captures exceptions in the supplied async function as rejections
Promise.resolve(e.data[1]).then(
v => $$.apply($$, v)
const script = `$$=${asyncFunction};USET=${useTransferables};GENS={};onmessage=` + (e => {
/* global $$, GENS, USET */
const getTransferables = d => !USET ? [] : d.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
));
const [promiseID, args, status, genID] = e.data;
Promise.resolve(args).then(
// either apply the async/generator/async generator function or use a generator function's iterator
v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0])
).then(
// success handler - callback(id, SUCCESS(0), result)
// success handler - callback(id, SUCCESS(0))
// if `d` is transferable transfer zero-copy
d => {
postMessage([e.data[0], 0, d], [d].filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
if ($$.constructor.name === 'AsyncGeneratorFunction' || $$.constructor.name === 'GeneratorFunction') {
// setup the generator
if (!GENS[genID]) {
GENS[genID] = [d.next.bind(d), d.return.bind(d), d.throw.bind(d)];
// return an initial message of success.
return postMessage([promiseID, 0, { value: undefined, done: false }]);
}
// yield the value
postMessage([promiseID, 0, d], getTransferables([d.value]));
if (d.done) {
GENS[genID] = null;
}
}
else {
// here we know it's just an async function that needs it's return value.
postMessage([promiseID, 0, d], getTransferables([d]));
}
},
// error handler - callback(id, ERROR(1), error)
er => { postMessage([e.data[0], 1, '' + er]); }
er => { postMessage([promiseID, 1, '' + er]); }
);
});
const workerURL = URL.createObjectURL(new Blob([script]));
Expand All @@ -48,20 +74,53 @@ export default function greenlet(asyncFunction) {
promises[e.data[0]] = null;
};

// Return a proxy function that forwards calls to the worker & returns a promise for the result.
return function (args) {
args = [].slice.call(arguments);
return new Promise(function () {
// Add the promise controller to the registry
promises[++currentId] = arguments;
const passMessagePromise = (args, status, genID) => new Promise(function () {
// Add the promise controller to the registry
promises[++promiseIds] = arguments;

// Send an RPC call to the worker - call(id, params)
// The filter is to provide a list of transferables to send zero-copy
worker.postMessage([currentId, args], args.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
});
// Send an RPC call to the worker - call(id, params)
// The filter is to provide a list of transferables to send zero-copy
worker.postMessage([promiseIds, args, status, genID], !useTransferables ? [] : args.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
});
// if it's a generator or async generator function return a async generator function.
if (asyncFunction.constructor.name === 'AsyncGeneratorFunction' || asyncFunction.constructor.name === 'GeneratorFunction') {
return function workerPassthrough () {
const genID = ++genIds;
const init = passMessagePromise([].slice.call(arguments), 0, genID);
return {
done: false,
async next (value) {
await init;
if (this.done) { return { value: undefined, done: true }; }
const result = await passMessagePromise([value], 0, genID);
if (result.done) { return this.return(result.value); }
return result;
},
async return (value) {
await init;
await passMessagePromise([undefined], 1, genID);
this.done = true;
return { value, done: true };
},
async throw (err) {
await init;
await passMessagePromise(['' + err], 2, genID);
throw err;
},
[Symbol.asyncIterator] () {
return this;
}
};
};
}

// Return a proxy function that forwards calls to the worker & returns a promise for the result.
return function () {
const args = [].slice.call(arguments);
return passMessagePromise(args);
};
}
189 changes: 186 additions & 3 deletions greenlet.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import greenlet from 'greenlet';
import greenlet from './greenlet';

describe('greenlet', () => {
it('should return an async function', () => {
Expand All @@ -7,6 +7,14 @@ describe('greenlet', () => {
expect(g()).toEqual(jasmine.any(Promise));
});

it('should return an async generator function', () => {
let g = greenlet(function* () {
yield 'one';
});
// expect that it has an iterator
expect(!!g()[Symbol.asyncIterator]).toEqual(true);
});

it('should invoke sync functions', async () => {
let foo = greenlet( a => 'foo: '+a );

Expand All @@ -28,11 +36,186 @@ describe('greenlet', () => {
});

it('should invoke async functions', async () => {
let bar = greenlet( a => new Promise( resolve => {
resolve('bar: '+a);
let bar = greenlet(a => new Promise(resolve => {
resolve('bar: ' + a);
}));

let ret = await bar('test');
expect(ret).toEqual('bar: test');
});

it('should take values from next', async () => {
let g = greenlet(function* () {
const num2 = yield 1;
yield 2 + num2;
});

const it = g();
expect((await it.next()).value).toEqual(1);
expect((await it.next(2)).value).toEqual(4);
});

it('should return both done as true and the value', async () => {
// eslint-disable-next-line require-yield
function* f (num1) {
return num1;
}
let g = greenlet(f);

const it = g(3);
const it2 = f(3);
const { done, value } = (await it.next());
const { done: done2, value: value2 } = (await it2.next());

expect(value).toEqual(value2);
expect(done).toEqual(done2);
});

it('should only iterate yielded values with for await of', async () => {
let g = greenlet(function* () {
yield 3;
yield 1;
yield 4;
return 1;
});

const arr = [];
for await (const item of g()) {
arr.push(item);
}

expect(arr[0]).toEqual(3);
expect(arr[1]).toEqual(1);
expect(arr[2]).toEqual(4);
expect(arr[3]).toEqual(undefined);
});

it('should return early with return method of async iterator', async () => {
let g = greenlet(function* () {
yield 1;
yield 2;
yield 3;
return 4;
});


const it = g();
expect([
await it.next(),
await it.next(),
await it.return(7),
await it.next(),
await it.next()
]).toEqual([
{ value: 1, done: false },
{ value: 2, done: false },
{ value: 7, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true }
]);
});

it('should throw early with return method of async iterator', async () => {
let g = greenlet(function* () {
yield 1;
yield 2;
yield 3;
return 4;
});


const it = g();
// expect this to reject!
await (async () => ([
await it.next(),
await it.return(),
await it.throw('foo'),
await it.next(),
await it.next()
]))().then(() => {
throw new Error('Promise should not have resolved');
}, () => { /** since it should error, we recover and ignore the error */});
});

it('should act like an equivalent async iterator', async () => {
async function* noG () {
const num2 = yield 1;
yield 2 + num2;
yield 3;
return 4;
}

let g = greenlet(noG);


const it = g();
const it2 = noG();
expect([
await it.next(),
await it.next(2),
await it.next(),
await it.next(),
await it.next()
]).toEqual([
await it2.next(),
await it2.next(2),
await it2.next(),
await it2.next(),
await it2.next()
]);
});

it('should throw like an equivalent async iterator', async () => {
async function* noG () {
const num2 = yield 1;
yield 2 + num2;
yield 3;
return 4;
}

let g = greenlet(noG);


const it = g();
const it2 = noG();
expect([
await it.next(),
await it.next(2),
await it.throw().catch(e => 2),
await it.return(),
await it.throw().catch(e => 3)
]).toEqual([
await it2.next(),
await it2.next(2),
await it2.throw().catch(e => 2),
await it2.return(),
await it2.throw().catch(e => 3)
]);
});

it('should return like an equivalent async iterator', async () => {
async function* noG () {
const num2 = yield 1;
yield 2 + num2;
yield 3;
return 4;
}

let g = greenlet(noG);


const it = g();
const it2 = noG();
expect([
await it.next(),
await it.next(2),
await it.return(),
await it.return()
]).toEqual([
await it2.next(),
await it2.next(2),
await it2.return(),
await it2.return()
]);
});
});
Loading