Skip to content

Commit

Permalink
feat(hydroflow_datalog)!: add detupling syntax and allow interleaving…
Browse files Browse the repository at this point in the history
… with flattening (#1166)
  • Loading branch information
shadaj authored and MingweiSamuel committed Apr 23, 2024
1 parent 997d90a commit c9dc66d
Show file tree
Hide file tree
Showing 28 changed files with 1,985 additions and 206 deletions.
107 changes: 105 additions & 2 deletions hydroflow/tests/datalog_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ fn test_collect_vec() {
}

#[multiplatform_test]
fn test_splat() {
fn test_flatten() {
let (ints1_send, ints1) = hydroflow::util::unbounded_channel::<(i64, Vec<i64>)>();
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(i64, i64)>();

Expand All @@ -1172,7 +1172,7 @@ fn test_splat() {
.output result `for_each(|v| result.send(v).unwrap())`
result(a, *b) :- ints1(a, b)
result(a, b) :- ints1(a, *b)
"#
);

Expand All @@ -1187,6 +1187,109 @@ fn test_splat() {
);
}

#[test]
fn test_detuple() {
let (ints1_send, ints1) = hydroflow::util::unbounded_channel::<((i64, i64),)>();
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(i64, i64)>();

let mut flow = datalog!(
r#"
.input ints1 `source_stream(ints1)`
.output result `for_each(|v| result.send(v).unwrap())`
result(a, b) :- ints1((a, b))
"#
);

ints1_send.send(((1, 2),)).unwrap();
ints1_send.send(((3, 4),)).unwrap();

flow.run_tick();

assert_eq!(
&*collect_ready::<Vec<_>, _>(&mut result_recv),
&[(1, 2), (3, 4)]
);
}

#[test]
fn test_multi_detuple() {
let (ints1_send, ints1) = hydroflow::util::unbounded_channel::<((i64, i64), (i64, i64))>();
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(i64, i64, i64, i64)>();

let mut flow = datalog!(
r#"
.input ints1 `source_stream(ints1)`
.output result `for_each(|v| result.send(v).unwrap())`
result(a, b, c, d) :- ints1((a, b), (c, d))
"#
);

ints1_send.send(((1, 2), (3, 4))).unwrap();
ints1_send.send(((5, 6), (7, 8))).unwrap();

flow.run_tick();

assert_eq!(
&*collect_ready::<Vec<_>, _>(&mut result_recv),
&[(1, 2, 3, 4), (5, 6, 7, 8)]
);
}

#[test]
fn test_flat_then_detuple() {
let (ints1_send, ints1) = hydroflow::util::unbounded_channel::<(Vec<(i64, i64)>,)>();
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(i64, i64)>();

let mut flow = datalog!(
r#"
.input ints1 `source_stream(ints1)`
.output result `for_each(|v| result.send(v).unwrap())`
result(a, b) :- ints1(*(a, b))
"#
);

ints1_send.send((vec![(1, 2), (3, 4)],)).unwrap();
ints1_send.send((vec![(5, 6), (7, 8)],)).unwrap();

flow.run_tick();

assert_eq!(
&*collect_ready::<Vec<_>, _>(&mut result_recv),
&[(1, 2), (3, 4), (5, 6), (7, 8)]
);
}

#[test]
fn test_detuple_then_flat() {
let (ints1_send, ints1) = hydroflow::util::unbounded_channel::<((Vec<i64>, Vec<i64>),)>();
let (result, mut result_recv) = hydroflow::util::unbounded_channel::<(i64, i64)>();

let mut flow = datalog!(
r#"
.input ints1 `source_stream(ints1)`
.output result `for_each(|v| result.send(v).unwrap())`
result(a, b) :- ints1((*a, *b))
"#
);

ints1_send.send(((vec![1, 2], vec![3, 4]),)).unwrap();

flow.run_tick();

assert_eq!(
&*collect_ready::<Vec<_>, _>(&mut result_recv),
&[(1, 3), (1, 4), (2, 3), (2, 4)]
);
}

// #[ignore] doesn't seem to work for #[multiplatform_test]
// #[ignore] // This test depends on the ordering of specific tuples which is undefined.
// #[multiplatform_test]
Expand Down
21 changes: 15 additions & 6 deletions hydroflow_datalog_core/src/grammar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,28 @@ pub mod datalog {

#[derive(Debug, Clone)]
pub enum Atom {
Relation(
#[rust_sitter::leaf(text = "!")] Option<()>,
NegRelation(
#[rust_sitter::leaf(text = "!")] (),
Spanned<InputRelationExpr>,
),
PosRelation(Spanned<InputRelationExpr>),
Predicate(Spanned<BoolExpr>),
}

#[derive(Debug, Clone)]
pub enum IdentOrUnderscore {
pub enum ExtractExpr {
Ident(Spanned<Ident>),
Underscore(#[rust_sitter::leaf(text = "_")] Spanned<()>),
Flatten(#[rust_sitter::leaf(text = "*")] (), Box<ExtractExpr>),
Untuple(
#[rust_sitter::leaf(text = "(")] (),
#[rust_sitter::delimited(
#[rust_sitter::leaf(text = ",")]
()
)]
Vec<Spanned<ExtractExpr>>,
#[rust_sitter::leaf(text = ")")] (),
),
}

#[derive(Debug, Clone)]
Expand All @@ -98,7 +109,7 @@ pub mod datalog {
#[rust_sitter::leaf(text = ",")]
()
)]
pub fields: Vec<Spanned<IdentOrUnderscore>>,
pub fields: Vec<Spanned<ExtractExpr>>,

#[rust_sitter::leaf(text = ")")]
_r_paren: (),
Expand Down Expand Up @@ -135,7 +146,6 @@ pub mod datalog {
#[derive(Debug, Clone)]
pub enum TargetExpr {
Expr(IntExpr),
Splat(#[rust_sitter::leaf(text = "*")] (), Spanned<Ident>),
Aggregation(Aggregation),
Index(
#[rust_sitter::leaf(text = "index")] (),
Expand All @@ -148,7 +158,6 @@ pub mod datalog {
pub fn idents(&self) -> Vec<&Ident> {
match self {
TargetExpr::Expr(e) => e.idents(),
TargetExpr::Splat(_, i) => vec![&i.value],
TargetExpr::Aggregation(Aggregation::Count(_)) => vec![],
TargetExpr::Aggregation(
Aggregation::CountUnique(_, _, idents, _)
Expand Down
Loading

0 comments on commit c9dc66d

Please sign in to comment.