Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement parallel processing in sync() method #843

Merged
merged 75 commits into from
Feb 27, 2024
Merged

implement parallel processing in sync() method #843

merged 75 commits into from
Feb 27, 2024

Conversation

didobagi
Copy link
Contributor

No description provided.

github-actions bot and others added 30 commits January 9, 2024 12:31
* sdk: fix tests not building

* sdk: tweak polling tx confirmation
* sdk: add option to get signed settlepnl tix from a market order

* fix lint
* bigz/add-lp-order-risk-mitigations

* friendly perp position iter

* update parmams mutability

* add time_since_last_liquidity_change

* add new variable to margin context mode

* add attempt to burn shares in force cancel/settle pnl

* remove from force cancel orders

* sdk: fix tests not building

* change place orders

* rmeove unused mut

* address some comments

* add to settle_pnl

* use oracle orders

* tweak order params

* oracle order tests

* add typescript test, fix order param market_type

* remove logic from settle_expired_position

* CHANGELOG

---------

Co-authored-by: Chris Heaney <[email protected]>
This reverts commit 63c7208.
* Added handling for signature time measurement

* Added opt-in metrics flag to drift client

* fixed prettier

---------

Co-authored-by: lowkeynicc <[email protected]>
this.updateLatestSlot(slot);

const programAccountBufferMap = new Map<string, Buffer>();
for (const programAccount of rpcResponseAndContext.value) {
rpcResponseAndContext.value.forEach(programAccount => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you benchmarked this change against whatever the average response length received is?

Last I checked forEach was the least performant way to iterate in Javascript.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont think it makes much difference with for...of, main improvement in term of execution time i have benchmarked when calling userMap.subscribe using keeper bot went from 120 seconds to 11 on my local machine was due to parallel processing, which is right after this iteration. i apologize in advance im still learing and improving

Copy link
Member

@wphan wphan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey thanks for the PR! finally got around to testing it, and i think it the async-ness of forEach is pretty nice here, but since this is a bit stale now, do you mind rebasing it with the latest master? after that should be good to merge

const promises = Array.from(programAccountBufferMap.entries()).map(async ([key, buffer]) => {
const index = await Promise.race(semaphore.map((p, index) => p.then(() => index)));
semaphore[index] = processAccount(key, buffer).then(() => {
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you shouldn't need this .then..return

});

const concurrencyLimit = 20;
const semaphore = new Array(concurrencyLimit).fill(Promise.resolve());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason for this semaphore? does it still work if you just let all the promises run at once?

Comment on lines +335 to +342
const promises = Array.from(programAccountBufferMap.entries()).map(async ([key, buffer]) => {
const index = await Promise.race(semaphore.map((p, index) => p.then(() => index)));
semaphore[index] = processAccount(key, buffer).then(() => {
return;
});
});

await Promise.all(promises.concat(semaphore));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const promises = Array.from(programAccountBufferMap.entries()).map(async ([key, buffer]) => {
const index = await Promise.race(semaphore.map((p, index) => p.then(() => index)));
semaphore[index] = processAccount(key, buffer).then(() => {
return;
});
});
await Promise.all(promises.concat(semaphore));
const promises = Array<Promise<void>>();
programAccountBufferMap.forEach(async (buffer, key) => {
promises.push(processAccount(key, buffer));
});
await Promise.all(promises);

you should be able to reduce it to this, not sure if semaphore is necssary

@wphan
Copy link
Member

wphan commented Feb 27, 2024

curious to try this out more throughly, gonna change the target branch and finish the merge there. Thank you @didobagi !

@wphan wphan changed the base branch from master to merge/didobagi/userMapSync February 27, 2024 22:29
@wphan wphan merged commit ce910cf into drift-labs:merge/didobagi/userMapSync Feb 27, 2024
wphan added a commit that referenced this pull request Feb 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants