Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: submitAndWatch replace old messages if it's lagging #4901

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

niklasad1
Copy link
Member

@niklasad1 niklasad1 commented Jun 27, 2024

Close #3076

The fix is really just that older messages are replaced if the client can't keep up with the server instead.
Because I wanted the same functionality as pipe_from_stream for both pending/subscription I added two wrapper types on-top of the types from jsonrpsee to make it nicer.

I added a trait Buffer so I could still use pipe_from_stream but that abstraction is a little leaky but only to avoid adding an identical method/function with another strategy...

stream.filter_map(move |event| async move { handle_event(event) }).boxed();

// If the subscription is too slow older events will be overwritten.
sink.pipe_from_stream(stream.boxed(), RingBuffer::new(3)).await;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix....

@niklasad1 niklasad1 requested a review from lexnv June 27, 2024 17:30
@paritytech-cicd-pr
Copy link

The CI pipeline was cancelled due to failure one of the required jobs.
Job name: cargo-clippy
Logs: https://gitlab.parity.io/parity/mirrors/polkadot-sdk/-/jobs/6579613

fn new() -> Self {
Self { inner: VecDeque::with_capacity(DEFAULT_BUF_SIZE), max_cap: DEFAULT_BUF_SIZE }
pub fn new(cap: usize) -> Self {
Self { inner: VecDeque::with_capacity(cap), max_cap: DEFAULT_BUF_SIZE }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the user pass a capacity higher than 16?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but not used yet

.expect("Serialize infallible; qed")
/// Send a message on the subscription.
pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
self.0.send(self.to_sub_message(result)).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This makes the api easier to use!

Copy link
Contributor

@lexnv lexnv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Nice job here 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[RPC-Spec-V2]: transactionWatch_unstable_submitAndWatch shouldn't use pipe_from_stream
4 participants