Skip to content

Commit

Permalink
Tx fetcher metrics (#6951)
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Mar 4, 2024
1 parent 0d3b77f commit ebe72f7
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 87 deletions.
62 changes: 41 additions & 21 deletions crates/net/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ pub struct NetworkMetrics {
/* -- Poll duration of items nested in `NetworkManager` future -- */
/// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which
/// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in
/// one call to poll the [`NetworkManager`](crate::NetworkManager) future.
/// one call to poll the [`NetworkManager`](crate::NetworkManager) future. At least
/// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle.
///
/// Duration in seconds.
// todo: find out how many components hold the network handle.
pub(crate) duration_poll_network_handle: Gauge,
/// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
/// [`NetworkManager`](crate::NetworkManager) future.
Expand Down Expand Up @@ -119,22 +119,6 @@ pub struct TransactionsManagerMetrics {
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_pending_pool_imports: Counter,
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
/* ================ TX FETCHER ================ */
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_inflight_requests: Counter,
/// Hashes in currently active outgoing
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
pub(crate) hashes_inflight_transaction_requests: Gauge,
/// How often we failed to send a request to the peer because the channel was full.
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,

/* ================ POLL DURATION ================ */

Expand Down Expand Up @@ -191,17 +175,53 @@ pub struct TransactionsManagerMetrics {
pub(crate) acc_duration_poll_commands: Gauge,
}

/// Metrics for the [`TransactionFetcher`](crate::transactions::TransactionFetcher).
///
/// Holds gauges and counters for inflight requests and hashes, plus gauges for the time spent in
/// the fetcher's searches.
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct TransactionFetcherMetrics {
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_inflight_requests: Counter,
/// Hashes in currently active outgoing
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
pub(crate) hashes_inflight_transaction_requests: Gauge,
/// How often we failed to send a request to the peer because the channel was full.
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,

/* ================ SEARCH DURATION ================ */
/// Time spent searching for an idle peer in call to
/// [`TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash`](crate::transactions::TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash).
///
/// Duration in seconds.
pub(crate) duration_find_idle_fallback_peer_for_any_pending_hash: Gauge,

/// Time spent searching for hashes pending fetch, announced by a given peer in
/// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`](crate::transactions::TransactionFetcher::fill_request_from_hashes_pending_fetch).
///
/// Duration in seconds.
pub(crate) duration_fill_request_from_hashes_pending_fetch: Gauge,
}

/// Measures the duration of evaluating the given expression and evaluates to that expression's
/// value. The elapsed time is added to the accumulator, which must be an identifier bound to a
/// mutable reference whose target supports `+= Duration` (e.g. `&mut Duration`).
///
/// The expression is expanded in place, so control flow inside it (such as an early `return`)
/// acts on the enclosing function; in that case the accumulator is not updated.
#[macro_export]
macro_rules! duration_metered_exec {
    ($code:expr, $acc:ident) => {{
        // fully qualified path so call sites don't need `Instant` in scope
        let start = ::std::time::Instant::now();

        let res = $code;

        *$acc += start.elapsed();

        res
    }};
}

/// Metrics for Disconnection types
Expand Down
109 changes: 84 additions & 25 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
message::PeerRequest,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};
use derive_more::{Constructor, Deref};
Expand All @@ -48,6 +50,7 @@ use std::{
num::NonZeroUsize,
pin::Pin,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
Expand Down Expand Up @@ -87,18 +90,52 @@ pub struct TransactionFetcher {
pub(super) filter_valid_message: MessageFilter,
/// Info on capacity of the transaction fetcher.
pub info: TransactionFetcherInfo,
#[doc(hidden)]
metrics: TransactionFetcherMetrics,
}

// === impl TransactionFetcher ===

impl TransactionFetcher {
/// Refreshes the fetcher gauges from the current sizes of its internal collections.
///
/// The number of hashes in inflight requests is derived as the size of the combined
/// inflight-and-pending collection minus the number of hashes pending fetch.
#[inline]
pub fn update_metrics(&self) {
    let active_requests = self.inflight_requests.len() as f64;
    self.metrics.inflight_transaction_requests.set(active_requests);

    let pending = self.hashes_pending_fetch.len() as f64;
    let inflight_and_pending = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;

    self.metrics.hashes_pending_fetch.set(pending);
    self.metrics.hashes_inflight_transaction_requests.set(inflight_and_pending - pending);
}

#[inline]
fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
let metrics = &self.metrics;

let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
metrics
.duration_find_idle_fallback_peer_for_any_pending_hash
.set(find_idle_peer.as_secs_f64());
metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
}

/// Sets up transaction fetcher with config.
///
/// Starts from [`TransactionFetcher::default`], copies the two soft response-size limits from
/// `config` into `info`, and increments the `capacity_inflight_requests` counter by
/// `info.max_inflight_requests`.
pub fn with_transaction_fetcher_config(mut self, config: &TransactionFetcherConfig) -> Self {
self.info.soft_limit_byte_size_pooled_transactions_response =
pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
let mut tx_fetcher = TransactionFetcher::default();

tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response =
config.soft_limit_byte_size_pooled_transactions_response;
self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
config.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
self
tx_fetcher
.metrics
.capacity_inflight_requests
.increment(tx_fetcher.info.max_inflight_requests as u64);

tx_fetcher
}

/// Removes the specified hashes from inflight tracking.
Expand Down Expand Up @@ -384,24 +421,34 @@ impl TransactionFetcher {
&mut self,
peers: &HashMap<PeerId, PeerMetadata>,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) {
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);

let mut search_durations = TxFetcherSearchDurations::default();

// budget to look for an idle peer before giving up
let budget_find_idle_fallback_peer = self
.search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
&mut hashes_to_request,
is_session_active,
budget_find_idle_fallback_peer,
) else {
// no peers are idle or budget is depleted
return
};
// time the idle peer search into `find_idle_peer`, the duration that is reported as
// `duration_find_idle_fallback_peer_for_any_pending_hash`
let acc = &mut search_durations.find_idle_peer;
let peer_id = duration_metered_exec!(
{
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
&mut hashes_to_request,
is_session_active,
budget_find_idle_fallback_peer,
) else {
// no peers are idle or budget is depleted
return
};

peer_id
},
acc
);

// peer should always exist since `is_session_active` already checked
let Some(peer) = peers.get(&peer_id) else { return };
let conn_eth_version = peer.version;
Expand All @@ -415,15 +462,23 @@ impl TransactionFetcher {
&has_capacity_wrt_pending_pool_imports,
);

self.fill_request_from_hashes_pending_fetch(
&mut hashes_to_request,
&peer.seen_transactions,
budget_fill_request,
// time filling the request into `fill_request`, the duration that is reported as
// `duration_fill_request_from_hashes_pending_fetch`
let acc = &mut search_durations.fill_request;
duration_metered_exec!(
{
self.fill_request_from_hashes_pending_fetch(
&mut hashes_to_request,
&peer.seen_transactions,
budget_fill_request,
)
},
acc
);

// free unused memory
hashes_to_request.shrink_to_fit();

self.update_pending_fetch_cache_search_metrics(search_durations);

trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes_to_request,
Expand All @@ -432,11 +487,9 @@ impl TransactionFetcher {
);

// request the buffered missing transactions
if let Some(failed_to_request_hashes) = self.request_transactions_from_peer(
hashes_to_request,
peer,
metrics_increment_egress_peer_channel_full,
) {
if let Some(failed_to_request_hashes) =
self.request_transactions_from_peer(hashes_to_request, peer)
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes,
Expand Down Expand Up @@ -575,7 +628,6 @@ impl TransactionFetcher {
&mut self,
new_announced_hashes: RequestTxHashes,
peer: &PeerMetadata,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<RequestTxHashes> {
let peer_id: PeerId = peer.request_tx.peer_id;
let conn_eth_version = peer.version;
Expand Down Expand Up @@ -642,7 +694,7 @@ impl TransactionFetcher {
// peer channel is full
match err {
TrySendError::Full(_) | TrySendError::Closed(_) => {
metrics_increment_egress_peer_channel_full();
self.metrics.egress_peer_channel_full.increment(1);
return Some(new_announced_hashes)
}
}
Expand Down Expand Up @@ -1013,6 +1065,7 @@ impl Default for TransactionFetcher {
),
filter_valid_message: Default::default(),
info: TransactionFetcherInfo::default(),
metrics: Default::default(),
}
}
}
Expand Down Expand Up @@ -1259,6 +1312,12 @@ impl Default for TransactionFetcherInfo {
}
}

/// Search durations handed to `TransactionFetcher::update_pending_fetch_cache_search_metrics`,
/// which publishes them as gauges in seconds.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
/// Reported as the `duration_find_idle_fallback_peer_for_any_pending_hash` gauge.
find_idle_peer: Duration,
/// Reported as the `duration_fill_request_from_hashes_pending_fetch` gauge.
fill_request: Duration,
}

#[cfg(test)]
mod test {
use std::{collections::HashSet, str::FromStr};
Expand Down Expand Up @@ -1425,7 +1484,7 @@ mod test {

// TEST

tx_fetcher.on_fetch_pending_hashes(&peers, |_| true, || ());
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

// mock session of peer_1 receives request
let req = peer_1_mock_session_rx
Expand Down
Loading

0 comments on commit ebe72f7

Please sign in to comment.