Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node-core, storage): static files segment metrics #6908

Merged
merged 5 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Command {
listen_addr,
prometheus_exporter::install_recorder()?,
Arc::clone(&db),
factory.static_file_provider(),
metrics_process::Collector::default(),
)
.await?;
Expand Down
15 changes: 10 additions & 5 deletions crates/node-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,22 @@ where
// Does not do anything on windows.
fdlimit::raise_fd_limit()?;

let prometheus_handle = config.install_prometheus_recorder()?;
config.start_metrics_endpoint(prometheus_handle, database.clone()).await?;

info!(target: "reth::cli", "Database opened");

let provider_factory = ProviderFactory::new(
database.clone(),
Arc::clone(&config.chain),
data_dir.static_files_path(),
)?
.with_static_files_metrics();
info!(target: "reth::cli", "Database opened");

let prometheus_handle = config.install_prometheus_recorder()?;
config
.start_metrics_endpoint(
prometheus_handle,
database.clone(),
provider_factory.static_file_provider(),
)
.await?;

debug!(target: "reth::cli", chain=%config.chain.chain, genesis=?config.chain.genesis_hash(), "Initializing genesis");

Expand Down
16 changes: 16 additions & 0 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use reth_db::database_metrics::DatabaseMetrics;
use reth_metrics::metrics::Unit;
use reth_provider::providers::StaticFileProvider;
use std::{convert::Infallible, net::SocketAddr, sync::Arc};

