Skip to content

Commit

Permalink
Report cache flusher errors to the caller
Browse files Browse the repository at this point in the history
By reporting cache flushing problems to the caller,
we can log those errors through Log instead of using eprintln.

Additionally, flush errors at exit are also logged now.
  • Loading branch information
pkolaczk committed Oct 22, 2023
1 parent 72d5381 commit aeb3d5a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fclones/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ byte-unit = "4.0"
chrono = { version = "0.4", default-features = false, features = ["serde", "clock", "std"] }
clap = { version = "4.4", features = ["derive", "cargo", "wrap_help"] }
console = "0.15"
crossbeam-channel = "0.5"
crossbeam-utils = "0.8"
csv = "1.1"
dashmap = "5.2"
Expand Down
40 changes: 27 additions & 13 deletions fclones/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Persistent caching of file hashes

use crossbeam_channel::RecvTimeoutError;
use std::fmt::{Display, Formatter};
use std::fs::create_dir_all;
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, UNIX_EPOCH};
Expand Down Expand Up @@ -47,7 +47,7 @@ const FLUSH_INTERVAL: Duration = Duration::from_millis(1000);
/// them from file data.
pub struct HashCache {
cache: Arc<InnerCache>,
_flusher: HashCacheFlusher,
flusher: HashCacheFlusher,
}

impl HashCache {
Expand Down Expand Up @@ -76,10 +76,7 @@ impl HashCache {
let tree_id = format!("hash_db:{:?}:{}", algorithm, transform.unwrap_or("<none>"));
let cache = Arc::new(typed_sled::Tree::open(&db, tree_id));
let flusher = HashCacheFlusher::start(&cache);
Ok(HashCache {
cache,
_flusher: flusher,
})
Ok(HashCache { cache, flusher })
}

/// Opens the file hash database located in `fclones` subdir of user cache directory.
Expand Down Expand Up @@ -115,7 +112,11 @@ impl HashCache {
.insert(key, &value)
.map_err(|e| format!("Failed to write entry to cache: {e}"))?;

Ok(())
// Check for cache flush errors. If there were errors, report them to the caller.
match self.flusher.err_channel.try_recv() {
Ok(err) => Err(err),
Err(_) => Ok(()),
}
}

/// Retrieves the cached hash of a file.
Expand Down Expand Up @@ -163,24 +164,36 @@ impl HashCache {
};
Ok(key)
}

/// Flushes all unwritten data and closes the cache.
pub fn close(self) -> Result<(), Error> {
self.cache
.flush()
.map_err(|e| format!("Failed to flush cache: {e}"))?;
Ok(())
}
}

/// Periodically flushes the cache in a background thread
struct HashCacheFlusher {
thread_handle: Option<JoinHandle<()>>,
control_channel: Option<Mutex<Sender<()>>>, // wrapped in Mutex because Sender is not Send in older versions of Rust
control_channel: Option<crossbeam_channel::Sender<()>>,
err_channel: crossbeam_channel::Receiver<Error>,
}

impl HashCacheFlusher {
fn start(cache: &Arc<InnerCache>) -> HashCacheFlusher {
let cache = Arc::downgrade(cache);
let (tx, rx) = channel::<()>();
let (ctrl_tx, ctrl_rx) = crossbeam_channel::bounded::<()>(1);
let (err_tx, err_rx) = crossbeam_channel::bounded(1);

let thread_handle = thread::spawn(move || {
while let Err(RecvTimeoutError::Timeout) = rx.recv_timeout(FLUSH_INTERVAL) {
while let Err(RecvTimeoutError::Timeout) = ctrl_rx.recv_timeout(FLUSH_INTERVAL) {
if let Some(cache) = cache.upgrade() {
if let Err(e) = cache.flush() {
eprintln!("Failed to flush hash cache: {e}");
err_tx
.send(format!("Failed to flush the hash cache: {e}").into())
.unwrap_or_default();
return;
}
}
Expand All @@ -189,7 +202,8 @@ impl HashCacheFlusher {

HashCacheFlusher {
thread_handle: Some(thread_handle),
control_channel: Some(Mutex::new(tx)),
control_channel: Some(ctrl_tx),
err_channel: err_rx,
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions fclones/src/hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,16 @@ impl FileHasher<'_> {
}
}

impl<'a> Drop for FileHasher<'a> {
fn drop(&mut self) {
if let Some(cache) = self.cache.take() {
if let Err(e) = cache.close() {
self.log.warn(e);
}
}
}
}

fn format_output_stream(output: &str) -> String {
let output = output.trim().to_string();
if output.is_empty() {
Expand Down

0 comments on commit aeb3d5a

Please sign in to comment.