pMapIterable preserveOrder option #74

Open · wants to merge 1 commit into base: main
33 changes: 33 additions & 0 deletions index.d.ts
@@ -53,6 +53,39 @@ export type IterableOptions = BaseOptions & {
Default: `options.concurrency`
*/
readonly backpressure?: number;

/**
Whether the output iterable should produce `mapper` results in the same order as the corresponding elements were produced by the `input` iterable.

If `false`, `mapper` results are produced as soon as they are available, which may not match input order, but may improve throughput.

@example
```
import {pMapIterable} from 'p-map';
import delay from 'delay';

const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => {
await delay(delayMs);
return value;
}, {concurrency: 2, preserveOrder: true})[Symbol.asyncIterator]();
// t=0ms
await orderPreservingIterator.next(); // 1 produced at t=100ms
await orderPreservingIterator.next(); // 2 produced at t=100ms
await orderPreservingIterator.next(); // 3 produced at t=105ms

const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => {
await delay(delayMs);
return value;
}, {concurrency: 2, preserveOrder: false})[Symbol.asyncIterator]();
// t=0ms
await throughputOptimizingIterator.next(); // 2 produced at t=10ms
await throughputOptimizingIterator.next(); // 3 produced at t=15ms
await throughputOptimizingIterator.next(); // 1 produced at t=100ms
```

@default `true`
*/
readonly preserveOrder?: boolean;
};

type MaybePromise<T> = T | Promise<T>;
166 changes: 130 additions & 36 deletions index.js
@@ -168,6 +168,7 @@ export function pMapIterable(
{
concurrency = Number.POSITIVE_INFINITY,
backpressure = concurrency,
preserveOrder = true,
} = {},
) {
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
@@ -186,84 +187,177 @@
throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`);
}

if (typeof preserveOrder !== 'boolean') {
throw new TypeError(`Expected \`preserveOrder\` to be a boolean, got \`${preserveOrder}\` (${typeof preserveOrder})`);
}

return {
async * [Symbol.asyncIterator]() {
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

const promises = [];
const promisesIndexFromInputIndex = {};
const inputIndexFromPromisesIndex = [];
let runningMappersCount = 0;
let isDone = false;
let inputIndex = 0;
let outputIndex = 0; // Only used when `preserveOrder: false`

const nextPromise = preserveOrder
// Treat `promises` as a queue
? () => {
// May be undefined because of `pMapSkip`s
while (promisesIndexFromInputIndex[outputIndex] === undefined) {
outputIndex += 1;
}

return promises[promisesIndexFromInputIndex[outputIndex++]];
}
// Treat `promises` as a pool (order doesn't matter)
: () => Promise.race(promises);

function popPromise(inputIndex) {
// Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array
const tail = promises.pop();
const tailInputIndex = inputIndexFromPromisesIndex.pop();
const promisesIndex = promisesIndexFromInputIndex[inputIndex];
delete promisesIndexFromInputIndex[inputIndex];

if (promisesIndex !== promises.length) {
promises[promisesIndex] = tail;
inputIndexFromPromisesIndex[promisesIndex] = tailInputIndex;
promisesIndexFromInputIndex[tailInputIndex] = promisesIndex;
}
}

async function mapNext(promisesIndex) {
let next = iterator.next();

const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others
runningMappersCount++;
promisesIndexFromInputIndex[myInputIndex] = promisesIndex;
inputIndexFromPromisesIndex[promisesIndex] = myInputIndex;

if (isPromiseLike(next)) {
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
// This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via
// `options.concurrency` and `options.backpressure`.
// This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing
// (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`.
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
// `async` operations like disk reads, network requests, etc.
// Overall, this can reduce the total time taken to process all elements.
if (backpressure !== Number.POSITIVE_INFINITY) {
// Spawn if still below concurrency and backpressure limit
trySpawn();
}

try {
next = await next;
} catch (error) {
isDone = true;
return {result: {error}, inputIndex: myInputIndex};
}
}

let {done, value} = next;

if (done) {
isDone = true;
return {result: {done: true}, inputIndex: myInputIndex};
}

// Spawn if still below concurrency and backpressure limit
trySpawn();

let returnValue;
try {
if (isPromiseLike(value)) {
value = await value;
}

returnValue = mapper(value, myInputIndex);
if (isPromiseLike(returnValue)) {
returnValue = await returnValue;
}
} catch (error) {
isDone = true;
return {result: {error}, inputIndex: myInputIndex};
}

runningMappersCount--;

if (returnValue === pMapSkip) {
// If `preserveOrder: true`, resolve to the next input index's promise, in case we are already being `await`ed.
// NOTE: there is no risk that the code which spawns `myInputIndex + 1` is still pending elsewhere in the event loop
// while `promisesIndexFromInputIndex[myInputIndex + 1]` is `undefined` (which would make us incorrectly recurse into
// `mapNext`, so that this potentially-already-`await`ed promise resolves to the result of mapping a later element than
// another member of `promises`, i.e. `promises` would resolve out of order): all `trySpawn`/`mapNext` calls perform
// their bookkeeping synchronously, before any `await`s.
if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) {
popPromise(myInputIndex);
return promises[promisesIndexFromInputIndex[myInputIndex + 1]];
}

// Otherwise, start mapping the next input element
delete promisesIndexFromInputIndex[myInputIndex];
// Not necessary to `delete inputIndexFromPromisesIndex[promisesIndex]` since `inputIndexFromPromisesIndex[promisesIndex]` is only used
// when this promise resolves, but by that point this recursive `mapNext(promisesIndex)` call will have necessarily overwritten it.
return mapNext(promisesIndex);
}

// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();

return {result: {value: returnValue}, inputIndex: myInputIndex};
}

function trySpawn() {
if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
return;
}

// Reserve index in `promises` array: we don't actually have the promise to save yet,
// but we don't want recursive `trySpawn` calls to use this same index.
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately,
// without yielding to the event loop, so no consumers (namely `nextPromise`)
// can observe the intermediate state.
const promisesIndex = promises.length++;
promises[promisesIndex] = mapNext(promisesIndex);
}

trySpawn();

while (promises.length > 0) {
const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop
popPromise(inputIndex);

if (error) {
throw error;
}

if (done) {
// When `preserveOrder: false`, keep going, to consume any remaining pending promises in the pool
if (!preserveOrder) {
continue;
}

return;
}

// Spawn if just dropped below backpressure limit and below the concurrency limit
trySpawn();

if (value === pMapSkip) {
continue;
}

yield value;
}
},
};
}

function isPromiseLike(p) {
return typeof p === 'object' && p !== null && 'then' in p && typeof p.then === 'function';
}

export const pMapSkip = Symbol('skip');
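A note on the `popPromise` bookkeeping above: it avoids an O(n) `Array#shift`/`splice` by swapping the settled promise with the last element before popping. A dependency-free sketch of that swap-remove idea (the `swapRemove` name is illustrative, not part of the diff):

```javascript
// Remove the element at `index` from `array` in O(1) by overwriting it
// with the last element and popping, at the cost of element order.
function swapRemove(array, index) {
	const tail = array.pop();
	if (index !== array.length) {
		array[index] = tail;
	}

	return array;
}

console.log(swapRemove(['a', 'b', 'c', 'd'], 1)); //=> ['a', 'd', 'c']
console.log(swapRemove(['a', 'b', 'c'], 2)); //=> ['a', 'b']
```

Because a swap changes positions, the diff keeps the extra `promisesIndexFromInputIndex`/`inputIndexFromPromisesIndex` maps in sync so input indices can still locate their promises after removals.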
33 changes: 33 additions & 0 deletions readme.md
@@ -92,6 +92,39 @@ Maximum number of promises returned by `mapper` that have resolved but not yet consumed.

Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.

##### preserveOrder

**Only for `pMapIterable`**

Type: `boolean`\
Default: `true`

Whether the output iterable should produce `mapper` results in the same order as the corresponding elements were produced by the `input` iterable.
If `false`, `mapper` results are produced as soon as they are available, which may not match input order, but may improve throughput.

```js
import {pMapIterable} from 'p-map';
import delay from 'delay';

const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => {
await delay(delayMs);
return value;
}, {concurrency: 2, preserveOrder: true})[Symbol.asyncIterator]();
// t=0ms
await orderPreservingIterator.next(); // 1 produced at t=100ms
await orderPreservingIterator.next(); // 2 produced at t=100ms
await orderPreservingIterator.next(); // 3 produced at t=105ms

const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => {
await delay(delayMs);
return value;
}, {concurrency: 2, preserveOrder: false})[Symbol.asyncIterator]();
// t=0ms
await throughputOptimizingIterator.next(); // 2 produced at t=10ms
await throughputOptimizingIterator.next(); // 3 produced at t=15ms
await throughputOptimizingIterator.next(); // 1 produced at t=100ms
```
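The throughput gain with `preserveOrder: false` comes from racing the pool of in-flight promises instead of awaiting the head of a queue. A minimal dependency-free sketch of that pool pattern for array inputs (the `racePool` helper is hypothetical, not the library's implementation, and ignores backpressure and error handling):

```javascript
// Yield mapper results as they settle (like `preserveOrder: false`),
// keeping at most `concurrency` promises in flight.
async function * racePool(inputs, mapper, concurrency) {
	const pool = new Map(); // id → promise resolving to {id, value}
	let id = 0;
	let nextIndex = 0;

	const spawn = () => {
		const thisId = id++;
		pool.set(thisId, Promise.resolve(mapper(inputs[nextIndex++])).then(value => ({id: thisId, value})));
	};

	while (nextIndex < Math.min(concurrency, inputs.length)) {
		spawn();
	}

	while (pool.size > 0) {
		// Race the whole pool: whichever in-flight promise settles first wins.
		const {id: settledId, value} = await Promise.race(pool.values());
		pool.delete(settledId);

		if (nextIndex < inputs.length) {
			spawn();
		}

		yield value;
	}
}

const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

(async () => {
	const results = [];
	for await (const value of racePool([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => {
		await delay(delayMs);
		return value;
	}, 2)) {
		results.push(value);
	}

	console.log(results); //=> [2, 3, 1]
})();
```

The real implementation additionally swaps settled promises out of the pool in O(1) and spawns eagerly up to `backpressure`, but the ordering behavior matches this sketch.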

##### stopOnError

**Only for `pMap`**