pub(crate) trait Hook: Fn() + Send + Sync {}
Expand Down Expand Up @@ -79,17 +80,24 @@ pub async fn serve<Metrics>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
process: metrics_process::Collector,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
{
let db_metrics_hook = move || db.report_metrics();
let static_file_metrics_hook = move || {
let _ = static_file_provider.report_metrics().map_err(
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
);
};

// Clone `process` to move it into the hook and use the original `process` for describe below.
let cloned_process = process.clone();
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(db_metrics_hook),
Box::new(static_file_metrics_hook),
Box::new(move || cloned_process.collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
Expand All @@ -106,6 +114,14 @@ where
"db.timed_out_not_aborted_transactions",
"Number of timed out transactions that were not aborted by the user yet"
);

describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
describe_gauge!(
"static_files.segment_entries",
"The number of entries for a static file segment"
);

process.describe();
describe_memory_stats();
describe_io_stats();
Expand Down
9 changes: 6 additions & 3 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ use reth_primitives::{
BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, TxHash, B256, MAINNET,
};
use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockNumReader, BlockReader,
BlockchainTreePendingStateProvider, CanonStateSubscriptions, HeaderProvider, HeaderSyncMode,
ProviderFactory, StageCheckpointReader,
providers::{BlockchainProvider, StaticFileProvider},
BlockHashReader, BlockNumReader, BlockReader, BlockchainTreePendingStateProvider,
CanonStateSubscriptions, HeaderProvider, HeaderSyncMode, ProviderFactory,
StageCheckpointReader,
};
use reth_revm::EvmProcessorFactory;
use reth_stages::{
Expand Down Expand Up @@ -599,6 +600,7 @@ impl NodeConfig {
&self,
prometheus_handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
Expand All @@ -609,6 +611,7 @@ impl NodeConfig {
listen_addr,
prometheus_handle,
db,
static_file_provider,
metrics_process::Collector::default(),
)
.await?;
Expand Down
44 changes: 44 additions & 0 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use reth_primitives::{
};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
fs::File,
ops::{Deref, Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
sync::{mpsc, Arc},
Expand Down Expand Up @@ -114,6 +115,49 @@ impl StaticFileProvider {
Self(Arc::new(provider))
}

/// Reports metrics for the static files.
pub fn report_metrics(&self) -> ProviderResult<()> {
let Some(metrics) = &self.metrics else { return Ok(()) };

let static_files = iter_static_files(&self.path)?;
for (segment, ranges) in static_files {
let mut entries = 0;
let mut size = 0;

for (block_range, _) in &ranges {
let fixed_block_range = find_fixed_range(block_range.start());
let jar_provider = self
.get_segment_provider(segment, || Some(fixed_block_range), None)?
.ok_or(ProviderError::MissingStaticFileBlock(segment, block_range.start()))?;

entries += jar_provider.rows();

let data_size = File::open(jar_provider.data_path())
.and_then(|file| file.metadata())
.map(|metadata| metadata.len())
.unwrap_or_default();
let index_size = File::open(jar_provider.index_path())
.and_then(|file| file.metadata())
.map(|metadata| metadata.len())
.unwrap_or_default();
let offsets_size = File::open(jar_provider.offsets_path())
.and_then(|file| file.metadata())
.map(|metadata| metadata.len())
.unwrap_or_default();
let config_size = File::open(jar_provider.config_path())
.and_then(|file| file.metadata())
.map(|metadata| metadata.len())
.unwrap_or_default();

joshieDo marked this conversation as resolved.
Show resolved Hide resolved
size += data_size + index_size + offsets_size + config_size;
}

metrics.record_segment(segment, size, ranges.len(), entries);
}

Ok(())
}

/// Gets the [`StaticFileJarProvider`] of the requested segment and block.
pub fn get_segment_provider_from_block(
&self,
Expand Down
39 changes: 38 additions & 1 deletion crates/storage/provider/src/providers/static_file/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::{collections::HashMap, time::Duration};

use itertools::Itertools;
use metrics::{Counter, Histogram};
use metrics::{Counter, Gauge, Histogram};
use reth_metrics::Metrics;
use reth_primitives::StaticFileSegment;
use strum::{EnumIter, IntoEnumIterator};

/// Metrics for the static file provider.
#[derive(Debug)]
pub struct StaticFileProviderMetrics {
segments: HashMap<StaticFileSegment, StaticFileSegmentMetrics>,
segment_operations: HashMap<
(StaticFileSegment, StaticFileProviderOperation),
StaticFileProviderOperationMetrics,
Expand All @@ -18,6 +19,14 @@ pub struct StaticFileProviderMetrics {
impl Default for StaticFileProviderMetrics {
fn default() -> Self {
Self {
segments: StaticFileSegment::iter()
.map(|segment| {
(
segment,
StaticFileSegmentMetrics::new_with_labels(&[("segment", segment.as_str())]),
)
})
.collect(),
segment_operations: StaticFileSegment::iter()
.cartesian_product(StaticFileProviderOperation::iter())
.map(|(segment, operation)| {
Expand All @@ -35,6 +44,22 @@ impl Default for StaticFileProviderMetrics {
}

impl StaticFileProviderMetrics {
pub(crate) fn record_segment(
&self,
segment: StaticFileSegment,
size: u64,
files: usize,
entries: usize,
) {
self.segments.get(&segment).expect("segment metrics should exist").size.set(size as f64);
self.segments.get(&segment).expect("segment metrics should exist").files.set(files as f64);
self.segments
.get(&segment)
.expect("segment metrics should exist")
.entries
.set(entries as f64);
}

pub(crate) fn record_segment_operation(
&self,
segment: StaticFileSegment,
Expand Down Expand Up @@ -80,6 +105,18 @@ impl StaticFileProviderOperation {
}
}

/// Metrics for a specific static file segment.
#[derive(Metrics)]
#[metrics(scope = "static_files.segment")]
pub(crate) struct StaticFileSegmentMetrics {
/// The size of a static file segment
size: Gauge,
/// The number of files for a static file segment
files: Gauge,
/// The number of entries for a static file segment
entries: Gauge,
}

#[derive(Metrics)]
#[metrics(scope = "static_files.jar_provider")]
pub(crate) struct StaticFileProviderOperationMetrics {
Expand Down
Loading
Loading