fix(stages, etl): clear ETL collectors in Headers stage when done (#6964)
shekhirin committed Mar 5, 2024
1 parent ec401aa commit 024c217
Showing 9 changed files with 80 additions and 60 deletions.
2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/execution.rs
@@ -127,7 +127,7 @@ impl Command {
                     header_downloader,
                     body_downloader,
                     factory.clone(),
-                )?
+                )
                 .set(SenderRecoveryStage {
                     commit_threshold: stage_conf.sender_recovery.commit_threshold,
                 })
2 changes: 1 addition & 1 deletion bin/reth/src/commands/import.rs
@@ -191,7 +191,7 @@ impl ImportCommand {
                     header_downloader,
                     body_downloader,
                     factory.clone(),
-                )?
+                )
                 .set(SenderRecoveryStage {
                     commit_threshold: config.stages.sender_recovery.commit_threshold,
                 })
19 changes: 8 additions & 11 deletions crates/consensus/beacon/src/engine/test_utils.rs
@@ -399,17 +399,14 @@ where
             .build(client.clone(), consensus.clone(), provider_factory.clone())
             .into_task();

-        Pipeline::builder().add_stages(
-            DefaultStages::new(
-                provider_factory.clone(),
-                HeaderSyncMode::Tip(tip_rx.clone()),
-                Arc::clone(&consensus),
-                header_downloader,
-                body_downloader,
-                executor_factory.clone(),
-            )
-            .expect("should build"),
-        )
+        Pipeline::builder().add_stages(DefaultStages::new(
+            provider_factory.clone(),
+            HeaderSyncMode::Tip(tip_rx.clone()),
+            Arc::clone(&consensus),
+            header_downloader,
+            body_downloader,
+            executor_factory.clone(),
+        ))
     }
 };

63 changes: 50 additions & 13 deletions crates/etl/src/lib.rs
@@ -17,9 +17,8 @@
 use std::{
     cmp::Reverse,
     collections::BinaryHeap,
-    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
+    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
     path::Path,
-    sync::Arc,
 };

 use rayon::prelude::*;
@@ -41,7 +40,7 @@ where
     <V as Compress>::Compressed: std::fmt::Debug,
 {
     /// Directory for temporary file storage
-    dir: Arc<TempDir>,
+    dir: Option<TempDir>,
     /// Collection of temporary ETL files
     files: Vec<EtlFile>,
     /// Current buffer size in bytes
@@ -61,12 +60,12 @@ where
     <K as Encode>::Encoded: Ord + std::fmt::Debug,
     <V as Compress>::Compressed: Ord + std::fmt::Debug,
 {
-    /// Create a new collector in a specific temporary directory with some capacity.
+    /// Create a new collector with some capacity.
     ///
     /// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
-    pub fn new(dir: Arc<TempDir>, buffer_capacity_bytes: usize) -> Self {
+    pub fn new(buffer_capacity_bytes: usize) -> Self {
         Self {
-            dir,
+            dir: None,
             buffer_size_bytes: 0,
             files: Vec::new(),
             buffer_capacity_bytes,
@@ -85,24 +84,49 @@ where
         self.len == 0
     }

+    /// Clears the collector, removing all data, including the temporary directory.
+    pub fn clear(&mut self) {
+        self.dir = None;
+        // Clear vectors and free the allocated memory
+        self.files = Vec::new();
+        self.buffer = Vec::new();
+        self.buffer_size_bytes = 0;
+        self.len = 0;
+    }
+
     /// Insert an entry into the collector.
-    pub fn insert(&mut self, key: K, value: V) {
+    pub fn insert(&mut self, key: K, value: V) -> io::Result<()> {
         let key = key.encode();
         let value = value.compress();
         self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
         self.buffer.push((key, value));
         if self.buffer_size_bytes > self.buffer_capacity_bytes {
-            self.flush();
+            self.flush()?;
         }
         self.len += 1;
+
+        Ok(())
+    }
+
+    /// Returns a reference to the temporary directory used by the collector. If the directory
+    /// doesn't exist, it will be created.
+    fn dir(&mut self) -> io::Result<&TempDir> {
+        if self.dir.is_none() {
+            self.dir = Some(TempDir::new()?);
+        }
+        Ok(self.dir.as_ref().unwrap())
     }

-    fn flush(&mut self) {
+    fn flush(&mut self) -> io::Result<()> {
         self.buffer_size_bytes = 0;
         self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
         let mut buf = Vec::with_capacity(self.buffer.len());
         std::mem::swap(&mut buf, &mut self.buffer);
-        self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
+
+        let path = self.dir()?.path().to_path_buf();
+        self.files.push(EtlFile::new(path.as_path(), buf)?);
+
+        Ok(())
     }

     /// Returns an iterator over the collector data.
@@ -116,7 +140,7 @@
     pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
         // Flush the remaining items to disk
        if self.buffer_size_bytes > 0 {
-            self.flush();
+            self.flush()?;
         }

         let mut heap = BinaryHeap::new();
@@ -246,9 +270,11 @@ mod tests {
         let mut entries: Vec<_> =
             (0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();

-        let mut collector = Collector::new(Arc::new(TempDir::new().unwrap()), 1024);
+        let mut collector = Collector::new(1024);
+        assert!(collector.dir.is_none());

         for (k, v) in entries.clone() {
-            collector.insert(k, v);
+            collector.insert(k, v).unwrap();
         }
         entries.sort_unstable_by_key(|entry| entry.0);
@@ -259,5 +285,16 @@
                 (expected.0.encode().to_vec(), expected.1.compress().to_vec())
             );
         }
+
+        let temp_dir_path = collector.dir.as_ref().unwrap().path().to_path_buf();
+
+        collector.clear();
+        assert!(collector.dir.is_none());
+        assert!(collector.files.is_empty());
+        assert_eq!(collector.buffer_size_bytes, 0);
+        assert!(collector.buffer.is_empty());
+        assert_eq!(collector.len, 0);
+        assert!(collector.is_empty());
+        assert!(!temp_dir_path.exists());
     }
 }
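Taken together, the changes to crates/etl/src/lib.rs invert the collector's relationship with its temporary directory: instead of receiving a shared Arc<TempDir> up front, the collector now creates its own directory lazily on the first flush and can release it eagerly through clear(). Below is a minimal usage sketch of the reworked API, not part of the commit, assuming the reth_etl and reth_primitives crates as they appear in this diff; the types mirror the test above, and the iterator item type is inferred from it.

use reth_etl::Collector;
use reth_primitives::{TxHash, TxNumber};

fn collect_hashes() -> std::io::Result<()> {
    // No TempDir argument anymore: nothing touches disk until the
    // buffer capacity (here 1 KiB) is exceeded and a flush runs.
    let mut collector: Collector<TxHash, TxNumber> = Collector::new(1024);

    for id in 0..10_000u64 {
        // insert() is now fallible because it may flush to disk.
        collector.insert(TxHash::random(), id)?;
    }

    // iter() flushes any remaining buffered entries and merges the
    // sorted files, yielding encoded (key, value) byte pairs.
    for entry in collector.iter()? {
        let (_key, _value) = entry?;
        // ... append to the database ...
    }

    // Drop the buffers and delete the temporary directory now,
    // instead of waiting for the collector itself to be dropped.
    collector.clear();
    Ok(())
}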
2 changes: 1 addition & 1 deletion crates/node-core/src/node_config.rs
@@ -836,7 +836,7 @@ impl NodeConfig {
                     header_downloader,
                     body_downloader,
                     factory.clone(),
-                )?
+                )
                 .set(SenderRecoveryStage {
                     commit_threshold: stage_config.sender_recovery.commit_threshold,
                 })
1 change: 0 additions & 1 deletion crates/stages/src/lib.rs
@@ -60,7 +60,6 @@
 //!             bodies_downloader,
 //!             executor_factory,
 //!         )
-//!         .unwrap(),
 //!     )
 //!     .build(provider_factory, static_file_producer);
 //! ```
25 changes: 6 additions & 19 deletions crates/stages/src/sets.rs
@@ -52,7 +52,7 @@ use crate::{
         IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
         StorageHashingStage, TransactionLookupStage,
     },
-    StageError, StageSet, StageSetBuilder,
+    StageSet, StageSetBuilder,
 };
 use reth_db::database::Database;
 use reth_interfaces::{
@@ -61,7 +60,6 @@ use reth_interfaces::{
 };
 use reth_provider::{ExecutorFactory, HeaderSyncGapProvider, HeaderSyncMode};
 use std::sync::Arc;
-use tempfile::TempDir;

 /// A set containing all stages to run a fully syncing instance of reth.
 ///
@@ -101,21 +100,20 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
         header_downloader: H,
         body_downloader: B,
         executor_factory: EF,
-    ) -> Result<Self, StageError>
+    ) -> Self
     where
         EF: ExecutorFactory,
     {
-        Ok(Self {
+        Self {
             online: OnlineStages::new(
                 provider,
                 header_mode,
                 consensus,
                 header_downloader,
                 body_downloader,
-                Arc::new(TempDir::new()?),
             ),
             executor_factory,
-        })
+        }
     }
 }

@@ -164,8 +162,6 @@ pub struct OnlineStages<Provider, H, B> {
     header_downloader: H,
     /// The block body downloader
     body_downloader: B,
-    /// Temporary directory for ETL usage on headers stage.
-    temp_dir: Arc<TempDir>,
 }

 impl<Provider, H, B> OnlineStages<Provider, H, B> {
@@ -176,9 +172,8 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
         consensus: Arc<dyn Consensus>,
         header_downloader: H,
         body_downloader: B,
-        temp_dir: Arc<TempDir>,
     ) -> Self {
-        Self { provider, header_mode, consensus, header_downloader, body_downloader, temp_dir }
+        Self { provider, header_mode, consensus, header_downloader, body_downloader }
     }
 }

@@ -203,16 +198,9 @@ where
         mode: HeaderSyncMode,
         header_downloader: H,
         consensus: Arc<dyn Consensus>,
-        temp_dir: Arc<TempDir>,
     ) -> StageSetBuilder<DB> {
         StageSetBuilder::default()
-            .add_stage(HeaderStage::new(
-                provider,
-                header_downloader,
-                mode,
-                consensus.clone(),
-                temp_dir.clone(),
-            ))
+            .add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone()))
             .add_stage(bodies)
     }
 }
@@ -231,7 +219,6 @@ where
             self.header_downloader,
             self.header_mode,
             self.consensus.clone(),
-            self.temp_dir.clone(),
         ))
         .add_stage(BodyStage::new(self.body_downloader))
     }
19 changes: 11 additions & 8 deletions crates/stages/src/stages/headers.rs
@@ -29,7 +29,6 @@ use std::{
     sync::Arc,
     task::{ready, Context, Poll},
 };
-use tempfile::TempDir;
 use tracing::*;

 /// The headers stage.
@@ -77,16 +76,15 @@ where
         downloader: Downloader,
         mode: HeaderSyncMode,
         consensus: Arc<dyn Consensus>,
-        tempdir: Arc<TempDir>,
     ) -> Self {
         Self {
             provider: database,
             downloader,
             mode,
             consensus,
             sync_gap: None,
-            hash_collector: Collector::new(tempdir.clone(), 100 * (1024 * 1024)),
-            header_collector: Collector::new(tempdir, 100 * (1024 * 1024)),
+            hash_collector: Collector::new(100 * (1024 * 1024)),
+            header_collector: Collector::new(100 * (1024 * 1024)),
             is_etl_ready: false,
         }
     }
@@ -158,7 +156,7 @@ where
         // add it to the collector and use tx.append on all hashes.
         if let Some((hash, block_number)) = cursor_header_numbers.last()? {
             if block_number.value()? == 0 {
-                self.hash_collector.insert(hash.key()?, 0);
+                self.hash_collector.insert(hash.key()?, 0)?;
                 cursor_header_numbers.delete_current()?;
                 first_sync = true;
             }
@@ -244,8 +242,8 @@ where
             for header in headers {
                 let header_number = header.number;

-                self.hash_collector.insert(header.hash(), header_number);
-                self.header_collector.insert(header_number, header);
+                self.hash_collector.insert(header.hash(), header_number)?;
+                self.header_collector.insert(header_number, header)?;

                 // Headers are downloaded in reverse, so if we reach here, we know we have
                 // filled the gap.
@@ -291,6 +289,10 @@ where
         let last_header_number =
             self.write_headers::<DB>(provider.tx_ref(), provider.static_file_provider().clone())?;

+        // Clear ETL collectors
+        self.hash_collector.clear();
+        self.header_collector.clear();
+
         Ok(ExecOutput {
             checkpoint: StageCheckpoint::new(last_header_number).with_headers_stage_checkpoint(
                 HeadersCheckpoint {
@@ -420,7 +422,6 @@ mod tests {
                 (*self.downloader_factory)(),
                 HeaderSyncMode::Tip(self.channel.1.clone()),
                 self.consensus.clone(),
-                Arc::new(TempDir::new().unwrap()),
             )
         }
     }
@@ -586,5 +587,7 @@
             processed == checkpoint + headers.len() as u64 - 1 && total == tip.number
         );
         assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
+        assert!(runner.stage().hash_collector.is_empty());
+        assert!(runner.stage().header_collector.is_empty());
     }
 }
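The headers diff above is the heart of the fix: HeaderStage keeps its collectors as struct fields, so unlike a function-local collector they survive across pipeline runs and must be cleared once their contents have been written out. A simplified sketch of that pattern, not part of the commit, with a hypothetical write_to_db standing in for the real write_headers logic:

use reth_etl::Collector;
use reth_primitives::{TxHash, TxNumber};

struct HeaderStageSketch {
    // Lives as long as the pipeline, across many stage runs.
    hash_collector: Collector<TxHash, TxNumber>,
}

impl HeaderStageSketch {
    fn execute(&mut self) -> std::io::Result<()> {
        // ... the stage fills the collector while downloading ...
        write_to_db(&mut self.hash_collector)?;

        // Without this, the flushed temp files and their directory
        // would linger until the whole pipeline is dropped.
        self.hash_collector.clear();
        Ok(())
    }
}

fn write_to_db(collector: &mut Collector<TxHash, TxNumber>) -> std::io::Result<()> {
    for entry in collector.iter()? {
        let (_key, _value) = entry?;
        // ... append to the header tables ...
    }
    Ok(())
}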
7 changes: 2 additions & 5 deletions crates/stages/src/stages/tx_lookup.rs
@@ -17,8 +17,6 @@ use reth_provider::{
     BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader,
     TransactionsProvider, TransactionsProviderExt,
 };
-use std::sync::Arc;
-use tempfile::TempDir;
 use tracing::*;

 /// The transaction lookup stage.
@@ -101,8 +99,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         }

         // 500MB temporary files
-        let mut hash_collector: Collector<TxHash, TxNumber> =
-            Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024));
+        let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(500 * (1024 * 1024));

         debug!(
             target: "sync::stages::transaction_lookup",
@@ -119,7 +116,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");

         for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
-            hash_collector.insert(key, value);
+            hash_collector.insert(key, value)?;
         }

         input.checkpoint = Some(
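By contrast, TransactionLookupStage builds its collector as a local on every execute() call, so it needs no explicit clear(): with dir now an Option<TempDir>, dropping the collector at the end of the scope removes any lazily created directory through TempDir's Drop impl. A sketch of that lifecycle, not part of the commit, with the actual table writes elided:

use reth_etl::Collector;
use reth_primitives::{TxHash, TxNumber};

fn execute_sketch() -> std::io::Result<()> {
    // Created fresh per run; 500MB buffer capacity as in the stage above.
    let mut hash_collector: Collector<TxHash, TxNumber> =
        Collector::new(500 * 1024 * 1024);

    hash_collector.insert(TxHash::random(), 1)?;
    // ... iterate the collector and write the lookup table ...

    Ok(())
    // hash_collector is dropped here; its TempDir (if any) goes with it.
}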
