diff --git a/index.d.ts b/index.d.ts index 075093a..10dbf17 100644 --- a/index.d.ts +++ b/index.d.ts @@ -53,6 +53,39 @@ export type IterableOptions = BaseOptions & { Default: `options.concurrency` */ readonly backpressure?: number; + + /** + Whether the output iterable should produce the results of the `mapper` on elements of the `input` iterable in the same order as the elements were produced. + + If `false`, `mapper` results will be produced in the order they are available, which may not match the order the `mapper` inputs were produced by the `input` iterable, but may improve throughput. + + @example + ``` + import {pMapIterable} from 'p-map'; + import delay from 'delay'; + + const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; + }, {concurrency: 2, preserveOrder: true}); + // t=0ms + await orderPreservingIterator.next(); // 1 produced at t=100ms + await orderPreservingIterator.next(); // 2 produced at t=100ms + await orderPreservingIterator.next(); // 3 produced at t=105ms + + const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; + }, {concurrency: 2, preserveOrder: false}); + // t=0ms + await throughputOptimizingIterator.next(); // 2 produced at t=10ms + await throughputOptimizingIterator.next(); // 3 produced at t=15ms + await throughputOptimizingIterator.next(); // 1 produced at t=100ms + ``` + + @default `true` + */ + readonly preserveOrder?: boolean; }; type MaybePromise = T | Promise; diff --git a/index.js b/index.js index 2f7d91c..38fb136 100644 --- a/index.js +++ b/index.js @@ -168,6 +168,7 @@ export function pMapIterable( { concurrency = Number.POSITIVE_INFINITY, backpressure = concurrency, + preserveOrder = true, } = {}, ) { if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { @@ -186,84 +187,177 @@ export function pMapIterable( throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`); } + if (typeof preserveOrder !== 'boolean') { + throw new TypeError(`Expected \`preserveOrder\` to be a boolean, got \`${preserveOrder}\` (${typeof preserveOrder})`); + } + return { async * [Symbol.asyncIterator]() { const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; + const promisesIndexFromInputIndex = {}; + const inputIndexFromPromisesIndex = []; let runningMappersCount = 0; let isDone = false; - let index = 0; + let inputIndex = 0; + let outputIndex = 0; // Only used when `preserveOrder: false` + + const nextPromise = preserveOrder + // Treat `promises` as a queue + ? () => { + // May be undefined bc of `pMapSkip`s + while (promisesIndexFromInputIndex[outputIndex] === undefined) { + outputIndex += 1; + } - function trySpawn() { - if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { - return; + return promises[promisesIndexFromInputIndex[outputIndex++]]; } + // Treat `promises` as a pool (order doesn't matter) + : () => Promise.race(promises); + + function popPromise(inputIndex) { + // Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array + const tail = promises.pop(); + const tailInputIndex = inputIndexFromPromisesIndex.pop(); + const promisesIndex = promisesIndexFromInputIndex[inputIndex]; + delete promisesIndexFromInputIndex[inputIndex]; + + if (promisesIndex !== promises.length) { + promises[promisesIndex] = tail; + inputIndexFromPromisesIndex[promisesIndex] = tailInputIndex; + promisesIndexFromInputIndex[tailInputIndex] = promisesIndex; + } + } - const promise = (async () => { - const {done, value} = await iterator.next(); + async function mapNext(promisesIndex) { + let next = iterator.next(); + + const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others + runningMappersCount++; + promisesIndexFromInputIndex[myInputIndex] = promisesIndex; + inputIndexFromPromisesIndex[promisesIndex] = myInputIndex; + + if (isPromiseLike(next)) { + // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), + // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, + // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. + // This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via + // `options.concurrency` and `options.backpressure`. + // This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing + // (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`. + // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common + // `async` operations like disk reads, network requests, etc. + // Overall, this can reduce the total time taken to process all elements. + if (backpressure !== Number.POSITIVE_INFINITY) { + // Spawn if still below concurrency and backpressure limit + trySpawn(); + } - if (done) { - return {done: true}; + try { + next = await next; + } catch (error) { + isDone = true; + return {result: {error}, inputIndex: myInputIndex}; } + } - runningMappersCount++; + let {done, value} = next; - // Spawn if still below concurrency and backpressure limit - trySpawn(); + if (done) { + isDone = true; + return {result: {done: true}, inputIndex: myInputIndex}; + } - try { - const returnValue = await mapper(await value, index++); + // Spawn if still below concurrency and backpressure limit + trySpawn(); - runningMappersCount--; + let returnValue; + try { + if (isPromiseLike(value)) { + value = await value; + } - if (returnValue === pMapSkip) { - const index = promises.indexOf(promise); + returnValue = mapper(value, myInputIndex); + if (isPromiseLike(returnValue)) { + returnValue = await returnValue; + } + } catch (error) { + isDone = true; + return {result: {error}, inputIndex: myInputIndex}; + } - if (index > 0) { - promises.splice(index, 1); - } - } + runningMappersCount--; + + if (returnValue === pMapSkip) { + // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed + // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, + // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and + // this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of + // `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously, + // before any `await`s. + if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { + popPromise(myInputIndex); + return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; + } - // Spawn if still below backpressure limit and just dropped below concurrency limit - trySpawn(); + // Otherwise, start mapping the next input element + delete promisesIndexFromInputIndex[myInputIndex]; + // Not necessary to `delete inputIndexFromPromisesIndex[promisesIndex]` since `inputIndexFromPromisesIndex[promisesIndex]` is only used + // when this promise resolves, but by that point this recursive `mapNext(promisesIndex)` call will have necessarily overwritten it. + return mapNext(promisesIndex); + } - return {done: false, value: returnValue}; - } catch (error) { - isDone = true; - return {error}; - } - })(); + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); + + return {result: {value: returnValue}, inputIndex: myInputIndex}; + } + + function trySpawn() { + if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { + return; + } - promises.push(promise); + // Reserve index in `promises` array: we don't actually have the promise to save yet, + // but we don't want recursive `trySpawn` calls to use this same index. + // This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately, + // without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`) + // can observe the intermediate state. + const promisesIndex = promises.length++; + promises[promisesIndex] = mapNext(promisesIndex); } trySpawn(); while (promises.length > 0) { - const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop - - promises.shift(); + const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop + popPromise(inputIndex); if (error) { throw error; } if (done) { + // When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool + if (!preserveOrder) { + continue; + } + return; } // Spawn if just dropped below backpressure limit and below the concurrency limit trySpawn(); - if (value === pMapSkip) { - continue; - } - yield value; } }, }; } +function isPromiseLike(p) { + return typeof p === 'object' && p !== null && 'then' in p && typeof p.then === 'function'; +} + export const pMapSkip = Symbol('skip'); diff --git a/readme.md b/readme.md index bdc3f36..1ad0a7d 100644 --- a/readme.md +++ b/readme.md @@ -92,6 +92,39 @@ Maximum number of promises returned by `mapper` that have resolved but not yet c Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database. +##### preserveOrder + +**Only for `pMapIterable`** + +Type: `boolean`\ +Default: `true` + +Whether the output iterable should produce the results of the `mapper` on elements of the `input` iterable in the same order as the elements were produced. +If `false`, `mapper` results will be produced in the order they are available, which may not match the order the `mapper` inputs were produced by the `input` iterable, but may improve throughput. + +```js +import {pMapIterable} from 'p-map'; +import delay from 'delay'; + +const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; +}, {concurrency: 2, preserveOrder: true}); +// t=0ms +await orderPreservingIterator.next(); // 1 produced at t=100ms +await orderPreservingIterator.next(); // 2 produced at t=100ms +await orderPreservingIterator.next(); // 3 produced at t=105ms + +const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; +}, {concurrency: 2, preserveOrder: false}); +// t=0ms +await throughputOptimizingIterator.next(); // 2 produced at t=10ms +await throughputOptimizingIterator.next(); // 3 produced at t=15ms +await throughputOptimizingIterator.next(); // 1 produced at t=100ms +``` + ##### stopOnError **Only for `pMap`** diff --git a/test.js b/test.js index 9db3013..4f61cff 100644 --- a/test.js +++ b/test.js @@ -607,9 +607,9 @@ test('pMapIterable - concurrency: 2', async t => { assertInRange(t, times.get(10), {start: 0, end: 50}); assertInRange(t, times.get(20), {start: 0, end: 50}); - assertInRange(t, times.get(30), {start: 200, end: 250}); - assertInRange(t, times.get(40), {start: 300, end: 350}); - assertInRange(t, times.get(50), {start: 300, end: 350}); + assertInRange(t, times.get(30), {start: 195, end: 250}); + assertInRange(t, times.get(40), {start: 295, end: 350}); + assertInRange(t, times.get(50), {start: 295, end: 350}); }); test('pMapIterable - backpressure', async t => { @@ -637,10 +637,197 @@ test('pMapIterable - backpressure', async t => { t.is(currentValue, 40); }); -test('pMapIterable - pMapSkip', async t => { +test('pMapIterable - complex pMapSkip pattern - concurrency 1', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ + pMapSkip, 1, + 2, + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value)), [1, 2, 3, 4, 5, 6, 7, 8]); +}); + +test('pMapIterable - complex pMapSkip pattern - concurrency 2', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([ + pMapSkip, + 1, + 2, + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value, {concurrency: 2})), [1, 2, 3, 4, 5, 6, 7, 8]); +}); + +test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: false', async t => { + const result = await collectAsyncIterable(pMapIterable([ pMapSkip, + 1, 2, - ], async value => value)), [1, 2]); + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value, {concurrency: 2, preserveOrder: false})); + const resultSet = new Set(result); + t.assert(resultSet.has(1)); + t.assert(resultSet.has(2)); + t.assert(resultSet.has(3)); + t.assert(resultSet.has(4)); + t.assert(resultSet.has(5)); + t.assert(resultSet.has(6)); + t.assert(resultSet.has(7)); + t.assert(resultSet.has(8)); + t.assert(result.length === 8); +}); + +test('pMapIterable - async iterable input', async t => { + const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper)); + t.deepEqual(result, [10, 20, 30]); +}); + +test('pMapIterable - pMapSkip + preserveOrder: true - preserves order, even when next input needs to be awaited', async t => { + const result = await collectAsyncIterable(pMapIterable([1, 2, 3], (_value, index) => { + switch (index) { + case 0: { + return pMapSkip; + } + + case 1: { + return delay(100, {value: 2}); + } + + case 2: { + return 3; + } + + default: { + return undefined; + } + } + }, {concurrency: 2, preserveOrder: true})); + t.deepEqual(result, [2, 3]); +}); + +function * promiseGenerator() { + yield (async () => { + await delay(100); + return 1; + })(); + yield (async () => { + await delay(100); + return 2; + })(); + yield (async () => { + await delay(100); + return 3; + })(); +} + +test('pMapIterable - eager spawn when input iterable returns promise', async t => { + const end = timeSpan(); + await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3})); + assertInRange(t, end(), {start: 195, end: 250}); +}); + +test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => { + const end = timeSpan(); + await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100})); + assertInRange(t, end(), {start: 195, end: 250}); +}); + +test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => { + const end = timeSpan(); + const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false})); + t.deepEqual(result, [30, 20, 10]); + assertInRange(t, end(), {start: 295, end: 350}); +}); + +test('pMapIterable - preserveOrder: false - more complex example', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([ + [1, 200], + [2, 100], + [3, 150], + [4, 200], + [5, 100], + [6, 75], + ], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]); +}); + +test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { + const input = [100, 200, 10, 36, 13, 45]; + const times = new Map(); + const end = timeSpan(); + + t.deepEqual(await collectAsyncIterable(pMapIterable(input, value => { + times.set(value, end()); + return delay(value, {value}); + }, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]); + + assertInRange(t, times.get(100), {start: 0, end: 50}); + assertInRange(t, times.get(200), {start: 0, end: 50}); + assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50}); + assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50}); + assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50}); + assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50}); +}); + +test('pMapIterable - preserveOrder: false - backpressure', async t => { + // Adjust from 300 to 250 so timings don't align, to deflake + const adjustedLongerSharedInput = [...longerSharedInput]; + adjustedLongerSharedInput[0] = [longerSharedInput[0][0], 250]; + + let currentValue; + + // Concurrency option is forced by an early check + const asyncIterator = pMapIterable(adjustedLongerSharedInput, async value => { + currentValue = await mapper(value); + return currentValue; + }, {backpressure: 2, concurrency: 2, preserveOrder: false})[Symbol.asyncIterator](); + + const {value: value1} = await asyncIterator.next(); + t.is(value1, 20); + + // If backpressure is not respected, than all items will be evaluated in this time + await delay(600); + + t.is(currentValue, 30); + + const {value: value2} = await asyncIterator.next(); + t.is(value2, 10); + + await delay(100); + + t.is(currentValue, 40); +}); + +test('pMapIterable - preserveOrder: false - throws first error to settle', async t => { + await t.throwsAsync(collectAsyncIterable(pMapIterable([ + [async () => { + throw new Error('foo'); + }, 30], + [() => { + throw new Error('bar'); + }, 10], + ], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'}); });