Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-grandpa: Make round_communication use bounded channel #4691

Merged
merged 4 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 49 additions & 40 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use futures::{prelude::*, sync::mpsc};
use futures::prelude::*;
use futures03::{
channel::mpsc as mpsc03,
compat::Compat,
future::{Future as Future03},
stream::StreamExt,
future::{Future as Future03, ready},
sink::Sink as Sink03,
stream::{Stream as Stream03, StreamExt},
};
use log::{debug, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -271,8 +272,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
local_key: Option<AuthorityPair>,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
impl Stream03<Item=SignedMessage<B>> + Unpin,
impl Sink03<Message<B>, Error=Error> + Unpin,
) {
self.note_round(
round,
Expand All @@ -290,22 +291,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
});

let topic = round_topic::<B>(round.0, set_id.0);
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
.map(|item| Ok::<_, ()>(item)))
.filter_map(|notification| {
let incoming = self.gossip_engine.messages_for(topic)
.filter_map(move |notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
}
decoded.ok()
})
.and_then(move |msg| {
match msg {
GossipMessage::Vote(msg) => {

match decoded {
Err(ref e) => {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
return ready(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if this read future::ready instead. Wasn't immediately obvious to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. As a rule of thumb, importing free functions into the local namespace hurts readability.

module::function lets us know where to look for the function without searching the imports.
function lets us know that the function is in the current module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Fixed with 6858319.

}
Ok(GossipMessage::Vote(msg)) => {
// check signature.
if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return Ok(None);
return ready(None);
}

match &msg.message.message {
Expand All @@ -332,18 +331,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
},
};

Ok(Some(msg.message))
ready(Some(msg.message))
}
_ => {
debug!(target: "afg", "Skipping unknown message type");
return Ok(None);
return ready(None);
}
}
})
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
});

let (tx, out_rx) = mpsc::unbounded();
let (tx, out_rx) = mpsc03::channel(0);
let outgoing = OutgoingMessages::<B> {
round: round.0,
set_id: set_id.0,
Expand All @@ -353,14 +350,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
has_voted,
};

let out_rx = out_rx.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round.0)
));

// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = incoming.select(out_rx);
let incoming = futures03::stream::select(incoming, out_rx);

(incoming, outgoing)
}
Expand Down Expand Up @@ -641,17 +634,27 @@ struct OutgoingMessages<Block: BlockT> {
round: RoundNumber,
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
sender: mpsc03::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>,
has_voted: HasVoted<Block>,
}

impl<Block: BlockT> Sink for OutgoingMessages<Block>
impl<B: BlockT> Unpin for OutgoingMessages<B> {}

impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
where
Message<Block>: Sized,
{
type SinkItem = Message<Block>;
type SinkError = Error;
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
})})
}

fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
// if we've voted on this round previously under the same key, send that vote instead
match &mut msg {
finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
Expand Down Expand Up @@ -707,17 +710,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
self.network.gossip_message(topic, message.encode(), false);

// forward the message to the inner sender.
let _ = self.sender.unbounded_send(signed);
}
return self.sender.start_send(signed).map_err(|e| {
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
});
};

Ok(AsyncSink::Ready)
Ok(())
}

fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Poll03::Ready(Ok(()))
}

fn close(&mut self) -> Poll<(), Error> {
// ignore errors since we allow this inner sender to be closed already.
self.sender.close().or_else(|_| Ok(Async::Ready(())))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_close(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
})})
}
}

Expand Down
9 changes: 8 additions & 1 deletion client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use std::time::Duration;
use log::{debug, warn, info};
use parity_scale_codec::{Decode, Encode};
use futures::prelude::*;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use futures03::{
compat::{Compat, CompatSink},
future::{FutureExt as _, TryFutureExt as _},
stream::StreamExt as _,
};
use futures_timer::Delay;
use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError};
Expand Down Expand Up @@ -608,6 +612,9 @@ where
has_voted,
);

let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
let outgoing = CompatSink::new(outgoing);

// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
Expand Down