Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: NetworkHandle get access to TransactionsHandle #6780

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/net/network/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use criterion::*;
use futures::StreamExt;
use pprof::criterion::{Output, PProfProfiler};
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvents};
use reth_network::{test_utils::Testnet, EthNetwork, NetworkEvents};
use reth_network_api::Peers;
use reth_primitives::U256;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub use discovery::{Discovery, DiscoveryEvent};
pub use fetch::FetchClient;
pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::{NetworkEvents, NetworkHandle, NetworkProtocols};
pub use network::{EthNetwork, NetworkEvents, NetworkHandle, NetworkProtocols};
pub use peers::PeersConfig;
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent,
Expand Down
73 changes: 53 additions & 20 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, FetchClient,
peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState,
transactions::TransactionsHandle, FetchClient,
};
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
Expand Down Expand Up @@ -63,6 +64,7 @@ impl NetworkHandle {
initial_sync_done: Arc::new(AtomicBool::new(false)),
chain_id,
tx_gossip_disabled,
tx_handle: Mutex::new(None),
qiweiii marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "optimism")]
sequencer_endpoint,
};
Expand Down Expand Up @@ -118,24 +120,6 @@ impl NetworkHandle {
self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
}

/// Sends a [`PeerRequest`] to the given peer's session.
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}

/// Send transactions hashes to the peer.
pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
}

/// Send full transactions to the peer
pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<TransactionSigned>>) {
self.send_message(NetworkHandleMessage::SendTransaction {
peer_id,
msg: SharedTransactions(msg),
})
}
qiweiii marked this conversation as resolved.
Show resolved Hide resolved

/// Provides a shareable reference to the [`BandwidthMeter`] stored on the `NetworkInner`.
pub fn bandwidth_meter(&self) -> &BandwidthMeter {
&self.inner.bandwidth_meter
Expand Down Expand Up @@ -322,7 +306,7 @@ impl SyncStateProvider for NetworkHandle {
// used to guard the txpool
fn is_initially_syncing(&self) -> bool {
if self.inner.initial_sync_done.load(Ordering::Relaxed) {
return false
return false;
qiweiii marked this conversation as resolved.
Show resolved Hide resolved
}
self.inner.is_syncing.load(Ordering::Relaxed)
}
Expand All @@ -344,6 +328,32 @@ impl NetworkSyncUpdater for NetworkHandle {
}
}

impl EthNetwork for NetworkHandle {
qiweiii marked this conversation as resolved.
Show resolved Hide resolved
fn set_tx_handle(&self, handle: TransactionsHandle) {
let mut tx_handle = self.inner.tx_handle.lock();
*tx_handle = Some(handle);
}

fn tx_handle(&self) -> Option<TransactionsHandle> {
self.inner.tx_handle.lock().clone()
qiweiii marked this conversation as resolved.
Show resolved Hide resolved
}

fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}

fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
}

fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<TransactionSigned>>) {
self.send_message(NetworkHandleMessage::SendTransaction {
peer_id,
msg: SharedTransactions(msg),
})
}
}

#[derive(Debug)]
struct NetworkInner {
/// Number of active peer sessions the node's currently handling.
Expand All @@ -370,6 +380,8 @@ struct NetworkInner {
chain_id: Arc<AtomicU64>,
/// Whether to disable transaction gossip
tx_gossip_disabled: bool,
/// Access to the [`TransactionsHandle`].
tx_handle: Mutex<Option<TransactionsHandle>>,
/// The sequencer HTTP Endpoint
#[cfg(feature = "optimism")]
sequencer_endpoint: Option<String>,
Expand All @@ -391,6 +403,27 @@ pub trait NetworkProtocols: Send + Sync {
fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol);
}

/// Provides access to interact with the network's transactions.
pub trait EthNetwork: Send + Sync {
/// Sets the [`TransactionsHandle`] for the network.
fn set_tx_handle(&self, handle: TransactionsHandle);

/// Returns the [`TransactionsHandle`] of this network.
///
/// Only call this after [`TransactionsManager`](crate::transactions::TransactionsManager)
/// is created.
fn tx_handle(&self) -> Option<TransactionsHandle>;

/// Sends a [`PeerRequest`] to the given peer's session.
fn send_request(&self, peer_id: PeerId, request: PeerRequest);

/// Send transactions hashes to the peer.
fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes);

/// Send full transactions to the peer
fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<TransactionSigned>>);
}

/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
#[derive(Debug)]
pub(crate) enum NetworkHandleMessage {
Expand Down
4 changes: 3 additions & 1 deletion crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkEvents, NetworkHandle,
EthNetwork, NetworkEvents, NetworkHandle,
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
Expand Down Expand Up @@ -245,6 +245,8 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();

network.set_tx_handle(TransactionsHandle { manager_tx: command_tx.clone() });

let transaction_fetcher = TransactionFetcher::default().with_transaction_fetcher_config(
&transactions_manager_config.transaction_fetcher_config,
);
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/big_pooled_txs_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use reth_eth_wire::{GetPooledTransactions, PooledTransactions};
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState};
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
NetworkEvents, PeerRequest,
EthNetwork, NetworkEvents, PeerRequest,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::{Signature, TransactionSigned, B256};
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/txgossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use futures::StreamExt;
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEvents};
use reth_network::{test_utils::Testnet, EthNetwork, NetworkEvent, NetworkEvents};
use reth_network_api::PeersInfo;
use reth_primitives::{TransactionSigned, TxLegacy, U256};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
Expand Down
Loading