Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC cache system #1083

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- ci: scope cache by branch and add cache cleanup
- feat: print development accounts at node startup
- test: add test to check tx signed by OZ account can be signed with Argent pk
- feat: lru cache for the rpc layer

## v0.2.0

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 11 additions & 8 deletions crates/client/rpc-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub trait StarknetRpcApi {

/// Get the most recent accepted block hash and number
#[method(name = "blockHashAndNumber")]
fn block_hash_and_number(&self) -> RpcResult<BlockHashAndNumber>;
async fn block_hash_and_number(&self) -> RpcResult<BlockHashAndNumber>;

/// Get the number of transactions in a block given a block id
#[method(name = "getBlockTransactionCount")]
fn get_block_transaction_count(&self, block_id: BlockId) -> RpcResult<u128>;
async fn get_block_transaction_count(&self, block_id: BlockId) -> RpcResult<u128>;

/// Get the value of the storage at the given address and key, at the given block id
#[method(name = "getStorageAt")]
Expand Down Expand Up @@ -70,15 +70,15 @@ pub trait StarknetRpcApi {

/// Get block information with transaction hashes given the block id
#[method(name = "getBlockWithTxHashes")]
fn get_block_with_tx_hashes(&self, block_id: BlockId) -> RpcResult<MaybePendingBlockWithTxHashes>;
async fn get_block_with_tx_hashes(&self, block_id: BlockId) -> RpcResult<MaybePendingBlockWithTxHashes>;

/// Get the nonce associated with the given address at the given block
#[method(name = "getNonce")]
fn get_nonce(&self, block_id: BlockId, contract_address: FieldElement) -> RpcResult<Felt>;

/// Get block information with full transactions given the block id
#[method(name = "getBlockWithTxs")]
fn get_block_with_txs(&self, block_id: BlockId) -> RpcResult<MaybePendingBlockWithTxs>;
async fn get_block_with_txs(&self, block_id: BlockId) -> RpcResult<MaybePendingBlockWithTxs>;

/// Get the chain id
#[method(name = "chainId")]
Expand Down Expand Up @@ -108,11 +108,11 @@ pub trait StarknetRpcApi {

/// Get the details of a transaction by a given block id and index
#[method(name = "getTransactionByBlockIdAndIndex")]
fn get_transaction_by_block_id_and_index(&self, block_id: BlockId, index: usize) -> RpcResult<Transaction>;
async fn get_transaction_by_block_id_and_index(&self, block_id: BlockId, index: usize) -> RpcResult<Transaction>;

/// Get the information about the result of executing the requested block
#[method(name = "getStateUpdate")]
fn get_state_update(&self, block_id: BlockId) -> RpcResult<StateUpdate>;
async fn get_state_update(&self, block_id: BlockId) -> RpcResult<StateUpdate>;

/// Returns the transactions in the transaction pool, recognized by this sequencer
#[method(name = "pendingTransactions")]
Expand All @@ -131,9 +131,12 @@ pub trait StarknetRpcApi {

/// Returns the information about a transaction by transaction hash.
#[method(name = "getTransactionByHash")]
fn get_transaction_by_hash(&self, transaction_hash: FieldElement) -> RpcResult<Transaction>;
async fn get_transaction_by_hash(&self, transaction_hash: FieldElement) -> RpcResult<Transaction>;

/// Returns the receipt of a transaction by transaction hash.
#[method(name = "getTransactionReceipt")]
fn get_transaction_receipt(&self, transaction_hash: FieldElement) -> RpcResult<MaybePendingTransactionReceipt>;
async fn get_transaction_receipt(
&self,
transaction_hash: FieldElement,
) -> RpcResult<MaybePendingTransactionReceipt>;
}
9 changes: 9 additions & 0 deletions crates/client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mp-starknet = { workspace = true }
# Substrate primitives
frame-support = { workspace = true }
frame-system = { workspace = true }
sc-service = { workspace = true }
sc-transaction-pool-api = { workspace = true }
sp-api = { workspace = true, default-features = true }
sp-arithmetic = { workspace = true, default-features = true }
Expand All @@ -44,14 +45,22 @@ starknet-core = { workspace = true }
starknet-ff = { workspace = true }
starknet_api = { workspace = true, default-features = false }
# Others
futures = { workspace = true }
hex = { workspace = true, default-features = true }
jsonrpsee = { workspace = true, default-features = true, features = [
"server",
"macros",
] }
log = { workspace = true, default-features = true }
prometheus = { version = "0.13.1", default-features = false }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", workspace = true }
scale-codec = { package = "parity-scale-codec", workspace = true, features = [
"derive",
] }
schnellru = "0.2.1"
serde_json = { workspace = true, default-features = true }
thiserror = { workspace = true }
tokio = "1.32.0"

[dev-dependencies]
rstest = { workspace = true }
Expand Down
141 changes: 141 additions & 0 deletions crates/client/rpc/src/cache/lru_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use scale_codec::Encode;
use schnellru::{ByMemoryUsage, LruMap};

/// Byte-limited LRU cache used over the RPC layer. Heavily inspired by [frontier's
/// implementation](https://github.com/paritytech/frontier/blob/194f6bb06152402ba44b340c3d401ae6e0670d96/client/rpc/src/eth/cache/mod.rs#L76).
pub struct LRUCache<K, V> {
    /// Cache name, used as the log target and as part of the prometheus metric names.
    name: &'static str,
    /// Inner LRU map, bounded by its total allocated memory via [`ByMemoryUsage`].
    inner: LruMap<K, V, ByMemoryUsage>,
    /// Prometheus metrics; `None` when no registry was provided or registration failed.
    metrics: Option<LRUCacheByteLimitedMetrics>,
}

/// Cache events used to drive metrics updates in `LRUCache::update_metrics`.
enum CacheAction {
    /// A lookup found the key.
    Hit,
    /// A lookup did not find the key.
    Miss,
    /// An insertion succeeded; triggers a refresh of the allocated-memory gauge.
    Insert,
}

// NOTE(review): the `V: Encode` bound is kept from the original but is not used
// anywhere in this impl — confirm whether it is actually required.
impl<K: Eq + core::hash::Hash, V: Encode> LRUCache<K, V> {
    /// Instantiate a new cache with a given name and a maximum allocated size in bytes.
    ///
    /// If a prometheus registry is passed, the cache metrics (hits, misses, allocated
    /// size) are registered against it; a registration failure is logged and the cache
    /// then runs without metrics.
    pub fn new(
        cache_name: &'static str,
        max_allocated_size: usize,
        prometheus_registry: Option<prometheus_endpoint::Registry>,
    ) -> Self {
        // If a registry was provided, try to register our cache metrics to it.
        let metrics = match prometheus_registry {
            Some(registry) => match LRUCacheByteLimitedMetrics::register(cache_name, &registry) {
                Ok(metrics) => Some(metrics),
                Err(e) => {
                    log::error!(target: cache_name, "Failed to register metrics: {:?}", e);
                    None
                }
            },
            None => None,
        };
        // Return new empty cache.
        Self { name: cache_name, inner: LruMap::new(ByMemoryUsage::new(max_allocated_size)), metrics }
    }

    /// Try to retrieve the value stored at key `k`, promoting the entry to
    /// most-recently-used on a hit. If metrics are active, the hit/miss counters are
    /// updated accordingly.
    pub fn get(&mut self, k: &K) -> Option<&mut V> {
        // `peek` tests for presence without a `&mut` borrow and without promoting the
        // entry, so we can update the metrics and then do a single promoting lookup.
        // (The original called `inner.get` twice: once for the hit/miss test and once
        // for the returned reference, hashing the key and promoting the entry twice.)
        let action = if self.inner.peek(k).is_some() { CacheAction::Hit } else { CacheAction::Miss };
        self.update_metrics(action);
        self.inner.get(k)
    }

    /// Try to insert value `v` at key `k`. Returns `true` on success and refreshes the
    /// allocated-memory gauge; returns `false` (and logs a warning) when the limiter
    /// refuses the entry because it cannot fit within the cache's memory budget.
    pub fn insert(&mut self, k: K, v: V) -> bool {
        // Try to insert the entry; `LruMap::insert` returns `false` when the limiter
        // rejects it.
        if self.inner.insert(k, v) {
            self.update_metrics(CacheAction::Insert);
            true
        } else {
            log::warn!(target: self.name, "Failed to insert entry into cache");
            false
        }
    }

    /// Utility function to handle metrics updates; no-op when metrics are disabled.
    fn update_metrics(&self, action: CacheAction) {
        if let Some(metrics) = &self.metrics {
            match action {
                CacheAction::Hit => metrics.hits.inc(),
                CacheAction::Miss => metrics.miss.inc(),
                CacheAction::Insert => metrics.allocated_memory_size.set(self.inner.memory_usage() as u64),
            }
        }
    }
}

/// Prometheus metrics exposed by an `LRUCache`.
struct LRUCacheByteLimitedMetrics {
    /// Number of cache hits (`madara_starknet_{name}_hits`).
    hits: prometheus::IntCounter,
    /// Number of cache misses (`madara_starknet_{name}_miss`).
    miss: prometheus::IntCounter,
    /// Currently allocated memory of the cache, in bytes (`madara_starknet_{name}_size`).
    allocated_memory_size: prometheus_endpoint::Gauge<prometheus_endpoint::U64>,
}

impl LRUCacheByteLimitedMetrics {
    /// Create the hit/miss counters and the allocated-size gauge for the cache named
    /// `cache_name`, registering each one against the given prometheus `registry`.
    ///
    /// Returns an error if creating or registering any of the metrics fails.
    pub(crate) fn register(
        cache_name: &'static str,
        registry: &prometheus_endpoint::Registry,
    ) -> Result<Self, prometheus_endpoint::PrometheusError> {
        // Metrics are created and registered one at a time, in declaration order.
        let hits = prometheus_endpoint::register(
            prometheus::IntCounter::new(
                format!("madara_starknet_{}_hits", cache_name),
                format!("Hits of starknet {} cache.", cache_name),
            )?,
            registry,
        )?;
        let miss = prometheus_endpoint::register(
            prometheus::IntCounter::new(
                format!("madara_starknet_{}_miss", cache_name),
                format!("Misses of starknet {} cache.", cache_name),
            )?,
            registry,
        )?;
        let allocated_memory_size = prometheus_endpoint::register(
            prometheus_endpoint::Gauge::new(
                format!("madara_starknet_{}_size", cache_name),
                format!("Allocated memory size of starknet {} data cache.", cache_name),
            )?,
            registry,
        )?;
        Ok(Self { hits, miss, allocated_memory_size })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // TODO(review): add coverage for the prometheus metrics (hits/miss counters and
    // the allocated-memory gauge) using a test registry.

    /// Inserting an entry whose allocation exceeds the cache's byte budget must fail.
    #[test]
    fn test_failure_insert_over_allocated_limit() {
        let mut cache = LRUCache::new("name", 10, None);
        // Should fail as base memory allocation for a string is 148 bytes.
        assert!(!cache.insert(0, "abcdefghijkl"));
    }

    /// Once the budget is reached, a new insertion evicts the least-recently-used
    /// entry instead of growing the allocation.
    #[test]
    fn test_work_allocated_size_limit() {
        let mut cache = LRUCache::new("name", 150, None);
        // Allocated memory will keep being 148 bytes for the three first insert.
        assert!(cache.insert(1, "a"));
        let memory_usage_step_1 = cache.inner.memory_usage();
        assert!(cache.insert(2, "b"));
        assert!(cache.insert(3, "c"));
        // Memory allocation will try to reach 280 bytes, blocked by our limiter. Insertion should be
        // successful but first item should be removed from cache.
        assert!(cache.insert(4, "d"));
        let memory_usage_step_2 = cache.inner.memory_usage();
        assert!(cache.get(&4).is_some());
        assert!(cache.get(&1).is_none());
        assert_eq!(memory_usage_step_1, memory_usage_step_2);
    }
}
Loading