Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-granpda/until_imported: Rework pinning #5983

Merged
merged 1 commit into from
May 12, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::{
use log::{debug, warn};
use sp_utils::mpsc::TracingUnboundedReceiver;
use futures::prelude::*;
use futures::stream::Fuse;
use futures::stream::{Fuse, StreamExt};
use futures_timer::Delay;
use finality_grandpa::voter;
use parking_lot::Mutex;
Expand Down Expand Up @@ -137,14 +137,16 @@ impl Drop for Metrics {
}
}

/// Buffering imported messages until blocks with given hashes are imported.
#[pin_project::pin_project]
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
/// Buffering incoming messages until blocks with given hashes are imported.
pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
#[pin]
inner: Fuse<I>,
incoming_messages: Fuse<I>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the side this pull requests renames inner to incoming_messages. I find this name more descriptive compared to the abstract term inner. Let me know if you would like me to exclude this change from the pull request.

ready: VecDeque<M::Blocked>,
/// Interval at which to check status of each awaited block.
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send + Sync>>,
Expand All @@ -159,19 +161,25 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
metrics: Option<Metrics>,
}

impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M > where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{}

impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
Block: BlockT,
BlockStatus: BlockStatusT<Block>,
BlockSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
/// Create a new `UntilImported` wrapper.
pub(crate) fn new(
import_notifications: ImportNotifications<Block>,
block_sync_requester: BlockSyncRequester,
status_check: BlockStatus,
stream: I,
incoming_messages: I,
identifier: &'static str,
metrics: Option<Metrics>,
) -> Self {
Expand All @@ -192,7 +200,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
import_notifications: import_notifications.fuse(),
block_sync_requester,
status_check,
inner: stream.fuse(),
incoming_messages: incoming_messages.fuse(),
ready: VecDeque::new(),
check_pending: Box::pin(check_pending),
pending: HashMap::new(),
Expand All @@ -206,23 +214,23 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
Block: BlockT,
BStatus: BlockStatusT<Block>,
BSyncRequester: BlockSyncRequesterT<Block>,
I: Stream<Item = M::Blocked>,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
type Item = Result<M::Blocked, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow
// to `self`.
let mut this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We are using a `this` variable in order to allow multiple simultaneous mutable borrow to
// `self`.
let this = &mut *self;

loop {
match Stream::poll_next(Pin::new(&mut this.inner), cx) {
match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(input)) => {
// new input: schedule wait of any parts which require
// blocks to be known.
match M::needs_waiting(input, this.status_check)? {
match M::needs_waiting(input, &this.status_check)? {
DiscardWaitOrReady::Discard => {},
DiscardWaitOrReady::Wait(items) => {
for (target_hash, target_number, wait) in items {
Expand All @@ -245,7 +253,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
}

loop {
match Stream::poll_next(Pin::new(&mut this.import_notifications), cx) {
match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(notification)) => {
// new block imported. queue up all messages tied to that hash.
Expand Down Expand Up @@ -315,7 +323,7 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
return Poll::Ready(Some(Ok(ready)))
}

if this.import_notifications.is_done() && this.inner.is_done() {
if this.import_notifications.is_done() && this.incoming_messages.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
Expand Down