Issue
How to convert a dynamic Set<Promise<T>> into AsyncIterable<T> (unordered)?
The resulting iterable must produce values as they get resolved, and it must end as soon as the source runs empty.
I have a dynamic cache of promises that need to be resolved and their values reported, regardless of order.
NOTE: The source is dynamic, which means it can receive new Promise<T> elements while we progress through the resulting iterator.
UPDATE
After going through all the suggestions, I was able to implement my operator. And here are the official docs.
I'm adding a bounty to reward anyone who can improve it further, though at this point a PR is preferable (it is for a public library), or at least something that fits the same protocol.
Solution
Judging from your library implementation, you actually want to transform an AsyncIterable<Promise<T>> into an AsyncIterator<T> by racing up to N of the produced promises concurrently. I would implement that as follows:
async function* limitConcurrent<T>(iterable: AsyncIterable<Promise<T>>, n: number): AsyncIterator<T> {
    const pool = new Set();
    for await (const p of iterable) {
        const promise = Promise.resolve(p).finally(() => {
            pool.delete(promise); // FIXME see below
        });
        promise.catch(() => { /* ignore */ }); // mark rejections as handled
        pool.add(promise);
        if (pool.size >= n) {
            yield /* await */ Promise.race(pool);
        }
    }
    while (pool.size) {
        yield /* await */ Promise.race(pool);
    }
}
Notice that if one of the promises in the pool rejects, the returned iterator will end with the error and the results of the other promises that are currently in the pool will be ignored.
However, the above implementation presumes that the iterable is relatively fast, as it needs to produce n promises before the pool is raced for the first time. If it yields the promises more slowly than they take to resolve, the results are held up unnecessarily.
And worse, the above implementation may lose values. If the returned iterator is not consumed fast enough, or the iterable is not yielding fast enough, multiple promise handlers may delete their respective promises from the pool during one iteration of the loop, and the Promise.race will consider only one of them.
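The value loss can be reproduced with a small timing experiment. The sketch below copies the first implementation (specialised to strings, under the hypothetical name naiveLimitConcurrent) and feeds it a made-up source, stallingSource, that hands out two promises immediately and then stalls before ending. The helper names and the delays are assumptions chosen for illustration only.

```typescript
function delay<T>(ms: number, value: T): Promise<T> {
    return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

// Copy of the first implementation above, specialised to strings for the demo.
async function* naiveLimitConcurrent(
    iterable: AsyncIterable<Promise<string>>,
    n: number,
): AsyncGenerator<string> {
    const pool = new Set<Promise<string>>();
    for await (const p of iterable) {
        const promise: Promise<string> = Promise.resolve(p).finally(() => {
            pool.delete(promise);
        });
        promise.catch(() => { /* ignore */ });
        pool.add(promise);
        if (pool.size >= n) yield Promise.race(pool);
    }
    while (pool.size) yield Promise.race(pool);
}

// Hypothetical source: two promises right away ("a" after 10 ms, "b" after 20 ms),
// then a 60 ms pause before reporting that it is done.
function stallingSource(): AsyncIterable<Promise<string>> {
    let calls = 0;
    return {
        [Symbol.asyncIterator]: () => ({
            async next(): Promise<IteratorResult<Promise<string>>> {
                calls++;
                if (calls === 1) return { done: false, value: delay(10, "a") };
                if (calls === 2) return { done: false, value: delay(20, "b") };
                await delay(60, undefined);
                return { done: true, value: undefined };
            },
        }),
    };
}

async function collectNaive(): Promise<string[]> {
    const seen: string[] = [];
    for await (const v of naiveLimitConcurrent(stallingSource(), 2)) seen.push(v);
    return seen;
}

collectNaive().then(seen => console.log(seen)); // logs ["a"]; "b" is never reported
```

While the generator waits for the stalled source, "b" resolves and removes itself from the pool, so the final while loop finds the pool empty and the value is silently dropped.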
So this would work for a synchronous iterable, but if you actually have an asynchronous iterable, you need a different solution. Essentially you have a consumer and a producer that are more or less independent, and what you need is some queue between them.
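To make the producer/consumer split concrete, here is a minimal sketch of the single-queue idea. It is not the answer's implementation: mergeUnbounded, asyncOf and resolveAfter are hypothetical names, and the sketch deliberately has no concurrency limit. The producer drains the iterable as fast as it can and pushes settled results into an array, while the consumer sleeps until it is woken.

```typescript
type Settled<T> = { ok: true; value: T } | { ok: false; error: unknown };

async function* mergeUnbounded<T>(iterable: AsyncIterable<Promise<T>>): AsyncGenerator<T> {
    const queue: Settled<T>[] = [];   // settled results waiting for the consumer
    let pending = 0;                  // promises handed out but not yet settled
    let producing = true;
    let wake: () => void = () => {};  // resolves the consumer's current wait, if any

    // Producer: runs independently of the consumer.
    (async () => {
        try {
            for await (const p of iterable) {
                pending++;
                p.then(
                    value => { queue.push({ ok: true, value }); },
                    error => { queue.push({ ok: false, error }); },
                ).finally(() => { pending--; wake(); });
            }
        } catch (error) {
            queue.push({ ok: false, error });
        } finally {
            producing = false;
            wake();
        }
    })();

    // Consumer: drain the queue, then sleep until the producer side wakes us.
    while (true) {
        while (queue.length) {
            const item = queue.shift()!;
            if (!item.ok) throw item.error;
            yield item.value;
        }
        if (!producing && pending === 0) return;
        await new Promise<void>(resolve => { wake = () => resolve(); });
    }
}

// Hypothetical helpers for trying it out.
function resolveAfter<T>(ms: number, value: T): Promise<T> {
    return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

function asyncOf<T>(...items: T[]): AsyncIterable<T> {
    return {
        [Symbol.asyncIterator]() {
            let i = 0;
            return {
                next: async (): Promise<IteratorResult<T>> =>
                    i < items.length
                        ? { done: false, value: items[i++] }
                        : { done: true, value: undefined },
            };
        },
    };
}
```

Iterating mergeUnbounded(asyncOf(resolveAfter(30, 1), resolveAfter(10, 2), resolveAfter(20, 3))) should report the values in resolution order (2, 3, 1) without dropping any. Note that nothing in this sketch slows the producer down when the consumer falls behind.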
Yet a single queue still wouldn't handle backpressure: the producer just runs as fast as it can (given the iteration of promises and the concurrency limit) while filling the queue. What you really need then is a channel that allows synchronisation in both directions, e.g. using two queues:
class AsyncQueue<T> {
    resolvers: null | ((res: IteratorResult<T> | Promise<never>) => void)[];
    promises: Promise<IteratorResult<T>>[];
    constructor() {
        // invariant: at least one of the arrays is empty.
        // when `resolvers` is `null`, the queue has ended.
        this.resolvers = [];
        this.promises = [];
    }
    putNext(result: IteratorResult<T> | Promise<never>): void {
        if (!this.resolvers) throw new Error('Queue already ended');
        if (this.resolvers.length) this.resolvers.shift()(result);
        else this.promises.push(Promise.resolve(result));
    }
    put(value: T): void {
        this.putNext({done: false, value});
    }
    end(): void {
        for (const res of this.resolvers) res({done: true, value: undefined});
        this.resolvers = null;
    }
    next(): Promise<IteratorResult<T>> {
        if (this.promises.length) return this.promises.shift();
        else if (this.resolvers) return new Promise(resolve => { this.resolvers.push(resolve); });
        else return Promise.resolve({done: true, value: undefined});
    }
    [Symbol.asyncIterator](): AsyncIterator<T> {
        // Todo: Use AsyncIterator.from()
        return this;
    }
}
function limitConcurrent<T>(iterable: AsyncIterable<Promise<T>>, n: number): AsyncIterator<T> {
    const produced = new AsyncQueue<T>();
    const consumed = new AsyncQueue<void>();
    (async () => {
        try {
            let count = 0;
            for await (const p of iterable) {
                const promise = Promise.resolve(p);
                promise.then(value => {
                    produced.put(value);
                }, _err => {
                    produced.putNext(promise); // with rejection already marked as handled
                });
                if (++count >= n) {
                    await consumed.next(); // happens after any produced.put[Next]()
                    count--;
                }
            }
            while (count) {
                await consumed.next(); // happens after any produced.put[Next]()
                count--;
            }
        } catch(e) {
            // ignore `iterable` errors?
        } finally {
            produced.end();
        }
    })();
    return (async function*() {
        for await (const value of produced) {
            yield value;
            consumed.put();
        }
    }());
}
Answered By - Bergi