Skip to content

Commit

Permalink
client/finality-grandpa: Reintegrate periodic neighbor packet worker (p…
Browse files Browse the repository at this point in the history
…aritytech#4631)

The `NeighborPacketWorker` within `client/finality-grandpa` does two
things:

1. It receives neighbor packets from components within
`client/finality-grandpa`, sends them down to the `GossipEngine` in
order for neighboring nodes to receive.

2. It periodically sends out the most recent neighbor packet to the
`GossipEngine`.

In order to send out packets it had a clone to a `GossipEgine` within
an atomic reference counter and a mutex. The `NeighborPacketWorker` was
then spawned onto its own asynchronous task.

Instead of running in its own task, this patch reintegrates the
`NeighborPacketWorker` into the main `client/finality-grandpa` task not
requiring the `NeighborPacketWorker` to own a clone of the
`GossipEngine`.

The greater picture

This is a tiny change within a greater refactoring. The overall goal is
to **simplify** how finality-grandpa interacts with the network and to
**reduce** the amount of **unbounded channels** within the logic.

Why no unbounded channels: Bounding channels is needed for backpressure
and proper scheduling. With unbounded channels there is no way of
telling the producer side to slow down for the consumer side to catch
up.  Rephrased, there is no way for the scheduler to know when to favour
the consumer task over the producer task on a crowded channel and the
other way round for an empty channel.

Reducing the amount of shared ownership simplifies the logic and enables
one to use async-await syntax-suggar, given that one does not need to
hold a lock across poll invocations. Using async-await enables one to
use bounded channels without complex logic.
  • Loading branch information
mxinden committed Jan 17, 2020
1 parent 188d59e commit d4fbb89
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 68 deletions.
54 changes: 46 additions & 8 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use std::sync::Arc;

use futures::{prelude::*, future::Executor as _, sync::mpsc};
use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _};
use futures03::{
compat::Compat,
stream::StreamExt,
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
};
use log::{debug, trace};
use parking_lot::Mutex;
use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}};

use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use finality_grandpa::{voter, voter_set::VoterSet};
use log::{debug, trace};
use sc_network::{NetworkService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use parity_scale_codec::{Encode, Decode};
Expand Down Expand Up @@ -134,9 +139,22 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
gossip_engine: GossipEngine<B>,
validator: Arc<GossipValidator<B>>,

/// Sender side of the neighbor packet channel.
///
/// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to
/// the underlying `GossipEngine`.
neighbor_sender: periodic::NeighborPacketSender<B>,

/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
//
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
}

impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle.
Expand Down Expand Up @@ -195,14 +213,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}

let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
let reporting_job = report_stream.consume(gossip_engine.clone());

let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender };
let bridge = NetworkBridge {
service,
gossip_engine,
validator,
neighbor_sender: neighbor_packet_sender,
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
};

let executor = Compat::new(executor);
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa rebroadcast job task");
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");

Expand Down Expand Up @@ -391,6 +413,21 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}

impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
loop {
match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) {
None => return Poll03::Ready(
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
),
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
}
}
}
}

fn incoming_global<B: BlockT>(
mut gossip_engine: GossipEngine<B>,
topic: B::Hash,
Expand Down Expand Up @@ -530,6 +567,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
gossip_engine: self.gossip_engine.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
}
}
}
Expand Down
118 changes: 61 additions & 57 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@

//! Periodic rebroadcast of neighbor packets.

use std::time::{Instant, Duration};

use parity_scale_codec::Encode;
use futures::prelude::*;
use futures::sync::mpsc;
use futures_timer::Delay;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use log::{debug, warn};
use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use log::debug;
use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}};

use sc_network::PeerId;
use sc_network_gossip::GossipEngine;
use sp_runtime::traits::{NumberFor, Block as BlockT};
use super::gossip::{NeighborPacket, GossipMessage};

