Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/finality-granpda/until_imported: Rework pinning (#5983)
Browse files Browse the repository at this point in the history
An `UntilImported` stream wraps a `Stream` of incoming messages and
waits for blocks those messages are based on before passing the messages
on.

The above `Stream` of incoming messages implements `Unpin`, thus there
is no need to use `pin_project` on the `UntilImported` struct. Instead
one only has to add the `Unpin` trait bound on the `I` trait parameter.
  • Loading branch information
mxinden committed May 12, 2020
1 parent 494bc3e commit 64d603a
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::{
use log::{debug, warn};
use sp_utils::mpsc::TracingUnboundedReceiver;
use futures::prelude::*;
use futures::stream::Fuse;
use futures::stream::{Fuse, StreamExt};
use futures_timer::Delay;
use finality_grandpa::voter;
use parking_lot::Mutex;
Expand Down Expand Up @@ -137,14 +137,16 @@ impl Drop for Metrics {
}
}

/// Buffering imported messages until blocks with given hashes are imported.
#[pin_project::pin_project]
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
/// Buffering incoming messages until blocks with given hashes are imported.
pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
#[pin]
inner: Fuse<I>,
incoming_messages: Fuse<I>,
ready: VecDeque<M::Blocked>,
/// Interval at which to check status of each awaited block.
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send + Sync>>,
Expand All @@ -159,19 +161,25 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
metrics: Option<Metrics>,
}

impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M > where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{}

impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
BlockStatus: BlockStatusT<Block>,
BlockSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
/// Create a new `UntilImported` wrapper.
pub(crate) fn new(
import_notifications: ImportNotifications<Block>,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
stream: I,
incoming_messages: I,
identifier: &'static str,
metrics: Option<Metrics>,
) -> Self {
Expand All @@ -192,7 +200,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
import_notifications: import_notifications.fuse(),
block_sync_requester,
status_check,
inner: stream.fuse(),
incoming_messages: incoming_messages.fuse(),
ready: VecDeque::new(),
check_pending: Box::pin(check_pending),
pending: HashMap::new(),
Expand All @@ -206,23 +214,23 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Block: BlockT,
BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
type Item = Result<M::Blocked, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow
// to `self`.
let mut this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow to
// `self`.
let this = &mut *self;

loop {
match Stream::poll_next(Pin::new(&mut this.inner), cx) {
match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(input)) => {
// new input: schedule wait of any parts which require
// blocks to be known.
match M::needs_waiting(input, this.status_check)? {
match M::needs_waiting(input, &this.status_check)? {
DiscardWaitOrReady::Discard => {},
DiscardWaitOrReady::Wait(items) => {
for (target_hash, target_number, wait) in items {
Expand All @@ -245,7 +253,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
}

loop {
match Stream::poll_next(Pin::new(&mut this.import_notifications), cx) {
match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(notification)) => {
// new block imported. queue up all messages tied to that hash.
Expand Down Expand Up @@ -315,7 +323,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
return Poll::Ready(Some(Ok(ready)))
}

if this.import_notifications.is_done() && this.inner.is_done() {
if this.import_notifications.is_done() && this.incoming_messages.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
Expand Down

0 comments on commit 64d603a

Please sign in to comment.