An introduction to differential dataflow, part 2.

Note: In progress.

This is the second, much delayed, post about differential dataflow. In this post we are going to talk through some of the moving parts of differential dataflow, and sketch some of the reasons behind various choices. Ideally, we'll get you to a point where you understand what should be true about a differential dataflow operator, which may allow you to reason about the work done and messages sent by operators in your computations.

Since it has been a while, let's recap what is exciting about differential dataflow, and why you might be interested in its implementation. First, differential dataflow allows you to do some pretty neat things, incrementally updating complicated big-data computations in near real-time. The previous post explains breadth-first search, and how you can both perform the computation efficiently and update it in microseconds. There is a whole general big-data programming language in which all computations are incrementally updateable.

Not only can you do neat things with differential dataflow, but its implementation is also pretty neat. Although there are all these incremental updates flowing around, the internal state is basically just a bunch of append-only lists. This is pretty cool, because we are able to get fine-grained incremental updates, and still take advantage of the great properties of append-only immutable state (what Joe Hellerstein calls progressive systems). Issues like fault-tolerance are a lot easier to handle (haven't yet, sorry), as well as some pretty cool examples of state-sharing between operators which depart a bit from "pure" dataflow.

There are a lot of really cool research opportunities in the implementation of differential dataflow.


We introduced differential dataflow in the first post in the series. Apparently, differential dataflow is a high-level declarative-ish programming language which looks a bit like MapReduce and SQL and stuff, but with a fancy iterate operator. Also, you can arbitrarily update the input collections, adding or removing records and quickly getting the corresponding updates to the outputs of your computation.

That all sounds a bit magical. So, let's try and demystify it a little bit.

Dataflow programming

We write differential dataflow programs in a language that looks relatively procedural. For example, we did breadth-first search like so:

let result = roots.iterate(|dists|
    dists.join(&edges)
         .map(|(node, dist, next)| (next, dist + 1))
         .concat(&dists)
         .group(|node, ds| ds.min())
);

Behind the scenes it is actually turned into a dataflow graph. For example, this program turns into a graph that looks a bit like this:

[Figure: the BFS dataflow graph]

Dataflow graphs are pretty helpful in a distributed setting because they allow us to describe the program in a way where the execution is mainly data-driven. Each worker knows that when it receives some data on one of these edges, it should perform the logic associated with the destination operator. This is often a lot easier than trying to farm out arbitrary bits of computation which might depend on shared state that is also being updated by other workers and who really knows what is going on anyhow.

One less obvious advantage of this approach is that each of the boxes has a name on it. Things like join and map and group. These names are more than just a hunk of code: they describe a functional mapping from the input to the output. This declarative approach to computation not only explains what we should compute the first time around, it also gives us guidance about how to update and re-compute things when their inputs change.

Before talking about how the parts move, we'll want to lay a clear foundation for what the moving parts need to accomplish.

A foundation

Our guiding principle will be that each of our dataflow operators should have an unchanging functional relationship between its inputs and outputs, and that computation will result from changing the operator's inputs (and consequently its outputs). Importantly, this will be true not only for primitive operators like join and group, but also for composite "operators" like the box containing the iterative computation in the figure above, which we might call the bfs operator. Our computational model will build up larger and larger functional blocks, whose inputs we will vary in interesting ways.

This functional relationship just needs to exist for now; we'll worry about implementation in a bit.

Functional definitions of computations aren't something new here. There are lots of citations and stuff. However, given the weird and wonderful ways we are going to mess around with inputs to computations, we need to be rock-solid on what each of our operators "means".

Evaluating acyclic dataflow

We aren't going to go into this all that much, but there are lots of systems out there that can take big input collections and a directed acyclic dataflow graph of functional operators, and evaluate the dataflow one operator at a time. Working it out, we can determine what each of the operators should produce as output, look at the next operators, that sort of thing.

Sequences of inputs

There are any number of frameworks for applying a data-parallel dataflow to some large set of collections; let's try and step the game up a bit and consider sequences of input collections. That is, rather than some enormous 1TB input collection, you have a whole bunch of these collections as input. We are eventually going to restrict our attention to situations where there isn't much variation from collection to collection, but it helps to start out generally, to make sure we know what we want to compute.

Because our operators and computation are functional, the sequence of input collections gives rise to a sequence of output collections. We know how each of these output collections is defined: application of the functional form of the computation to each of the inputs. We could imagine re-running our computation (however we end up running it) on each of the input collections independently, which would probably be painfully expensive but at least painfully clear about what we should be producing as output. This isn't obviously the most promising way to compute the sequence of outputs, though.

This is normally the point in a paper where the authors run off in various different directions and explain how they are going to efficiently implement this computation by incrementalization. In short, there are lots and lots of ways to exploit the structure of the sequences of inputs: one input comes after another, and we could imagine re-using the result of preceding computations. For example, one could assume that records are only added to the input and operators are logically monotonic (more inputs means more outputs, never fewer); this gives rise to Datalog and CRDTs and stuff. Or, you could assume that the sequence of inputs are really a sliding window (when a record is added, it remains for some fixed number of rounds) and the computation is a DAG. Or a bunch of other things.

Now, this is all very fun, but it may be a bit premature. Central to differential dataflow is the observation that sequences may not always be the right way to think about how collections change.

Moving beyond sequences

Computations have cycles. Cycles are important to us, as programmers, because that is the only way to compute the thing we want to compute. Despite the prevalence of acyclic dataflow systems, loops really are pretty important in computer science. As an experiment, ask the nice folks who implement those acyclic dataflow systems how far they would have gotten without for or while loops.

Ok but, loops aren't mysterious or anything. Our iterate loop above goes through multiple iterations of computation; each iteration will start with some dists and produce a new dists. In the same way that we were talking about sequences of input collections, we could talk about sequences of dists as we go around the loop. So we are just back to sequences, right?

Actually it is a sequence of dists for each round. So, really it is more like a sequence of sequences. Weird.

Let's think about things slightly differently: inputs like roots and edges go through a sequence of collections, so let's have them be indexable by some integer i. The collections roots[i] and edges[i] indicate the collections in the ith element of the sequence. For folks inside the loop, dists for example, we'll let it be indexed by both round i and iteration j: dists[(i,j)] indicates the collection dists in the jth iteration of the loop for the computation started with inputs roots[i] and edges[i].

With this framing, sequences are just collections that vary with an integer parameter. Inside a loop, we have collections that vary with a pair of integers. Hey let's just let collections vary, and not be so picky about what they vary with just yet.

We still want to hold on to our core principle, that operators map functional logic across varying inputs. If the inputs are allowed to vary with some parameter (integers, pairs of integers, strings?) then we can clearly define an output collection that varies with the same indices: for any index t, let

output[t] = logic(input[t])

It worked out great for sequences, right?

So, we can take something like dists which varies with (i,j) and think about

dists.join(&edges)
     .map(|(node, dist, next)| (next, dist + 1))
     .concat(&dists)
     .group(|node, ds| ds.min())

which is just another collection that also varies with (i,j). The collection is defined pointwise as (I just put a bunch of [(i,j)] in there):

dists[(i,j)].join(&edges[(i,j)])
     .map(|(node, dist, next)| (next, dist + 1))
     .concat(&dists[(i,j)])
     .group(|node, ds| ds.min())

Notice how we are talking about just one computation (the four operators above), but run in a variety of contexts on different inputs in different rounds of computation.

This is actually almost what we want in our example, if we could just take this output at (i,j) and feed it back to be dists[(i,j+1)].

Adding loops

Let's look at an example of what this weird and crazy approach lets us do. We've talked about how several collections in the example above vary, roots and edges with integers, and dists with pairs of integers, but how do we make them "start varying"? How do we actually get computation off the ground and producing something meaningful?

Informally, what we want to have happen is to start from a collection like roots and turn that into a collection that is initially roots, but which then gets set to be the result of running the loop body. Then we run the loop again and update dists again. And again. We do this forever, and once we are done we return the result.

dists[(t,0)] = roots[t]
dists[(t,i)] = loop_body(dists[(t,i-1)]) for i > 0
result[t] = dists[(t,infinity)]

This looks a bit sneaky here, but the point is that if we can pull it off (a) we produce the result of iterating loop_body lots of times and (b) the operators inside loop_body don't need to do anything smarter than apply their logic to each instance of dists.

We've effected an iterative computation by putting a cyclic harness around helpfully general operators (ones that can tolerate an additional coordinate of variation). Importantly, we've done this with a largely leak-free abstraction: loop_body doesn't need to know it is being executed in a loop, and the loop doesn't need to restrict loop_body (which could totally have another loop inside it!). We just need the property that loop_body maps the same functional logic across each of its inputs.
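To see the shape of this, here is a minimal non-dataflow sketch of those three rules, using plain Rust sets to stand in for collections; the iterate function and its arguments are made up for illustration, not the actual differential dataflow API.

use std::collections::BTreeSet;

// Fixed-point sketch of the rules above: start from `roots`, apply
// `loop_body` until nothing changes, and return the final collection.
fn iterate<T: Ord>(
    roots: BTreeSet<T>,
    loop_body: impl Fn(&BTreeSet<T>) -> BTreeSet<T>,
) -> BTreeSet<T> {
    let mut dists = roots;                  // dists[(t,0)] = roots[t]
    loop {
        let next = loop_body(&dists);       // dists[(t,i)] = loop_body(dists[(t,i-1)])
        if next == dists { return dists; }  // result[t] = dists[(t,infinity)]
        dists = next;
    }
}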

Recap

Let's take a moment to reflect, and think about whether we've actually done anything smart yet.

Mostly we've put together some notation. We have collections that vary with indices like i or (i,j), and we define new collections that vary with the same indices. Ideally, we've just named a bunch of things we would eventually like to compute. Also, we know that we could compute each of these, because each has a super clear functional definition in terms of inputs or other intermediate collections.

Remember, so far we were just trying to define what we want to compute.

Differential dataflow

We've dug ourselves into a bit of a hole, telling people that they could (and should!) write iterative data-parallel computations mapped across a sequence of terabyte-sized inputs, but not really having a great story about how you actually do that. Our best story is: each time you need to do that, just start from scratch and spin up some scalable computation. In our defense, this is basically what things like DryadLINQ and Spark propose.

Now we are going to dig ourselves out.

Data and operator model

Our collections are going to be multisets over some domain. There is some set of possible elements, think "32 bit integers", or "strings", or "weird type you just defined (with some restrictions)", and for each of them there is an integer count indicating how many times the record occurs in the collection.

Each of our operators will be "data-parallel". This means that there is a function from input records to "keys", and the operator acts independently on parts of the input with different keys. Implementation-wise, this will mean that we can partition records by key and independently apply whatever operator logic we have. This is historically used to parallelize the computation, which we'll certainly do. We will also use it to efficiently incrementalize the computation.
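As a sketch (the names and representation here are just for illustration), you can think of a collection as a map from record to count, and of a data-parallel operator as something that routes each record to a key and applies its logic to each group of records independently:

use std::collections::HashMap;
use std::hash::Hash;

// A collection is a multiset: each record has an integer count.
type Collection<R> = HashMap<R, i64>;

// Apply `logic` independently to each group of records that share a key.
fn data_parallel<R, K>(
    input: &Collection<R>,
    key: impl Fn(&R) -> K,
    logic: impl Fn(&Collection<R>) -> Collection<R>,
) -> Collection<R>
where
    R: Clone + Hash + Eq,
    K: Hash + Eq,
{
    // partition the input records by key
    let mut parts: HashMap<K, Collection<R>> = HashMap::new();
    for (record, &count) in input {
        parts.entry(key(record)).or_default().insert(record.clone(), count);
    }
    // apply the operator's logic to each part, and merge the results
    let mut output: Collection<R> = HashMap::new();
    for part in parts.values() {
        for (record, count) in logic(part) {
            *output.entry(record).or_insert(0) += count;
        }
    }
    output
}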

Incrementalizing sequences

We are going to go beyond sequences, but it will help to start with how we might implement an incremental dataflow operator if we only ever have sequences of inputs. Just sequences for now, not beyond-sequences, yet.

If we have a sequence of collections, which are counts for each record, one natural approach would be to work with the differences between the collections: those records with different counts, and the differences in their counts. This way, even if we have terabytes of data, if they don't change too much as we move along the sequence, great! In practice, this means we stop worrying so much about how large the input data are, and worry instead about how rapidly they change.
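As a small sketch of what "working with differences" might look like (again using maps from record to count, and a made-up function name), the difference between two consecutive collections is just the per-record change in counts:

use std::collections::HashMap;
use std::hash::Hash;

// Compute `next - prev`: the records whose counts changed, with the change.
fn difference<R: Clone + Hash + Eq>(
    prev: &HashMap<R, i64>,
    next: &HashMap<R, i64>,
) -> HashMap<R, i64> {
    let mut diff = HashMap::new();
    for (record, &count) in next {
        let delta = count - prev.get(record).copied().unwrap_or(0);
        if delta != 0 { diff.insert(record.clone(), delta); }
    }
    // records present before but absent now have their counts retracted
    for (record, &count) in prev {
        if !next.contains_key(record) {
            diff.insert(record.clone(), -count);
        }
    }
    diff
}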

So let's imagine we only talk in terms of the differences between collections (and starting from a 0-th empty collection). What do we need to do to get operators to properly compute their sequence of outputs? Also, note that we only need to produce the sequence of output differences. We are now working with differences exclusively, and getting the actual outputs isn't on our list of desiderata any more.

Getting the output differences for data-parallel operators isn't so hard. We know that if a group of records with the same key hasn't changed, its output won't change either. Changes only result from the keys observed in the input differences. We could totally just collect the keys in the input differences, determine the state of the input both before and after the update, apply the operator's logic to each, and output their difference.

An example: distinct

Let's look at an intentionally primitive example. Imagine we have an operator whose goal in life is to take a multiset input and produce the multiset output with a count of one for input records with counts greater or equal to one. This operator is commonly called distinct, and it is nicely data-parallel (the key is the record itself).

Imagine we have a sequence of inputs

input_0 = { (cat, 1), (dog, 1), ... }
input_1 = { (cat, 2), (dog, 1), ... }
input_2 = { (cat, 2), (goat, 1), ... }

where ... is a terabyte of animal names that doesn't change in this example.

The corresponding outputs of the distinct operator are

output_0 = { (cat, 1), (dog, 1), ... }
output_1 = { (cat, 1), (dog, 1), ... }
output_2 = { (cat, 1), (goat, 1), ... }

If we write down the input differences, they are

d_input_0 = { (cat, 1), (dog, 1), ... }
d_input_1 = { (cat, 1) }
d_input_2 = { (dog, -1), (goat, 1) }

While we could form each of the outputs and determine the differences, this is expensive. Instead, the output differences for each input can be determined just from the input records in difference. In the very first round of differences everything changes (up from a count of zero), and we need to actually go and perform our distinct logic on it. In the second round only the count for cat changes, and we can just reconsider the distinct logic before and after the update; no changes required. In the third round the counts for dog and goat change, from one to zero and zero to one respectively; we reconsider their counts before and after and determine that the output differences should be:

d_output_0 = { (cat, 1), (dog, 1), ... }
d_output_1 = { }
d_output_2 = { (dog, -1), (goat, 1) }

The important point is that to determine this output, we don't need to pay any attention to the ... data in anything other than the first round.

This example also makes an interesting point: sometimes differences just vanish. In the second round (the '1'th) there are no output differences, and we can just stop. Subsequent computations will have no input differences, and consequently no output differences.
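Here is one way the incremental distinct might be sketched (the function and variable names are made up): keep a running count per record, and for each round of input differences emit an output difference only when a record's count crosses zero.

use std::collections::HashMap;

// One round of incremental `distinct`: apply the input differences to the
// running `counts`, and report the resulting changes to the distinct output.
fn distinct_update(
    counts: &mut HashMap<String, i64>,
    input_diff: &HashMap<String, i64>,
) -> HashMap<String, i64> {
    let mut output_diff = HashMap::new();
    for (record, delta) in input_diff {
        let count = counts.entry(record.clone()).or_insert(0);
        let before = *count > 0;    // was the record in the distinct output?
        *count += *delta;
        let after = *count > 0;     // is it in the distinct output now?
        match (before, after) {
            (false, true) => { output_diff.insert(record.clone(), 1); }
            (true, false) => { output_diff.insert(record.clone(), -1); }
            _ => { }                // no change to the distinct output
        }
    }
    output_diff
}

Feeding the three rounds of input differences from above into this, starting from empty counts, produces exactly the three rounds of output differences listed above.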

Incrementalizing "beyond-sequences"

Ok, it is now time for grown-up pants.

Let's take everything we learned up above and apply it to beyond-sequences. Like, maybe sequences of sequences or strings or something. Grab some paper and try and figure out the differences for

input_(0,0) = { (cat, 1), (dog, 1), ... }
input_(1,0) = { (cat, 2), (dog, 1), ... }
input_(0,1) = { (cat, 1), (goat, 1), ... }
input_(1,1) = { (cat, 2), (goat, 1), ... }

Hard, isn't it? What the hell do you subtract, and from what?

Let's think about this slightly differently: we have some collection x that varies as a function of some index t. We want to cook up some differences dx that also varies as a function of t, and they should probably have some relation to x. How about: "the differences should add up to equal the collection"? Something like:

x[t] = sum_{t' <= t} dx[t']

Now, we did something sneaky here and just used a <= that maybe we aren't allowed to use. Actually, this is the big reveal, where we move from calling them "beyond sequences" and start calling them by their true names: "partial orders". If this were a film, we'd have a training montage where we learn about the powers of partial orders, but basically they have a <= function. It's transitive.

But, surely you just use <= on numbers, right? Nah, you can totally use it on other things. Like, pairs of numbers, or strings. Stuff like that. You just need to be able to define a partial order on the elements, which doesn't have to be a total order (like the integers); not every two x and y have to have one less than the other; that is the secret power of the partial order.
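As a tiny sketch of that claim for pairs of numbers (the function name here is made up), componentwise comparison gives a perfectly good <=, under which some pairs are simply incomparable:

// Componentwise `<=` on pairs: a perfectly good partial order.
fn pair_le(a: (u64, u64), b: (u64, u64)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

fn main() {
    assert!(pair_le((0, 0), (1, 1)));
    // (0,1) and (1,0) are incomparable: neither is <= the other.
    assert!(!pair_le((0, 1), (1, 0)));
    assert!(!pair_le((1, 0), (0, 1)));
}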

Hey what about that formula back up there?

Let's rewrite it just a bit. If we solve for dx[t] we get

dx[t] = x[t] - sum_{t' < t} dx[t']

which basically just says: "the difference at t should be whatever it takes to make x[t] add up right".
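A direct, deliberately inefficient sketch of that rule (the names, and the assumption that times arrive in an order compatible with <=, are just for illustration):

use std::collections::HashMap;

// Compute differences `dx` from collections `x`, given a partial order `le`.
// Assumes `times` is listed so that each time appears after all times `le` it.
fn differences<T: Copy + Eq + std::hash::Hash>(
    times: &[T],
    x: impl Fn(T) -> HashMap<String, i64>,
    le: impl Fn(T, T) -> bool,
) -> HashMap<T, HashMap<String, i64>> {
    let mut dx: HashMap<T, HashMap<String, i64>> = HashMap::new();
    for &t in times {
        // dx[t] = x[t] - sum_{t' < t} dx[t']
        let mut diff = x(t);
        for &s in times {
            if s != t && le(s, t) {
                for (record, delta) in &dx[&s] {
                    *diff.entry(record.clone()).or_insert(0) -= *delta;
                }
            }
        }
        diff.retain(|_, delta| *delta != 0);
        dx.insert(t, diff);
    }
    dx
}

When the times are totally ordered this is exactly the consecutive differencing we used for sequences; the partial order only changes which earlier differences get subtracted.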

The only remaining degree of freedom, and the last thing to talk about before we start talking implementation, is "which partial order should we use?" Cause, the partial order you choose determines how you take the differences, and that is important.

Partial orders

You can choose just about any partial order you might like, which is a strength of differential dataflow. The math we've written above results in sane definitions for most any partial order (caveat: not a formal statement), so our choice of partial order is more about performance: how small we can make the differences.

We're introducing this generality mostly to deal with loops, and loops are pretty important. Let's talk about loops!

Recall from up above that when we introduce a new loop with iterate we add an integer to whatever index we were previously varying with. If indices used to look like t, now they look like (t,i). All we know about t is that it is partially ordered; it could be integers, or it could be more complicated.

We'll end up using the "product partial order", where

(t1,i1) <= (t2,i2) iff t1 <= t2 and i1 <= i2

Each coordinate has to be less-or-equal for the pair to be less or equal. This is pretty convenient, because the only thing we have to do with t is compare it to other ts using its associated <= function. But why is it a good choice? Is it a good choice?

The example up above, with the product partial order, gets differences that look like

input_(0,0) = { (cat, 1), (dog, 1), ... }
input_(1,0) = { (cat, 2), (dog, 1), ... }
input_(0,1) = { (cat, 1), (goat, 1), ... }
input_(1,1) = { (cat, 2), (goat, 1), ... }

d_input_(0,0) = { (cat, 1), (dog, 1), ... }
d_input_(1,0) = { (cat, 1) }
d_input_(0,1) = { (dog, -1), (goat, 1) }
d_input_(1,1) = { }

Understanding whether the product partial order is actually a good choice or not may be a bit of a non-technical question, I think. This choice leads to an interpretation of these differences as higher-order differencing: differences of differences. This leads us to some good feelings when we think about loops: If a change to the input doesn't change the sequence of differences in the loop much, we won't have large differences. This wouldn't be true for other orders, like the lexicographic partial order (order (t,i) first by t then by i).

We haven't yet nailed down a brilliant cost model for differential dataflow, but it seems like there is some potential here.

Operator implementations

Before we have a big party we should make sure that we can still work out how to efficiently implement our operators, in terms of this weird differencing regime. It turns out to be harder than above, but we can totally do it.

Imagine we keep all received input differences and produced output differences, partitioned up by their associated key. We can reconstruct the input and output associated with a given key at any t, subtract the reconstructed output from the operator logic applied to the input, and produce that difference as output for t.

Ideally, we only need to pay attention to a restricted set of (key, t) pairs. When we were working with sequences we could just restrict our attention to the (key, t) pairs observed in the input. How do we know which keys and times to work with here? "All of them" would probably work, but at this point you might be worried that "the (key, t) pairs observed in the input" doesn't work.

Interesting times

There is a small subtle detail here, which is that the times t at which the input part associated with key might have changed are not just those times at which we observe input differences with key. That sounds a bit weird, but consider our distinct operator from up above with input differences

(cat, +1, (0, 3))
(cat, +1, (1, 2))

These indicate that cat gets incremented at both (0,3) and (1,2). No clue why, but that's what it says.

If we think about what the distinct operator would need to produce as output differences, assuming that the input doesn't have any other cat increments, it probably looks a bit like

(cat, +1, (0, 3))
(cat, +1, (1, 2))

Except, this goes horribly wrong at (1,3), when the two differences both come into effect. At (1,3) the output would now be (cat, 2) which is totally wrong. Fixing it is pretty easy, we just add a

(cat, -1, (1,3))

but what happened here?

Despite only seeing differences at (0,3) and (1,2), the input collection experiences a change at (1,3) when these two input differences both come into effect. It turns out that the set of times we need to consider is the set of times when the input might change. We could find these times by taking any subset of the input times and asking "when do they all first come into effect?"

For the mathematically inclined, this means that all of our "partial order" talk really should have been "join semi-lattice", which is a partial order where any finite subset of elements has a "least upper bound": an element greater than or equal to every member of the set, and less than or equal to any other element that is greater than or equal to every member of the set. We proved some things about this being correct and stuff.

As it turns out, it is sufficient to compare the input and output for (key, t) pairs where t is the least upper bound of some subset of times at which the input associated with key changed. You might worry that there is something scary exponential going on here, but (i) nah, you can't have that many without a more complicated partial order, and (ii) it's actually the right answer; the input changes and so the operator could do something totally different.
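For the product partial order, the least upper bound of two times is just the componentwise maximum; here is a tiny sketch (the free function is for illustration) of how (0,3) and (1,2) give rise to the extra time (1,3) above:

// Least upper bound (join) of two times in the product partial order:
// the componentwise maximum.
fn lub(a: (u64, u64), b: (u64, u64)) -> (u64, u64) {
    (a.0.max(b.0), a.1.max(b.1))
}

fn main() {
    // the two `cat` differences above occur at (0,3) and (1,2) ...
    assert_eq!(lub((0, 3), (1, 2)), (1, 3));
    // ... and first take effect together at (1,3), so the operator must also
    // reconsider its output there, even though no difference arrived at (1,3).
}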

Pseudocode

So, no more surprises, I think. Let's write some timely pseudocode to explain how things work. The rough idea is to accept and buffer incoming differences, request notifications at each interesting (key, time) pair, and when that time happens swing through the keys we need to reconsider. Our main datastructure is a Trace<K, T, V>, which you should think of as

type Trace<K, T, V> = HashMap<K, Vec<(T, Vec<(V, i32)>)>>;

Roughly, it lets us map keys of type K to a list of pairs of time T and a list of the differences (V, i32) at that time. We will imagine some convenience methods on this datastructure which allow us to enumerate the times for a key, and reconstruct collections for keys at supplied times.
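As a sketch of the sort of thing reconstruct_at needs to do for a single key (an illustration of the idea, not the actual method body): accumulate every difference recorded at a time less-or-equal to the requested time, and drop anything that cancels to zero.

use std::collections::HashMap;
use std::hash::Hash;

// Reconstruct the collection for one key at `time`: accumulate all differences
// recorded at times `le` the requested time, keeping only non-zero counts.
fn reconstruct_at<T: Copy, V: Clone + Hash + Eq>(
    history: &[(T, Vec<(V, i32)>)],
    time: T,
    le: impl Fn(T, T) -> bool,
) -> Vec<(V, i32)> {
    let mut counts: HashMap<V, i32> = HashMap::new();
    for (t, diffs) in history {
        if le(*t, time) {
            for (val, wgt) in diffs {
                *counts.entry(val.clone()).or_insert(0) += *wgt;
            }
        }
    }
    counts.into_iter().filter(|&(_, wgt)| wgt != 0).collect()
}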

Let's see how something like a unary group operator could be implemented:

// `stream` contains records of type `((K, V), i32)`: keys, vals, and updates.
stream.unary_notify(exch, "Group", vec![], move |input, output, notificator| {

	// read inputs, queue up (key, time) pairs.
	while let Some((time, data)) = input.next() {
		for ((key, val), wgt) in data {

			// add the difference to the trace
			trace1[key][time].push((val, wgt));

			// enumerate times to add least upper bounds
			for t2 in trace1[key].times() {
				let least_upper_bound = time.lub(t2);
				keys_todo[least_upper_bound].insert(key);
				notificator.notify_at(least_upper_bound);
			}
		}
	}

	// we've asked to be notified at some times
	while let Some((time, count)) = notificator.next() {

		// these keys need to be double-checked to ensure
		// output[key]@time = logic(input[key]@time)
		for key in keys_todo.remove(time) {

			// reconstruct inputs and outputs
			let source = trace1[key].reconstruct_at(time);
			let result = trace2[key].reconstruct_at(time);

			// determine what differences we require,
			// send them as output and commit them to trace2
			let diff = logic(source) - result;
			for (val, wgt) in diff {
				output.send_at(time, ((key,val),wgt));
				trace2[key][time].push((val, wgt));
			}
		}
	}
});

I don't want to pretend that was easy, but it really isn't all that horrible. You can write a binary version of this, with two inputs, and you are able to write just about any differential dataflow operator. We have a few optimized implementations for linear and bilinear operators, which don't require this full generality.

Now you know all about how differential works! Or, almost everything.

Loops (iterate)

Although we've seen all you really need to know about how differential operators work, we'll also need to explain how you actually stitch together a dataflow graph so that it computes the thing that you want. For acyclic graphs this is pretty easy: you just put the operators together in the order you want. But if you want to use the iterate operator, it is a bit different.

Recall from a while back that when we talked about iterate we had rules like

dists[(t,0)] = roots[t]
dists[(t,i)] = loop_body(dists[(t,i-1)]) for i > 0
result[t] = dists[(t,infinity)]

Once we actually get a legit stream of differences, dists up above, it is pretty easy to dangle a differential computation off of it. We have to pay a bit more attention to how we get dists when we only have roots to start with.

Doing this requires a few custom operators. As part of this, we'll also clean up the "run the loop forever" nonsense.

  1. We'll want an operator enter that takes roots to a collection where for all (t,i) we have

     enter(roots)[(t,i)] = roots[t]
    

    Because of the magic of the product partial order, we can just use timely's enter operator which takes inputs at t and produces them at (t,0). No fancy computation required, because of theorems and stuff.

  2. We'll want an operator feedback that takes a collection to one iteration later, meaning

     feedback(collection)[(t,i+1)] = collection[(t,i)]
    

    Timely's loop_variable method does this, by letting you use a stream before defining it. Once defined, the contents have their loop coordinate incremented by one.

  3. We'll want an operator leave that extricates the collection in the last iteration from the loop.

      leave(collection)[t] = collection[(t, infinity)]
    

    If we want to be more serious, we could replace infinity with u64::max_value() or whatever we used as the maximum iterations.

    This seems like it might be a total pain to figure out, but it's actually really easy. Another way to think of "the collection in the last iteration" is "the accumulation of all the differences". We just need to output all of the differences, and they will add up to the final value of the collection. Timely's leave operator does the right thing for us.

So we just use timely to get these special operators. How fortunate!

To actually cause a loop to happen correctly, according to the rules up above, we will use these three operators and some random "addition" and "subtraction" operators, which are just forms of concat and map, and the logic of the loop body. The actual source for iterate isn't very complicated, so I thought I would just put it here.

// actual code for `iterate`
self.scope().scoped(|subgraph| {

	// define a new loop variable, to feed into `logic`
    let (feedback, cycle) = subgraph.loop_variable(u64::max_value(), 1);
    let ingress = subgraph.enter(&self);

    // this is our output and what we feed back
    let bottom = logic(&ingress.concat(&cycle));

    // subtract the input and feed back to the loop head
    bottom.concat(&ingress.map_in_place(|x| x.1 = -x.1))
          .connect_loop(feedback);

    // return the results
    bottom.leave()
})

Wrap-up

Now you actually know enough to go and implement a simple version of differential dataflow. There are some interesting optimizations and implementation details to discuss, but for now let's take a bit of a rest. I'd like to work on polishing this text to try and better explain things, so do send any comments or pull requests along! Any thoughts on a better way of explaining things, or maybe a twitchy JavaScript implementation that lets you play around with inputs and outputs and see what differences look like at various points?