// how often to rebroadcast, if no other
// How often to rebroadcast, in cases where no new packets are created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);

fn rebroadcast_instant() -> Instant {
Expand All @@ -56,56 +51,65 @@ impl<B: BlockT> NeighborPacketSender<B> {
}
}

/// Does the work of sending neighbor packets, asynchronously.
///
/// It may rebroadcast the last neighbor packet periodically when no
/// progress is made.
pub(super) fn neighbor_packet_worker<B>(net: GossipEngine<B>) -> (
impl Future<Item = (), Error = ()> + Send + 'static,
NeighborPacketSender<B>,
) where
B: BlockT,
{
let mut last = None;
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let mut delay = Delay::new(REBROADCAST_AFTER);

let work = futures::future::poll_fn(move || {
loop {
match rx.poll().expect("unbounded receivers do not error; qed") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some((to, packet))) => {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());

// rebroadcasting network.
delay.reset(rebroadcast_instant());
last = Some((to, packet));
}
Async::NotReady => break,
}
}
/// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by
/// components within `finality-grandpa` and forwards those packets to the underlying
/// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream`
/// implementation). Periodically it sends out the last packet in cases where no new ones arrive.
pub(super) struct NeighborPacketWorker<B: BlockT> {
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
delay: Delay,
rx: mpsc::UnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}

impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}

impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new() -> (Self, NeighborPacketSender<B>){
let (tx, rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let delay = Delay::new(REBROADCAST_AFTER);

(NeighborPacketWorker {
last: None,
delay,
rx,
}, NeighborPacketSender(tx))
}
}

// has to be done in a loop because it needs to be polled after
// re-scheduling.
loop {
match (&mut delay).unit_error().compat().poll() {
Err(e) => {
warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e);
delay.reset(rebroadcast_instant());
}
Ok(Async::Ready(())) => {
delay.reset(rebroadcast_instant());

if let Some((ref to, ref packet)) = last {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
}
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
impl <B: BlockT> Stream for NeighborPacketWorker<B> {
type Item = (Vec<PeerId>, GossipMessage<B>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>
{
let this = &mut *self;
match this.rx.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some((to, packet))) => {
this.delay.reset(rebroadcast_instant());
this.last = Some((to.clone(), packet.clone()));

return Poll::Ready(Some((to, GossipMessage::<B>::from(packet.clone()))));
}
// Don't return yet, maybe the timer fired.
Poll::Pending => {},
};

ready!(this.delay.poll_unpin(cx));

// Getting this far here implies that the timer fired.

this.delay.reset(rebroadcast_instant());

// Make sure the underlying task is scheduled for wake-up.
//
// Note: In case poll_unpin is called after the resetted delay fires again, this
// will drop one tick. Deemed as very unlikely and also not critical.
while let Poll::Ready(()) = this.delay.poll_unpin(cx) {};

if let Some((ref to, ref packet)) = this.last {
return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))));
}
});

(work, NeighborPacketSender(tx))
return Poll::Pending;
}
}
17 changes: 14 additions & 3 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,13 @@ struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: futures03::compat::Compat<NetworkBridge<Block, N>>,
}

impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
Expand All @@ -681,7 +682,7 @@ where
voting_rule,
voters: Arc::new(voters),
config,
network,
network: network.clone(),
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
Expand All @@ -694,6 +695,7 @@ where
voter: Box::new(futures::empty()) as Box<_>,
env,
voter_commands_rx,
network: futures03::future::TryFutureExt::compat(network),
};
work.rebuild_voter();
work
Expand Down Expand Up @@ -831,7 +833,7 @@ where
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
Expand Down Expand Up @@ -878,6 +880,15 @@ where
}
}

match self.network.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// the network bridge future should never conclude.
return Ok(Async::Ready(()))
}
e @ Err(_) => return e,
};

Ok(Async::NotReady)
}
}
Expand Down

0 comments on commit d4fbb89

Please sign in to comment.