Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate Node traits into LaunchContextWith #8993

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 194 additions & 38 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
//! Helper types that can be used by launchers.

use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use backon::{ConstantBuilder, Retryable};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_auto_seal_consensus::MiningMode;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_chainspec::{Chain, ChainSpec};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitDatabaseError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::FullNodeTypes;
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
Expand All @@ -29,7 +39,7 @@ use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism};
use std::{marker::PhantomData, sync::Arc, thread::available_parallelism};
use tokio::sync::{
mpsc::{unbounded_channel, Receiver, UnboundedSender},
oneshot, watch,
Expand Down Expand Up @@ -509,9 +519,12 @@ where
}

/// Creates a `BlockchainProvider` and attaches it to the launch context.
pub async fn with_blockchain_db(
pub async fn with_blockchain_db<T>(
self,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>> {
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>>
where
T: FullNodeTypes<Provider = BlockchainProvider<<T as FullNodeTypes>::DB>>,
{
let tree_config = BlockchainTreeConfig::default();

// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
Expand All @@ -526,11 +539,15 @@ where
)?;

let metered_providers = WithMeteredProviders {
provider_factory: self.provider_factory().clone(),
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
metrics_sender: self.sync_metrics_tx(),
tree_config,
canon_state_notification_sender,
// we store here a reference to T.
phantom_data: PhantomData,
};

let ctx = LaunchContextWith {
Expand All @@ -542,9 +559,10 @@ where
}
}

impl<DB> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>
impl<DB, T> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
{
/// Returns access to the underlying database.
pub fn database(&self) -> &DB {
Expand All @@ -553,20 +571,122 @@ where

/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().provider_factory
&self.right().db_provider_container.provider_factory
}

/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
pub fn lookup_head(&self) -> eyre::Result<Head> {
self.node_config()
.lookup_head(self.provider_factory().clone())
.wrap_err("the head block is missing")
}

/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
}

/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}

/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}

/// Returns the `CanonStateNotificationSender`.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
}

/// Creates a `NodeAdapter` and attaches it to the launch context.
pub async fn with_components<CB>(
self,
components_builder: CB,
on_component_initialized: Box<
dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>>
where
CB: NodeComponentsBuilder<T>,
{
// fetch the head block from the database
let head = self.lookup_head()?;

let builder_ctx = BuilderContext::new(
head,
self.blockchain_db().clone(),
self.task_executor().clone(),
self.configs().clone(),
);

debug!(target: "reth::cli", "creating components");
let components = components_builder.build_components(&builder_ctx).await?;

let consensus: Arc<dyn Consensus> = Arc::new(components.consensus().clone());

let tree_externals = TreeExternals::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
consensus.clone(),
components.block_executor().clone(),
);
let tree = BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())?
.with_sync_metrics_tx(self.sync_metrics_tx())
// Note: This is required because we need to ensure that both the components and the
// tree are using the same channel for canon state notifications. This will be removed
// once the Blockchain provider no longer depends on an instance of the tree
.with_canon_state_notification_sender(self.canon_state_notification_sender());

let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));

// Replace the tree component with the actual tree
let blockchain_db = self.blockchain_db().clone().with_tree(blockchain_tree);

debug!(target: "reth::cli", "configured blockchain tree");

let node_adapter = NodeAdapter {
components,
task_executor: self.task_executor().clone(),
provider: blockchain_db.clone(),
};

debug!(target: "reth::cli", "calling on_component_initialized hook");
on_component_initialized.on_event(node_adapter.clone())?;

let components_container = WithComponents {
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
tree_config: self.right().tree_config,
node_adapter,
head,
consensus,
};

let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| components_container),
};

Ok(ctx)
}
}

impl<DB, T, CB> LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().db_provider_container.provider_factory
}

/// Returns the max block that the node should run to, looking it up from the network if
Expand All @@ -578,33 +698,52 @@ where
self.node_config().max_block(client, self.provider_factory().clone()).await
}

/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
pub fn lookup_head(&self) -> eyre::Result<Head> {
self.node_config()
.lookup_head(self.provider_factory().clone())
.wrap_err("the head block is missing")
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
}

/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().metrics_sender.clone()
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}

/// Returns the current head block.
pub const fn head(&self) -> Head {
self.right().head
}

/// Returns the configured `NodeAdapter`.
pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
&self.right().node_adapter
}

/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}

/// Returns the configured `Consensus`.
pub fn consensus(&self) -> Arc<dyn Consensus> {
self.right().consensus.clone()
}

/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
}

/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}

/// Returns the `CanonStateNotificationSender`.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
/// Returns the node adapter components.
pub const fn components(&self) -> &CB::Components {
&self.node_adapter().components
}
}

Expand Down Expand Up @@ -668,23 +807,40 @@ pub struct WithConfigs {
pub toml_config: reth_config::Config,
}

/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
}

/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB> {
provider_factory: ProviderFactory<DB>,
pub struct WithMeteredProviders<DB, T> {
db_provider_container: WithMeteredProvider<DB>,
blockchain_db: BlockchainProvider<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig,
// this field is used to store a reference to the FullNodeTypes so that we
// can build the components in `with_components` method.
phantom_data: PhantomData<T>,
}

/// Helper container type to bundle athe [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
/// Helper container to bundle the metered providers container and [`NodeAdapter`].
#[allow(missing_debug_implementations)]
pub struct WithComponents<DB, T, CB>
where
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
db_provider_container: WithMeteredProvider<DB>,
tree_config: BlockchainTreeConfig,
blockchain_db: BlockchainProvider<DB>,
node_adapter: NodeAdapter<T, CB::Components>,
head: Head,
consensus: Arc<dyn Consensus>,
}

#[cfg(test)]
Expand Down
Loading
Loading