Optionally allow pMapIterable to yield results out of order #72

Open
tgfisher4 opened this issue May 14, 2024 · 2 comments · May be fixed by #74

Comments

@tgfisher4

tgfisher4 commented May 14, 2024

pMapIterable currently yields mapper results in the order they were obtained from the input iterable. In other words, pMapIterable always waits for, and returns, the first promise in the queue. In my use case, the order of my input iterable is insignificant: it is essentially an unordered stream. I want to asynchronously map each element of the iterable, with bounded concurrency, and treat the results as another unordered stream: the output order does not need to match the input order. Forcing the output order to match the input order introduces head-of-line blocking: mapper results later in the promises queue that are already available are blocked from further processing by pending results earlier in the queue. Additionally, new promises cannot be started even though older promises have fulfilled, which underutilizes the allowed concurrency.
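
To make the head-of-line blocking concrete, here is a minimal sketch that does not use p-map at all. It only mimics the FIFO `await promises[0]` pattern with plain promises; `delay` and `demoHeadOfLineBlocking` are hypothetical names made up for the illustration.

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = <T>(ms: number, value: T): Promise<T> =>
	new Promise(resolve => setTimeout(() => resolve(value), ms));

// Records when each task resolves and when the consumer actually receives it.
async function demoHeadOfLineBlocking(): Promise<string[]> {
	const log: string[] = [];
	const task = (name: string, ms: number) =>
		delay(ms, name).then(value => {
			log.push(`resolved:${value}`);
			return value;
		});

	// Both tasks start immediately; "slow" sits at the head of the queue.
	const queue = [task('slow', 50), task('fast', 5)];

	// FIFO consumption, analogous to always awaiting the first promise in the queue.
	for (const promise of queue) {
		log.push(`consumed:${await promise}`);
	}

	return log;
}
```

Here the log comes back as `resolved:fast`, `resolved:slow`, `consumed:slow`, `consumed:fast`: the fast result is ready early but is not handed to the consumer until the slow one at the head has finished.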

I'd like to propose adding a new pMapIterable option that treats the promises buffer like a pool rather than a queue. In other words, the returned async iterable would yield results from the promises buffer as soon as they are available, rather than in FIFO order. Perhaps it could be called preserveOrder and default to true.

I'd be happy to work on a PR if you're interested in accepting such a feature: I think it should be relatively straightforward to devise an implementation based around the conditional use of await Promise.race(promises) instead of await promises[0].
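
A rough sketch of that idea, assuming a synchronous input iterable and ignoring backpressure and error handling. This is not p-map's actual implementation; `mapIterableSketch`, `SketchOptions`, and the `Entry` wrapper are hypothetical names used only for illustration.

```typescript
// Hypothetical options shape; only `preserveOrder` is the proposed addition.
type SketchOptions = { concurrency: number; preserveOrder?: boolean };

async function* mapIterableSketch<T, R>(
	input: Iterable<T>,
	mapper: (item: T) => Promise<R>,
	{ concurrency, preserveOrder = true }: SketchOptions,
): AsyncGenerator<R> {
	// Each result carries its own promise so it can be removed from the buffer after a race.
	type Entry = { value: R; promise: Promise<Entry> };
	const iterator = input[Symbol.iterator]();
	const promises: Promise<Entry>[] = [];

	// Pull the next input element, if any, and start its mapper call.
	const startNext = (): void => {
		const next = iterator.next();
		if (next.done) return;
		const promise: Promise<Entry> = mapper(next.value).then(value => ({ value, promise }));
		promises.push(promise);
	};

	for (let i = 0; i < concurrency; i++) startNext();

	while (promises.length > 0) {
		// Queue semantics wait on the head; pool semantics take whichever settles first.
		const { value, promise } = await (preserveOrder ? promises[0] : Promise.race(promises));
		promises.splice(promises.indexOf(promise), 1);
		startNext();
		yield value;
	}
}
```

With `preserveOrder: false`, a slot is refilled as soon as any mapper call finishes, so the allowed concurrency stays in use. A real implementation would also need to handle rejections and the existing backpressure option.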

P.S.: Thanks for all these nifty p-utils!

@sindresorhus
Owner

This would indeed be useful. I agree with the option name. PR welcome 🙏

@lacherogwu

lacherogwu commented Jun 4, 2024

It's a great use case, but I'm trying to approach it differently.
It should be a much simpler solution. @sindresorhus, take a look here:
https://github.com/lacherogwu/p-map/tree/bug/pMapIterable-concurrency

I haven't touched the backpressure yet. For some reason, I couldn't get the tests working with my implementation, and I found it hard to debug.

This is the concept; I implemented it in a project where I needed it:

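```typescript
export async function* mapIterable(items: any[], cb: (item: any) => Promise<any>, options?: { concurrency: number }) {
	type P = { value: any; promise: Promise<P> };

	const concurrency = options?.concurrency ?? 1;
	// Work on a copy so the caller's array is not mutated by shift().
	const pending = [...items];
	const promises = new Set<Promise<P>>();

	// Start the callback for the next item and add its promise to the pool.
	const next = () => {
		const item = pending.shift();
		// Each result carries a reference to its own promise so it can be removed after the race.
		const promise: Promise<P> = cb(item).then(value => ({ value, promise }));
		promises.add(promise);
	};

	while (promises.size < concurrency && pending.length > 0) {
		next();
	}

	while (promises.size > 0) {
		// Take whichever callback settles first, regardless of input order.
		const { value, promise } = await Promise.race(promises);
		promises.delete(promise);
		yield value;

		// Check for remaining items now rather than at resolve time, so two promises that
		// settle together cannot both schedule the last item.
		if (pending.length > 0) {
			next();
		}
	}
}
```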
