diff --git a/common/src/async-queue.ts b/common/src/async-queue.ts index 9b8e740fa..2646ade3d 100644 --- a/common/src/async-queue.ts +++ b/common/src/async-queue.ts @@ -62,19 +62,36 @@ export function createAsyncQueue() { return true; } + function end(e?: Error) { + if (ended) + return false; + // catch to prevent unhandled rejection. + ended = e || new EndError() + clear(e); + return true; + } + function queue() { return (async function* () { - while (true) { - try { - const item = await dequeue(); - yield item; - } - catch (e) { - if (e instanceof EndError) - return; - throw e; + try { + while (true) { + try { + const item = await dequeue(); + yield item; + } + catch (e) { + // the yield above may raise an error, and the queue should be ended. + end(e); + if (e instanceof EndError) + return; + throw e; + } } } + finally { + // the yield above may cause an iterator return, and the queue should be ended. + end(); + } })(); } @@ -106,14 +123,7 @@ export function createAsyncQueue() { submit(item: T, signal?: AbortSignal) { return submit(item, undefined, signal); }, - end(e?: Error) { - if (ended) - return false; - // catch to prevent unhandled rejection. - ended = e || new EndError() - clear(e); - return true; - }, + end, async enqueue(item: T, signal?: AbortSignal) { const dequeued = new Deferred(); if (!submit(item, dequeued, signal))