Skip to content

Commit

Permalink
Clone read transactions into TxnManager message listener (#6809)
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Feb 27, 2024
1 parent c268c71 commit f5ce869
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 16 deletions.
12 changes: 7 additions & 5 deletions crates/storage/libmdbx-rs/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,21 +700,23 @@ impl EnvironmentBuilder {
}
}

let env_ptr = EnvPtr(env);

#[cfg(not(feature = "read-tx-timeouts"))]
let txn_manager = TxnManager::new(EnvPtr(env));
let txn_manager = TxnManager::new(env_ptr);

#[cfg(feature = "read-tx-timeouts")]
let txn_manager = {
let mut txn_manager = TxnManager::new(EnvPtr(env));
if let crate::MaxReadTransactionDuration::Set(duration) = self
.max_read_transaction_duration
.unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
DEFAULT_MAX_READ_TRANSACTION_DURATION,
))
{
txn_manager = txn_manager.with_max_read_transaction_duration(duration);
};
txn_manager
TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
} else {
TxnManager::new(env_ptr)
}
};

let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
Expand Down
55 changes: 44 additions & 11 deletions crates/storage/libmdbx-rs/src/txn_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,33 @@ impl TxnManager {

#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
use crate::{error::mdbx_result, txn_manager::TxnManager, Error};
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
use dashmap::{DashMap, DashSet};
use std::{
sync::Arc,
sync::{mpsc::sync_channel, Arc},
time::{Duration, Instant},
};
use tracing::{error, trace, warn};

const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);

impl TxnManager {
/// Sets the maximum duration that a read transaction can be open.
pub(crate) fn with_max_read_transaction_duration(
mut self,
/// Returns a new instance for which the maximum duration that a read transaction can be
/// open is set.
pub(crate) fn new_with_max_read_transaction_duration(
env: EnvPtr,
duration: Duration,
) -> TxnManager {
) -> Self {
let read_transactions = Arc::new(ReadTransactions::new(duration));
read_transactions.clone().start_monitor();
self.read_transactions = Some(read_transactions);

self
let (tx, rx) = sync_channel(0);

let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };

txn_manager.start_message_listener(env, rx);

txn_manager
}

/// Adds a new transaction to the list of active read transactions.
Expand Down Expand Up @@ -311,10 +317,12 @@ mod read_transactions {
#[cfg(test)]
mod tests {
use crate::{
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
MaxReadTransactionDuration,
txn_manager::{
read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr,
},
Environment, Error, MaxReadTransactionDuration, TransactionKind, RO,
};
use std::{thread::sleep, time::Duration};
use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration};
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -387,5 +395,30 @@ mod read_transactions {
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(tx.commit().is_ok())
}

#[test]
fn txn_manager_begin_read_transaction_via_message_listener() {
const MAX_DURATION: Duration = Duration::from_secs(1);

let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();

let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();

// Create a read-only transaction via the message listener.
let (tx, rx) = sync_channel(0);
env.txn_manager().send_message(TxnManagerMessage::Begin {
parent: TxnPtr(ptr::null_mut()),
flags: RO::OPEN_FLAGS,
sender: tx,
});

let txn_ptr = rx.recv().unwrap().unwrap();

assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize)));
}
}
}

0 comments on commit f5ce869

Please sign in to comment.