Skip to content

Commit

Permalink
Prioritisation network manager + transactions manager + eth request h…
Browse files Browse the repository at this point in the history
…andler (#6590)

Co-authored-by: DaniPopes <[email protected]>
  • Loading branch information
emhane and DaniPopes committed Mar 6, 2024
1 parent e42fb32 commit 422b8f8
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 236 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pin-project.workspace = true
tokio = { workspace = true, features = ["io-util", "net", "macros", "rt-multi-thread", "time"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["codec"] }
futures-test = "0.3.30"

# io
serde = { workspace = true, optional = true }
Expand Down
78 changes: 78 additions & 0 deletions crates/net/network/src/budget.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/// Default budget to try and drain streams.
///
/// Default is 10 iterations.
pub const DEFAULT_BUDGET_TRY_DRAIN_STREAM: u32 = 10;

/// Default budget to try and drain [`Swarm`](crate::swarm::Swarm).
///
/// Default is 10 [`SwarmEvent`](crate::swarm::SwarmEvent)s.
pub const DEFAULT_BUDGET_TRY_DRAIN_SWARM: u32 = 10;

/// Default budget to try and drain pending messages from [`NetworkHandle`](crate::NetworkHandle)
/// channel. Polling the [`TransactionsManager`](crate::transactions::TransactionsManager) future
/// sends these types of messages.
//
// Default is 40 outgoing transaction messages.
pub const DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL: u32 =
4 * DEFAULT_BUDGET_TRY_DRAIN_STREAM;

/// Default budget to try and drain stream of
/// [`NetworkTransactionEvent`](crate::transactions::NetworkTransactionEvent)s from
/// [`NetworkManager`](crate::NetworkManager).
///
/// Default is 10 incoming transaction messages.
pub const DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS: u32 = DEFAULT_BUDGET_TRY_DRAIN_SWARM;

/// Default budget to try and flush pending pool imports to pool. This number reflects the number
/// of transactions that can be queued for import to pool in each iteration of the loop in the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
//
// Default is 40 pending pool imports.
pub const DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS: u32 = 4 * DEFAULT_BUDGET_TRY_DRAIN_STREAM;

/// Default budget to try and stream hashes of successfully imported transactions from the pool.
///
/// Default is naturally same as the number of transactions to attempt importing,
/// [`DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS`], so 40 pool imports.
pub const DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS: u32 =
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS;

/// Polls the given stream. Breaks with `true` if there maybe is more work.
#[macro_export]
macro_rules! poll_nested_stream_with_budget {
($target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
let mut budget: u32 = $budget;

loop {
match $poll_stream {
Poll::Ready(Some(item)) => {
let mut f = $on_ready_some;
f(item);

budget = budget.saturating_sub(1);
if budget == 0 {
break true
}
}
Poll::Ready(None) => {
$($on_ready_none;)? // todo: handle error case with $target and $label
break false
}
Poll::Pending => break false,
}
}
}};
}

/// Metered poll of the given stream. Breaks with `true` if there maybe is more work.
#[macro_export]
macro_rules! metered_poll_nested_stream_with_budget {
($acc:ident, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
duration_metered_exec!(
{
$crate::poll_nested_stream_with_budget!($target, $label, $budget, $poll_stream, $on_ready_some $(, $on_ready_none;)?)
},
$acc
)
}};
}
30 changes: 22 additions & 8 deletions crates/net/network/src/eth_requests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Blocks/Headers management for the p2p network.

use crate::{metrics::EthRequestHandlerMetrics, peers::PeersHandle};
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_STREAM, metrics::EthRequestHandlerMetrics, peers::PeersHandle,
poll_nested_stream_with_budget,
};
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, NodeData,
Expand Down Expand Up @@ -240,11 +243,13 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
match this.incoming_requests.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(incoming)) => match incoming {
let maybe_more_incoming_requests = poll_nested_stream_with_budget!(
"net::eth",
"Incoming eth requests stream",
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
this.incoming_requests.poll_next_unpin(cx),
|incoming| {
match incoming {
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
this.on_headers_request(peer_id, request, response)
}
Expand All @@ -255,9 +260,18 @@ where
IncomingEthRequest::GetReceipts { peer_id, request, response } => {
this.on_receipts_request(peer_id, request, response)
}
},
}
}
},
);

// stream is fully drained and import futures pending
if maybe_more_incoming_requests {
// make sure we're woken up again
cx.waker().wake_by_ref();
return Poll::Pending
}

Poll::Pending
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
/// Common helpers for network testing.
pub mod test_utils;

mod budget;
mod builder;
mod cache;
pub mod config;
Expand Down
68 changes: 32 additions & 36 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the RLPx session.

use crate::{
budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
config::NetworkConfig,
discovery::Discovery,
error::{NetworkError, ServiceKind},
Expand All @@ -26,6 +27,7 @@ use crate::{
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::{PeersHandle, PeersManager},
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
session::SessionManager,
state::NetworkState,
Expand Down Expand Up @@ -911,25 +913,7 @@ where
this.on_block_import_result(outcome);
}

// process incoming messages from a handle
let start_network_handle = Instant::now();
loop {
match this.from_handle_rx.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// This is only possible if the channel was deliberately closed since we
// always have an instance of
// `NetworkHandle`
error!("Network message channel closed.");
return Poll::Ready(())
}
Poll::Ready(Some(msg)) => this.on_handle_message(msg),
};
}

poll_durations.acc_network_handle = start_network_handle.elapsed();

// This loop drives the entire state of network and does a lot of work. Under heavy load
// These loops drive the entire state of network and does a lot of work. Under heavy load
// (many messages/events), data may arrive faster than it can be processed (incoming
// messages/requests -> events), and it is possible that more data has already arrived by
// the time an internal event is processed. Which could turn this loop into a busy loop.
Expand All @@ -947,28 +931,40 @@ where
// iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
// range of what's recommended as rule of thumb.
// <https://ryhl.io/blog/async-what-is-blocking/>
let mut budget = 10;

loop {
// advance the swarm
match this.swarm.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(event)) => this.on_swarm_event(event),
}

// ensure we still have enough budget for another iteration
budget -= 1;
if budget == 0 {
trace!(target: "net", budget=10, "exhausted network manager budget");
// make sure we're woken up again
cx.waker().wake_by_ref();
break
}
}
// process incoming messages from a handle (`TransactionsManager` has one)
//
// will only be closed if the channel was deliberately closed since we always have an
// instance of `NetworkHandle`
let start_network_handle = Instant::now();
let maybe_more_handle_messages = poll_nested_stream_with_budget!(
"net",
"Network message channel",
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
this.from_handle_rx.poll_next_unpin(cx),
|msg| this.on_handle_message(msg),
error!("Network channel closed");
);
poll_durations.acc_network_handle = start_network_handle.elapsed();

// process incoming messages from the network
let maybe_more_swarm_events = poll_nested_stream_with_budget!(
"net",
"Swarm events stream",
DEFAULT_BUDGET_TRY_DRAIN_SWARM,
this.swarm.poll_next_unpin(cx),
|event| this.on_swarm_event(event),
);
poll_durations.acc_swarm =
start_network_handle.elapsed() - poll_durations.acc_network_handle;

// all streams are fully drained and import futures pending
if maybe_more_handle_messages || maybe_more_swarm_events {
// make sure we're woken up again
cx.waker().wake_by_ref();
return Poll::Pending
}

this.update_poll_metrics(start, poll_durations);

Poll::Pending
Expand Down
Loading

0 comments on commit 422b8f8

Please sign in to comment.