Skip to content

Commit

Permalink
Merge pull request #12 from hashlookup/prep-new-release
Browse files Browse the repository at this point in the history
prepare a new release
  • Loading branch information
qjerome committed Jul 24, 2024
2 parents 742df9c + baf2ad9 commit 9890231
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 131 deletions.
64 changes: 37 additions & 27 deletions poppy/src/bin/poppy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,54 +215,64 @@ fn count_lines_parallel<P: AsRef<Path> + Clone>(
Ok(count)
}

fn process_file(bf: &mut BloomFilter, input: &String, verbose: bool) -> Result<(), anyhow::Error> {
if verbose {
eprintln!("processing file: {input}");
}
let in_file = std::fs::File::open(input)?;

for line in std::io::BufReader::new(in_file).lines() {
let line = line?;
bf.insert_bytes(&line)?;
}

Ok(())
}

fn parallel_insert(
bf: BloomFilter,
files: Vec<String>,
jobs: usize,
verbose: bool,
) -> Result<BloomFilter, anyhow::Error> {
let arc_bf = Arc::new(std::sync::Mutex::new(bf));
let mut handles = vec![];
let jobs = optimal_jobs(jobs);

let batches = files.chunks(max(files.len() / jobs, 1));
let mut bfs = vec![];
for _ in 0..(batches.len() - 1) {
bfs.push(bf.clone())
}
// we move bf to prevent doing an additional copy
bfs.push(bf);

let mut handles = vec![];

// map processing
for batch in batches {
let shared = Arc::clone(&arc_bf);
let batch: Vec<String> = batch.to_vec();
let mut copy = shared
.lock()
.map_err(|e| anyhow!("failed to lock mutex: {}", e))?
.clone();
let mut copy = bfs.pop().unwrap();

let h = thread::spawn(move || {
for input in batch {
if verbose {
eprintln!("processing file: {input}");
}
let in_file = std::fs::File::open(&input)?;

for line in std::io::BufReader::new(in_file).lines() {
let line = line?;
copy.insert_bytes(&line)?;
}
process_file(&mut copy, &input, verbose)?;
}

let mut shared = shared
.lock()
.map_err(|e| anyhow!("failed to lock mutex: {}", e))?;
shared.union_merge(&copy)?;

Ok::<(), anyhow::Error>(())
Ok::<_, anyhow::Error>(copy)
});
handles.push(h)
}

for h in handles {
h.join().expect("failed to join thread")?;
}
// reduce
let mut bfs = handles
.into_iter()
.map(|h| h.join().expect("failed to join thread").unwrap())
.collect::<Vec<BloomFilter>>();

let out = arc_bf.lock().expect("failed to lock mutex").clone();
// code should never panic here
let mut out = bfs.pop().expect("bpfs must always have one item");
while let Some(bf) = bfs.pop() {
out.union_merge(&bf)?;
}

Ok(out)
}
Expand Down Expand Up @@ -542,7 +552,7 @@ fn main() -> Result<(), anyhow::Error> {
}
Command::Show(o) => {
let bloom_file = File::open(&o.file)?;
let b = BloomFilter::from_reader(bloom_file)?;
let b = BloomFilter::partial_from_reader(bloom_file)?;
println!("File: {}", o.file);
show_bf_properties(&b);
}
Expand Down
7 changes: 6 additions & 1 deletion poppy/src/bitset/vec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
#[derive(Debug, Clone)]
pub struct VecBitSet(Vec<u8>);

/// Returns the number of bytes needed to store `bit_cap` bits, i.e.
/// `bit_cap / 8` rounded up to the next whole byte.
///
/// The computation is done purely with integers: going through `f64` loses
/// precision once `bit_cap` exceeds 2^53 and can under-count the required
/// bytes. Splitting into quotient and remainder also avoids the overflow
/// that `(bit_cap + 7) / 8` would hit near `usize::MAX`.
#[inline(always)]
pub(crate) fn byte_cap_from_bit_cap(bit_cap: usize) -> usize {
    // one extra byte when the bit count is not a multiple of 8
    bit_cap / 8 + usize::from(bit_cap % 8 != 0)
}

