Skip to content

Commit

Permalink
feat: integrate Node traits into LaunchContextWith (#8993)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Jun 24, 2024
1 parent bd0f676 commit 08b1e88
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 117 deletions.
232 changes: 194 additions & 38 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
//! Helper types that can be used by launchers.

use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use backon::{ConstantBuilder, Retryable};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_auto_seal_consensus::MiningMode;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig};
use reth_blockchain_tree::{
noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_chainspec::{Chain, ChainSpec};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitDatabaseError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::FullNodeTypes;
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
Expand All @@ -29,7 +39,7 @@ use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism};
use std::{marker::PhantomData, sync::Arc, thread::available_parallelism};
use tokio::sync::{
mpsc::{unbounded_channel, Receiver, UnboundedSender},
oneshot, watch,
Expand Down Expand Up @@ -509,9 +519,12 @@ where
}

/// Creates a `BlockchainProvider` and attaches it to the launch context.
pub async fn with_blockchain_db(
pub async fn with_blockchain_db<T>(
self,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>> {
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>>
where
T: FullNodeTypes<Provider = BlockchainProvider<<T as FullNodeTypes>::DB>>,
{
let tree_config = BlockchainTreeConfig::default();

// NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
Expand All @@ -526,11 +539,15 @@ where
)?;

let metered_providers = WithMeteredProviders {
provider_factory: self.provider_factory().clone(),
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
metrics_sender: self.sync_metrics_tx(),
tree_config,
canon_state_notification_sender,
// we store here a reference to T.
phantom_data: PhantomData,
};

let ctx = LaunchContextWith {
Expand All @@ -542,9 +559,10 @@ where
}
}

impl<DB> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB>>>
impl<DB, T> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders<DB, T>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
{
/// Returns access to the underlying database.
pub fn database(&self) -> &DB {
Expand All @@ -553,20 +571,122 @@ where

/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().provider_factory
&self.right().db_provider_container.provider_factory
}

/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
pub fn lookup_head(&self) -> eyre::Result<Head> {
self.node_config()
.lookup_head(self.provider_factory().clone())
.wrap_err("the head block is missing")
}

/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
}

/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}

/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}

/// Returns the `CanonStateNotificationSender`.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
}

/// Creates a `NodeAdapter` and attaches it to the launch context.
pub async fn with_components<CB>(
self,
components_builder: CB,
on_component_initialized: Box<
dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>>
where
CB: NodeComponentsBuilder<T>,
{
// fetch the head block from the database
let head = self.lookup_head()?;

let builder_ctx = BuilderContext::new(
head,
self.blockchain_db().clone(),
self.task_executor().clone(),
self.configs().clone(),
);

debug!(target: "reth::cli", "creating components");
let components = components_builder.build_components(&builder_ctx).await?;

let consensus: Arc<dyn Consensus> = Arc::new(components.consensus().clone());

let tree_externals = TreeExternals::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
consensus.clone(),
components.block_executor().clone(),
);
let tree = BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())?
.with_sync_metrics_tx(self.sync_metrics_tx())
// Note: This is required because we need to ensure that both the components and the
// tree are using the same channel for canon state notifications. This will be removed
// once the Blockchain provider no longer depends on an instance of the tree
.with_canon_state_notification_sender(self.canon_state_notification_sender());

let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));

// Replace the tree component with the actual tree
let blockchain_db = self.blockchain_db().clone().with_tree(blockchain_tree);

debug!(target: "reth::cli", "configured blockchain tree");

let node_adapter = NodeAdapter {
components,
task_executor: self.task_executor().clone(),
provider: blockchain_db.clone(),
};

debug!(target: "reth::cli", "calling on_component_initialized hook");
on_component_initialized.on_event(node_adapter.clone())?;

let components_container = WithComponents {
db_provider_container: WithMeteredProvider {
provider_factory: self.provider_factory().clone(),
metrics_sender: self.sync_metrics_tx(),
},
blockchain_db,
tree_config: self.right().tree_config,
node_adapter,
head,
consensus,
};

let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| components_container),
};

Ok(ctx)
}
}

impl<DB, T, CB> LaunchContextWith<Attached<WithConfigs, WithComponents<DB, T, CB>>>
where
DB: Database + DatabaseMetrics + Send + Sync + Clone + 'static,
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
/// Returns the configured `ProviderFactory`.
pub const fn provider_factory(&self) -> &ProviderFactory<DB> {
&self.right().db_provider_container.provider_factory
}

/// Returns the max block that the node should run to, looking it up from the network if
Expand All @@ -578,33 +698,52 @@ where
self.node_config().max_block(client, self.provider_factory().clone()).await
}

/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
pub fn lookup_head(&self) -> eyre::Result<Head> {
self.node_config()
.lookup_head(self.provider_factory().clone())
.wrap_err("the head block is missing")
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
self.provider_factory().static_file_provider()
}

/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().metrics_sender.clone()
/// Creates a new [`StaticFileProducer`] with the attached database.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
StaticFileProducer::new(
self.provider_factory().clone(),
self.prune_modes().unwrap_or_default(),
)
}

/// Returns the current head block.
pub const fn head(&self) -> Head {
self.right().head
}

/// Returns the configured `NodeAdapter`.
pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
&self.right().node_adapter
}

/// Returns a reference to the `BlockchainProvider`.
pub const fn blockchain_db(&self) -> &BlockchainProvider<DB> {
&self.right().blockchain_db
}

/// Returns the configured `Consensus`.
pub fn consensus(&self) -> Arc<dyn Consensus> {
self.right().consensus.clone()
}

/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
}

/// Returns a reference to the `BlockchainTreeConfig`.
pub const fn tree_config(&self) -> &BlockchainTreeConfig {
&self.right().tree_config
}

/// Returns the `CanonStateNotificationSender`.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.right().canon_state_notification_sender.clone()
/// Returns the node adapter components.
pub const fn components(&self) -> &CB::Components {
&self.node_adapter().components
}
}

Expand Down Expand Up @@ -668,23 +807,40 @@ pub struct WithConfigs {
pub toml_config: reth_config::Config,
}

/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
}

/// Helper container to bundle the [`ProviderFactory`], [`BlockchainProvider`]
/// and a metrics sender.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<DB> {
provider_factory: ProviderFactory<DB>,
pub struct WithMeteredProviders<DB, T> {
db_provider_container: WithMeteredProvider<DB>,
blockchain_db: BlockchainProvider<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
canon_state_notification_sender: CanonStateNotificationSender,
tree_config: BlockchainTreeConfig,
// this field is used to store a reference to the FullNodeTypes so that we
// can build the components in `with_components` method.
phantom_data: PhantomData<T>,
}

/// Helper container type to bundle athe [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug)]
pub struct WithMeteredProvider<DB> {
provider_factory: ProviderFactory<DB>,
metrics_sender: UnboundedSender<MetricEvent>,
/// Helper container to bundle the metered providers container and [`NodeAdapter`].
#[allow(missing_debug_implementations)]
pub struct WithComponents<DB, T, CB>
where
T: FullNodeTypes<Provider = BlockchainProvider<DB>>,
CB: NodeComponentsBuilder<T>,
{
db_provider_container: WithMeteredProvider<DB>,
tree_config: BlockchainTreeConfig,
blockchain_db: BlockchainProvider<DB>,
node_adapter: NodeAdapter<T, CB::Components>,
head: Head,
consensus: Arc<dyn Consensus>,
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 08b1e88

Please sign in to comment.