Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node-core, storage): static files segment metrics #6908

Merged
merged 5 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions bin/reth/src/commands/db/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use reth_db::{database::Database, mdbx, static_file::iter_static_files, Database
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_primitives::static_file::{find_fixed_range, SegmentRangeInclusive};
use reth_provider::providers::StaticFileProvider;
use std::fs::File;

#[derive(Parser, Debug)]
/// The arguments for the `reth db stats` command
Expand Down Expand Up @@ -174,20 +173,17 @@ impl Command {

let columns = jar_provider.columns();
let rows = jar_provider.rows();
let data_size = File::open(jar_provider.data_path())
.and_then(|file| file.metadata())

let data_size = reth_primitives::fs::metadata(jar_provider.data_path())
.map(|metadata| metadata.len())
.unwrap_or_default();
let index_size = File::open(jar_provider.index_path())
.and_then(|file| file.metadata())
let index_size = reth_primitives::fs::metadata(jar_provider.index_path())
.map(|metadata| metadata.len())
.unwrap_or_default();
let offsets_size = File::open(jar_provider.offsets_path())
.and_then(|file| file.metadata())
let offsets_size = reth_primitives::fs::metadata(jar_provider.offsets_path())
.map(|metadata| metadata.len())
.unwrap_or_default();
let config_size = File::open(jar_provider.config_path())
.and_then(|file| file.metadata())
let config_size = reth_primitives::fs::metadata(jar_provider.config_path())
.map(|metadata| metadata.len())
.unwrap_or_default();

Expand Down
1 change: 1 addition & 0 deletions bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Command {
listen_addr,
prometheus_exporter::install_recorder()?,
Arc::clone(&db),
factory.static_file_provider(),
metrics_process::Collector::default(),
)
.await?;
Expand Down
15 changes: 10 additions & 5 deletions crates/node-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,22 @@ where
// Does not do anything on windows.
fdlimit::raise_fd_limit()?;

let prometheus_handle = config.install_prometheus_recorder()?;
config.start_metrics_endpoint(prometheus_handle, database.clone()).await?;

info!(target: "reth::cli", "Database opened");

let provider_factory = ProviderFactory::new(
database.clone(),
Arc::clone(&config.chain),
data_dir.static_files_path(),
)?
.with_static_files_metrics();
info!(target: "reth::cli", "Database opened");

let prometheus_handle = config.install_prometheus_recorder()?;
config
.start_metrics_endpoint(
prometheus_handle,
database.clone(),
provider_factory.static_file_provider(),
)
.await?;

debug!(target: "reth::cli", chain=%config.chain.chain, genesis=?config.chain.genesis_hash(), "Initializing genesis");

Expand Down
16 changes: 16 additions & 0 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use reth_db::database_metrics::DatabaseMetrics;
use reth_metrics::metrics::Unit;
use reth_provider::providers::StaticFileProvider;
use std::{convert::Infallible, net::SocketAddr, sync::Arc};

pub(crate) trait Hook: Fn() + Send + Sync {}
Expand Down Expand Up @@ -79,17 +80,24 @@ pub async fn serve<Metrics>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
process: metrics_process::Collector,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
{
let db_metrics_hook = move || db.report_metrics();
let static_file_metrics_hook = move || {
let _ = static_file_provider.report_metrics().map_err(
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
);
};

// Clone `process` to move it into the hook and use the original `process` for describe below.
let cloned_process = process.clone();
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(db_metrics_hook),
Box::new(static_file_metrics_hook),
Box::new(move || cloned_process.collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
Expand All @@ -106,6 +114,14 @@ where
"db.timed_out_not_aborted_transactions",
"Number of timed out transactions that were not aborted by the user yet"
);

describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
describe_gauge!(
"static_files.segment_entries",
"The number of entries for a static file segment"
);

process.describe();
describe_memory_stats();
describe_io_stats();
Expand Down
9 changes: 6 additions & 3 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ use reth_primitives::{
BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, TxHash, B256, MAINNET,
};
use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockNumReader, BlockReader,
BlockchainTreePendingStateProvider, CanonStateSubscriptions, HeaderProvider, HeaderSyncMode,
ProviderFactory, StageCheckpointReader,
providers::{BlockchainProvider, StaticFileProvider},
BlockHashReader, BlockNumReader, BlockReader, BlockchainTreePendingStateProvider,
CanonStateSubscriptions, HeaderProvider, HeaderSyncMode, ProviderFactory,
StageCheckpointReader,
};
use reth_revm::EvmProcessorFactory;
use reth_stages::{
Expand Down Expand Up @@ -599,6 +600,7 @@ impl NodeConfig {
&self,
prometheus_handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
Expand All @@ -609,6 +611,7 @@ impl NodeConfig {
listen_addr,
prometheus_handle,
db,
static_file_provider,
metrics_process::Collector::default(),
)
.await?;
Expand Down
20 changes: 20 additions & 0 deletions crates/primitives/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ pub enum FsPathError {
/// The path related to the operation.
path: PathBuf,
},

/// Error variant for failed file metadata operation with additional path context.
#[error("failed to get metadata for {path:?}: {source}")]
Metadata {
/// The source `io::Error`.
source: io::Error,
/// The path related to the operation.
path: PathBuf,
},
}

impl FsPathError {
Expand Down Expand Up @@ -171,6 +180,11 @@ impl FsPathError {
/// Returns the complementary error variant for [`std::fs::rename`].
pub fn rename(source: io::Error, from: impl Into<PathBuf>, to: impl Into<PathBuf>) -> Self {
    FsPathError::Rename { source, from: from.into(), to: to.into() }
}

/// Returns the complementary error variant for [`std::fs::metadata`].
pub fn metadata(source: io::Error, path: impl Into<PathBuf>) -> Self {
    FsPathError::Metadata { source, path: path.into() }
}
}

type Result<T> = std::result::Result<T, FsPathError>;
Expand Down Expand Up @@ -225,3 +239,9 @@ pub fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<()> {
let to = to.as_ref();
fs::rename(from, to).map_err(|err| FsPathError::rename(err, from, to))
}

/// Wrapper for `std::fs::metadata`
///
/// Reads the metadata of the file or directory at `path`, attaching the
/// offending path to any resulting I/O error.
pub fn metadata(path: impl AsRef<Path>) -> Result<fs::Metadata> {
    let path = path.as_ref();
    match fs::metadata(path) {
        Ok(meta) => Ok(meta),
        Err(err) => Err(FsPathError::metadata(err, path)),
    }
}
39 changes: 39 additions & 0 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,45 @@ impl StaticFileProvider {
Self(Arc::new(provider))
}

/// Reports metrics for the static files.
///
/// For every segment found on disk this records three gauges: the total
/// on-disk size (data + index + offsets + config files across all jars),
/// the number of fixed-range files, and the total number of entries (rows).
///
/// No-op when the provider was constructed without metrics attached.
pub fn report_metrics(&self) -> ProviderResult<()> {
    // Metrics are opt-in; bail out early when they were never attached.
    let Some(metrics) = &self.metrics else { return Ok(()) };

    /// Size in bytes of the file at `path`, or 0 if its metadata cannot be
    /// read (e.g. the file was removed concurrently).
    fn file_size(path: impl AsRef<std::path::Path>) -> u64 {
        reth_primitives::fs::metadata(path).map(|metadata| metadata.len()).unwrap_or_default()
    }

    let static_files = iter_static_files(&self.path)?;
    for (segment, ranges) in static_files {
        let mut entries = 0;
        let mut size = 0;

        for (block_range, _) in &ranges {
            let fixed_block_range = find_fixed_range(block_range.start());
            let jar_provider = self
                .get_segment_provider(segment, || Some(fixed_block_range), None)?
                .ok_or(ProviderError::MissingStaticFileBlock(segment, block_range.start()))?;

            entries += jar_provider.rows();

            // A jar is backed by four files; sum them all into the segment size.
            size += file_size(jar_provider.data_path()) +
                file_size(jar_provider.index_path()) +
                file_size(jar_provider.offsets_path()) +
                file_size(jar_provider.config_path());
        }

        metrics.record_segment(segment, size, ranges.len(), entries);
    }

    Ok(())
}

/// Gets the [`StaticFileJarProvider`] of the requested segment and block.
pub fn get_segment_provider_from_block(
&self,
Expand Down
39 changes: 38 additions & 1 deletion crates/storage/provider/src/providers/static_file/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::{collections::HashMap, time::Duration};

use itertools::Itertools;
use metrics::{Counter, Histogram};
use metrics::{Counter, Gauge, Histogram};
use reth_metrics::Metrics;
use reth_primitives::StaticFileSegment;
use strum::{EnumIter, IntoEnumIterator};

/// Metrics for the static file provider.
#[derive(Debug)]
pub struct StaticFileProviderMetrics {
segments: HashMap<StaticFileSegment, StaticFileSegmentMetrics>,
segment_operations: HashMap<
(StaticFileSegment, StaticFileProviderOperation),
StaticFileProviderOperationMetrics,
Expand All @@ -18,6 +19,14 @@ pub struct StaticFileProviderMetrics {
impl Default for StaticFileProviderMetrics {
fn default() -> Self {
Self {
segments: StaticFileSegment::iter()
.map(|segment| {
(
segment,
StaticFileSegmentMetrics::new_with_labels(&[("segment", segment.as_str())]),
)
})
.collect(),
segment_operations: StaticFileSegment::iter()
.cartesian_product(StaticFileProviderOperation::iter())
.map(|(segment, operation)| {
Expand All @@ -35,6 +44,22 @@ impl Default for StaticFileProviderMetrics {
}

impl StaticFileProviderMetrics {
/// Records the size (bytes), file count, and entry count gauges for the
/// given static file `segment`.
///
/// # Panics
///
/// Panics if `segment` has no registered metrics; `Default` registers an
/// entry for every [`StaticFileSegment`] variant, so this indicates a bug.
pub(crate) fn record_segment(
    &self,
    segment: StaticFileSegment,
    size: u64,
    files: usize,
    entries: usize,
) {
    // Look up the per-segment metrics once instead of repeating the
    // HashMap lookup (and its `expect`) for every gauge.
    let segment_metrics =
        self.segments.get(&segment).expect("segment metrics should exist");

    segment_metrics.size.set(size as f64);
    segment_metrics.files.set(files as f64);
    segment_metrics.entries.set(entries as f64);
}

pub(crate) fn record_segment_operation(
&self,
segment: StaticFileSegment,
Expand Down Expand Up @@ -80,6 +105,18 @@ impl StaticFileProviderOperation {
}
}

/// Metrics for a specific static file segment.
///
/// One instance is registered per [`StaticFileSegment`] variant, labeled
/// with `segment` so the gauges of different segments stay distinguishable.
/// The field doc comments double as the metric descriptions emitted by the
/// [`Metrics`] derive — keep them in sync with any `describe_gauge!` calls.
#[derive(Metrics)]
#[metrics(scope = "static_files.segment")]
pub(crate) struct StaticFileSegmentMetrics {
    /// The size of a static file segment
    size: Gauge,
    /// The number of files for a static file segment
    files: Gauge,
    /// The number of entries for a static file segment
    entries: Gauge,
}

#[derive(Metrics)]
#[metrics(scope = "static_files.jar_provider")]
pub(crate) struct StaticFileProviderOperationMetrics {
Expand Down
Loading
Loading