impl VecBitSet {
/// Creates a new bitset with a given capacity
pub fn with_bit_capacity(capacity: usize) -> Self {
let byte_size = ((capacity as f64) / 8.0).ceil() as usize;
let byte_size = byte_cap_from_bit_cap(capacity);
Self(vec![0; byte_size])
}

Expand Down
68 changes: 52 additions & 16 deletions poppy/src/bloom.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{self, Read, Write};
use std::io::{self, Read, Seek, Write};
use thiserror::Error;

mod utils;
Expand Down Expand Up @@ -97,6 +97,8 @@ impl TryFrom<u8> for OptLevel {

#[derive(Debug, Error)]
pub enum Error {
#[error("fpp value must be in ]0;1] fpp={0}")]
WrongFpp(f64),
#[error("{0}")]
IoError(#[from] io::Error),
#[error("invalid version {0}")]
Expand All @@ -122,7 +124,7 @@ pub enum Error {
/// let p = Params::new(1000, 0.1);
/// // will create a V2 version of BloomFilter from
/// // parameters
/// let bf = p.into_v2();
/// let bf = p.try_into_v2().unwrap();
/// ```
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Params {
Expand Down Expand Up @@ -161,13 +163,13 @@ impl Params {
}

/// Turn Params into [v1::BloomFilter]
pub fn into_v1(self) -> v1::BloomFilter {
self.into()
pub fn try_into_v1(self) -> Result<v1::BloomFilter, Error> {
self.try_into()
}

/// Turn Params into [v2::BloomFilter]
pub fn into_v2(self) -> v2::BloomFilter {
self.into()
pub fn try_into_v2(self) -> Result<v2::BloomFilter, Error> {
self.try_into()
}
}

Expand All @@ -187,7 +189,7 @@ impl TryFrom<Params> for BloomFilter {
/// ```
/// use poppy_filters::BloomFilter;
///
/// let mut bf = BloomFilter::with_capacity(1042, 0.1);
/// let mut bf = BloomFilter::with_capacity(1042, 0.1).unwrap();
///
/// bf.insert_bytes("toto");
/// assert!(bf.contains_bytes(String::from("toto")));
Expand All @@ -206,8 +208,8 @@ impl BloomFilter {
opt: OptLevel,
) -> Result<Self, Error> {
match version {
1 => Ok(Self::V1(v1::BloomFilter::with_capacity(cap as u64, fpp))),
2 => Ok(Self::V2(v2::BloomFilter::make(cap as u64, fpp, opt))),
1 => Ok(Self::V1(v1::BloomFilter::with_capacity(cap as u64, fpp)?)),
2 => Ok(Self::V2(v2::BloomFilter::make(cap as u64, fpp, opt)?)),
_ => Err(Error::InvalidVersion(version)),
}
}
Expand All @@ -218,8 +220,8 @@ impl BloomFilter {
}

/// Creates a new filter with a given capacity and false positive probability (fpp)
pub fn with_capacity(cap: usize, fpp: f64) -> Self {
Self::V2(v2::BloomFilter::with_capacity(cap as u64, fpp))
pub fn with_capacity(cap: usize, fpp: f64) -> Result<Self, Error> {
Ok(Self::V2(v2::BloomFilter::with_capacity(cap as u64, fpp)?))
}

/// Fill a bloom filter with several entries
Expand All @@ -233,14 +235,39 @@ impl BloomFilter {
/// Creates a BloomFilter from a reader. The data read is supposed
/// to be in the appropriate format, if not the function returns an
/// Err.
pub fn from_reader<R: Read>(r: R) -> Result<Self, Error> {
pub fn from_reader<R: Read + Seek>(r: R) -> Result<Self, Error> {
let mut r = io::BufReader::new(r);

let flags = read_flags(&mut r)?;

match flags.version {
1 => Ok(Self::V1(v1::BloomFilter::from_reader_with_flags(r, flags)?)),
2 => Ok(Self::V2(v2::BloomFilter::from_reader_with_flags(r, flags)?)),
1 => Ok(Self::V1(v1::BloomFilter::from_reader_with_flags(
r, flags, false,
)?)),
2 => Ok(Self::V2(v2::BloomFilter::from_reader_with_flags(
r, flags, false,
)?)),
_ => Err(Error::InvalidVersion(flags.version)),
}
}

/// Creates a **partial** BloomFilter from a reader. It means the reader
/// is used only to fill metadata. This method will be significantly faster
/// than [BloomFilter::from_reader] for use cases where only filter
/// metadata is used. An example use case is for instance: showing filter
/// information.
pub fn partial_from_reader<R: Read + Seek>(r: R) -> Result<Self, Error> {
let mut r = io::BufReader::new(r);

let flags = read_flags(&mut r)?;

match flags.version {
1 => Ok(Self::V1(v1::BloomFilter::from_reader_with_flags(
r, flags, true,
)?)),
2 => Ok(Self::V2(v2::BloomFilter::from_reader_with_flags(
r, flags, true,
)?)),
_ => Err(Error::InvalidVersion(flags.version)),
}
}
Expand Down Expand Up @@ -390,7 +417,16 @@ impl BloomFilter {
}
}

/// macro to ease bloom filter creation
/// Macro to ease bloom filter creation
///
/// # Panics
///
/// This macro might generate code which panics
/// either at filter creation or at element insertion.
/// The code may panic if the false positive probability is
/// not in the expected range ]0;1] or if element
/// insertion cannot happen because the filter is full.
///
/// # Example
/// ```
/// use poppy_filters::bloom;
Expand All @@ -411,7 +447,7 @@ impl BloomFilter {
#[macro_export]
macro_rules! bloom {
($cap:expr, $proba:expr) => {
$crate::BloomFilter::with_capacity($cap, $proba)
$crate::BloomFilter::with_capacity($cap, $proba).unwrap()
};

($cap:expr, $proba:expr, [$($values:literal),*]) => {
Expand Down
Loading

0 comments on commit 9890231

Please sign in to comment.