Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-grandpa: Reintegrate gossip validator report stream #4661

Merged
merged 2 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 8 additions & 56 deletions client/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@
//! We only send polite messages to peers,

use sp_runtime::traits::{NumberFor, Block as BlockT, Zero};
use sc_network_gossip::{GossipEngine, MessageIntent, ValidatorContext};
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sc_network::{config::Roles, PeerId, ReputationChange};
use parity_scale_codec::{Encode, Decode};
use sp_finality_grandpa::AuthorityId;

use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
use log::{trace, debug};
use futures::prelude::*;
use futures::sync::mpsc;
use futures03::channel::mpsc;
use rand::seq::SliceRandom;

use crate::{environment, CatchUp, CompactCommit, SignedMessage};
Expand Down Expand Up @@ -1178,15 +1178,15 @@ impl<Block: BlockT> GossipValidator<Block> {
pub(super) fn new(
config: crate::Config,
set_state: environment::SharedVoterSetState<Block>,
) -> (GossipValidator<Block>, ReportStream) {
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
report_sender: tx,
};

(val, ReportStream { reports: rx })
(val, rx)
}

/// Note a round in the current set has started.
Expand Down Expand Up @@ -1445,57 +1445,9 @@ impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Bloc
}
}

struct PeerReport {
who: PeerId,
cost_benefit: ReputationChange,
}

// wrapper around a stream of reports.
#[must_use = "The report stream must be consumed"]
pub(super) struct ReportStream {
reports: mpsc::UnboundedReceiver<PeerReport>,
}

impl ReportStream {
/// Consume the report stream, converting it into a future that
/// handles all reports.
pub(super) fn consume<B>(self, net: GossipEngine<B>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
B: BlockT,
{
ReportingTask {
reports: self.reports,
net,
}
}
}

/// A future for reporting peers.
#[must_use = "Futures do nothing unless polled"]
struct ReportingTask<B: BlockT> {
reports: mpsc::UnboundedReceiver<PeerReport>,
net: GossipEngine<B>,
}

impl<B: BlockT> Future for ReportingTask<B> {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.reports.poll() {
Err(_) => {
warn!(target: "afg", "Report stream terminated unexpectedly");
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) =>
self.net.report(who, cost_benefit),
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
}
pub(super) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs on this struct & members?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #4684

}

#[cfg(test)]
Expand Down
58 changes: 43 additions & 15 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use futures::{prelude::*, future::Executor as _, sync::mpsc};
use futures::{prelude::*, sync::mpsc};
use futures03::{
channel::mpsc as mpsc03,
compat::Compat,
future::{Future as Future03},
stream::StreamExt,
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
};
use log::{debug, trace};
use parking_lot::Mutex;
Expand All @@ -52,7 +53,12 @@ use crate::{
};
use crate::environment::HasVoted;
use gossip::{
GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteMessage, GossipValidator
FullCatchUpMessage,
FullCommitMessage,
GossipMessage,
GossipValidator,
PeerReport,
VoteMessage,
};
use sp_finality_grandpa::{
AuthorityPair, AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber,
Expand Down Expand Up @@ -148,9 +154,18 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {

/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
//
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children,
// thus one has to wrap neighor_packet_worker with an `Arc` `Mutex`.
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,

/// Receiver side of the peer report stream populated by the gossip validator, forwarded to the
/// gossip engine.
//
// `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children,
// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
// channel implementation.
gossip_validator_report_stream: Arc<Mutex<mpsc03::UnboundedReceiver<PeerReport>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, what does the plan to phase out the unbounded channel look like after this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
Expand All @@ -165,7 +180,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
executor: &impl futures03::task::Spawn,
on_exit: impl futures03::Future<Output = ()> + Clone + Send + Unpin + 'static,
) -> Self {
let (validator, report_stream) = GossipValidator::new(
config,
Expand Down Expand Up @@ -214,20 +228,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}

let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
let reporting_job = report_stream.consume(gossip_engine.clone());

let bridge = NetworkBridge {
service,
gossip_engine,
validator,
neighbor_sender: neighbor_packet_sender,
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
};

let executor = Compat::new(executor);
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");

bridge
}

Expand Down Expand Up @@ -418,13 +428,30 @@ impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> {

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
loop {
match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) {
None => return Poll03::Ready(
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll03::Ready(Some((to, packet))) => {
self.gossip_engine.send_message(to, packet.encode());
},
Poll03::Ready(None) => return Poll03::Ready(
Err(Error::Network("Neighbor packet worker stream closed.".into()))
),
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
Poll03::Pending => break,
}
}

loop {
match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
Poll03::Ready(Some(PeerReport { who, cost_benefit })) => {
self.gossip_engine.report(who, cost_benefit);
},
Poll03::Ready(None) => return Poll03::Ready(
Err(Error::Network("Gossip validator report stream closed.".into()))
),
Poll03::Pending => break,
}
}

Poll03::Pending
}
}

Expand Down Expand Up @@ -568,6 +595,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> (
config(),
voter_set_state(),
executor,
Exit,
);

(
Expand Down
1 change: 0 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
config.clone(),
persistent_data.set_state.clone(),
&executor,
on_exit.clone(),
);

register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
Expand Down
1 change: 0 additions & 1 deletion client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
config.clone(),
persistent_data.set_state.clone(),
&executor,
on_exit.clone(),
);

let observer_work = ObserverWork::new(
Expand Down
4 changes: 1 addition & 3 deletions client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use sc_network_test::{
use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures_timer::Delay;
use futures03::{StreamExt as _, TryStreamExt as _};
use futures03::TryStreamExt as _;
use tokio::runtime::current_thread;
use sp_keyring::Ed25519Keyring;
use sc_client::LongestChain;
Expand Down Expand Up @@ -1270,7 +1270,6 @@ fn voter_persists_its_votes() {
config.clone(),
set_state,
&threads_pool,
Exit,
);

let (round_rx, round_tx) = network.round_communication(
Expand Down Expand Up @@ -1675,7 +1674,6 @@ fn grandpa_environment_respects_voting_rules() {
config.clone(),
set_state.clone(),
&threads_pool,
Exit,
);

Environment {
Expand Down