## Differential dataflow status report

[ Category: Databases (General) | Posted by: 店小二05 | Date: 2017 | Author: 红领巾 ]
Differential dataflow progress report

Back in July, I wrote a post about a differential dataflow roadmap: where I would like things to go next based on current issues, and what sorts of problems we should expect to see along the way. Being the diligent soul that I am, I then proceeded to hop on a plane, visit some friends, see Ireland a bit, and go surfing for a few months. Sorry about that. But I'm back now, I've been working a bit, and I thought I'd tell you about some of the progress.

Recap

The differential dataflow roadmap post is definitely the right place to start if you'd like to follow along, but let me start with an explanation of the problems as I saw them.

The previous post starts with the example of graph reachability, where a collection of reachable node identifiers iteratively expands through repeated joining with the collection of graph edges. We are going to use a similar but different running example in this post: breadth-first search.

The following fragment computes, from a set roots of source node identifiers and a set edges of graph edges, the minimal distance from any root to each of the reachable graph nodes:

    // initialize roots as reaching themselves at distance 0
    let nodes = roots.map(|x| (x, 0));

    // repeatedly update minimal distances to each node
    nodes.iterate(|inner| {

        // bring loop-static collections in scope.
        let edges = edges.enter(&inner.scope());
        let nodes = nodes.enter(&inner.scope());

        // edges propose dist + 1 to edge destination.
        inner.join_map(&edges, |_k,l,d| (*d, l+1))
             .concat(&nodes)
             .group(|_, s, t| t.push((s[0].0, 1))) // <-- "min"
    })

This computation iteratively expands the set of reachable nodes, but it also accepts arbitrary changes to the roots and edges collections. Part of the magic of differential dataflow is that it keeps all of this sane, and reports changes to the correct answers as roots and edges change. The main technique making this possible is to explicitly transcribe how and when collections change, using logical timestamps that differentiate between rounds of input and iterations. These changes are recorded in something like a log, with an index maintained over it so that our operator implementations can quickly access and assess the state.

This log-structured state is great for sharing information across computations, and the re-use of prior changes lies at the heart of what makes differential dataflow fast. At the same time, all of this logging can be exhausting, and comes with potential performance issues. Differential dataflow specifically has at least two big issues:

Issue 1 : Degradation in latency.

As the computation proceeds, the logs get bigger and bigger. Although we keep an index over them to know where to look for each piece of data, eventually every piece of data has a long history, all of which we must consult to determine the data's current state. This is fundamentally more complicated than in only-streaming or only-iterative computations, where there is always a "currentmost" version of the data that we can maintain. Differential dataflow strongly relies on the ability to pick up seemingly (but not actually) arbitrary subsets of prior differences.

Let's look at breadth-first search run on two different graph sizes (1k nodes and 1m nodes), with as many single-element updates as we can be bothered to run. We are using the existing implementation, to demonstrate the issues. To spoil it, the issues are that as we increase the number of updates, the performance degrades.

This figure plots the distributions of 1,000 update latencies after various orders of magnitude of updates to a graph with 1k nodes and 2k edges. We only do 100,000 updates because things get unbearably slow.

This figure plots the distributions of 1,000 update latencies after various orders of magnitude of updates to a graph with 1m nodes and 10m edges. We only do 10,000,000 updates because things get unbearably slow. We use 10m edges rather than 2m because, due to randomness, the roots weren't actually connected to the graph for the first 150,000 updates. Oops.

Both of these experiments demonstrate that as our computations run, the distribution of latencies degenerates. This makes sense, as our current implementation maintains an append-only log as the internal state, and operators need to traverse more and more data. Computations with more keys can absorb more updates, as the updates are distributed across the keys and it takes longer for the updates to become an issue, but they eventually degenerate nonetheless.

Issue 2 : Poor scaling with small updates.

Ideally we would like our computation to run faster with more resources (processors, memory, networking). However, differential dataflow's reliance on differencing from prior computations means that it (currently) coordinates among workers for each time that it sees. This can be highly inefficient if there is only a single difference for each time. Several other aspects of differential dataflow's operator design assume that non-trivial per-time work will be safely amortized over the total work to do. The result is that small updates have relatively low throughput, which does not improve with additional workers (and indeed, it can get worse).

There are some measurements in the linked post; we won't reproduce them here because we aren't going to address this problem in this post. There is some code written though, and I anticipate a future post talking about this.

To go along with these issues, I put some constraints in place. These constraints are aspirational, in that we don't really need to have them, but they make so much sense that we should be worried if our design precludes them.

Constraint 1 : Compact representation in memory

We could index our data seven different ways, with all sorts of exotic tables and linked datastructures and such. At the same time, we know that we can describe a graph in memory using about 4 bytes for each node and each edge. We would like to avoid straying too far from these tight representations when we can.

Constraint 2 : Shared use of indices

It is not uncommon to see the same collections partitioned multiple times using the same key, or the output of a data-parallel aggregation then be joined on the same key. In these cases, we could hope to re-use the same index multiple times. This complicates in-place updates, and updates generally.

Executive Summary

There is a tasty new implementation of differential dataflow's core state management data structure, the Trace, which I think takes several issues more seriously than the prior implementation did. The new implementation is not "finished", but it has several features that we can evaluate now.

The main feature is that the new implementation can compact state in a sane (correct, efficient) way as the computation proceeds. This allows the update latencies to stabilize, giving the following distributions of update latencies for the two experiments just above:

This figure plots the distributions of 1,000 update latencies after various orders of magnitude of updates to a graph with 1k nodes and 2k edges. The line with the blue halo represents the 1,000 updates after one million single-element updates.

This figure plots the distributions of 1,000 update latencies after various orders of magnitude of updates to a graph with 1m nodes and 10m edges. The line with the blue halo represents the 1,000 updates after ten million single-element updates.

These two measurements indicate that the distributions of latencies seem to stabilize. For the larger graph there is some rightward drift, though this is due to the stabilization only occurring once we have processed about as many changes as we have initial data (details should be clearer once we explain the implementation).

To compare with the existing implementation, we've plotted the distributions after 100,000 and 10,000,000 updates, for the respective experiments:

This figure plots the distributions of 1,000 update latencies after 100,000 updates to a graph with 1k nodes and 2k edges. The solid line is the new implementation and the dashed line is the previous implementation.

This figure plots the distributions of 1,000 update latencies after 10,000,000 updates to a graph with 1m nodes and 10m edges. The solid line is the new implementation and the dashed line is the previous implementation.

So it seems our new implementation is doing something right, at least with respect to large numbers of updates. There is some cost: the elapsed time to run breadth-first search with no updates increases from 28.51s to 74.83s for a random graph with 10m nodes and 100m edges. This is not so bad, in that the new implementation doesn't exploit dense node identifiers (using arrays rather than HashMaps); the prior implementation without this optimization runs in 57.14s. There are several optimizations not yet done, so I'm currently pretty comfortable with this performance regression in the name of robustness.

The new implementation also has a larger memory footprint (we will explain why) for some not-uncommon cases, like one-off graph processing, but has a reduced memory footprint for computations with large values or many distinct logical timestamps.

The implementation is checked in to the resolution branch (for "high resolution times"). It seems to work, but it isn't yet pretty enough to read that I can recommend it, unless you are very keen.

New data structures

The main piece of work has been to revamp the data structure differential dataflow uses to store the history of a collection. A collection is a multiset of (key, val) pairs which is allowed to change arbitrarily according to some partially ordered logical times. Differential dataflow describes collections by a set of update tuples of the form

(key, val, time, diff)

each of which indicates a change diff to the count of the tuple (key, val) at the logical moment time.

As we perform computation, operators typically focus in on some key at some time, accumulating those differences whose time is less-or-equal to time to determine the contents of the collection (the counts for each val) at time.

count(key, val) @ time = sum_{time' <= time} diff : (key, val, time', diff)

This sort of accumulation happens often, as we repeatedly work to figure out what values actually exist at various times and then show them to your user logic.
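The accumulation formula can be sketched in a few lines of Rust. This is a minimal illustration rather than the real implementation: it assumes keys and values are `u64`, that times are `(u32, u32)` pairs compared under the product partial order, and it scans a flat list instead of using an index. The names `less_equal` and `count_at` are made up for this example.

```rust
/// Logical times are assumed to be (input round, iteration) pairs.
type Time = (u32, u32);

/// Product partial order: `a <= b` only when both coordinates are `<=`.
fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Sums `diff` over all updates for `(key, val)` whose time is `<= time`.
fn count_at(updates: &[(u64, u64, Time, isize)], key: u64, val: u64, time: &Time) -> isize {
    updates
        .iter()
        .filter(|(k, v, t, _)| *k == key && *v == val && less_equal(t, time))
        .map(|(_, _, _, d)| *d)
        .sum()
}
```

Note that with a partial order some updates are simply incomparable to the target time, and those are skipped just like updates from the future.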

The previous data structure indexed tuples first by key (to restrict our attention), then by time (to efficiently filter out differences), and then by val (to allow efficient merging of differences). This works well when there are few times, as we only need to merge a few ordered sequences of values. But as we get more and more times, either because the computation keeps running or because the times are high-resolution, the time-indexed data looks more and more like an unsorted list of (time, val, diff) tuples. The clever "merging" of values devolves into placing all these differences into a heap. That's not very helpful, or efficient.

The new approach, foreshadowed in the roadmap post, is to index the data first by key, then val, and just leave things unordered by time. The idea is to keep something that looks a bit like:

    struct Layer<K, V, T> {
        keys: Vec<(K, usize)>,      // key and offset into self.values
        values: Vec<(V, usize)>,    // val and offset into self.histories
        histories: Vec<(T, isize)>, // bunch of times and deltas
    }

where the usize fields indicate where in the next array down the corresponding values or times can be found. To reconstruct the count for a (key, val) pair at a time, we navigate down to the slice of history associated with the key and value, and then swing through it accumulating differences whose time is less or equal to our target time.
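To make the navigation concrete, here is a small sketch of a lookup through the three arrays. It makes an assumption the text does not spell out: each stored offset is taken to be the end of that entry's run in the next array, so a run starts where the previous entry's run ended. Keys and values are fixed to `u64` and times to product-ordered `(u32, u32)` pairs for brevity.

```rust
type Time = (u32, u32);

fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

struct Layer {
    keys: Vec<(u64, usize)>,       // key, and END offset of its run in `values` (assumed)
    values: Vec<(u64, usize)>,     // val, and END offset of its run in `histories` (assumed)
    histories: Vec<(Time, isize)>, // unordered (time, diff) pairs
}

impl Layer {
    /// Accumulated count of `(key, val)` at `time`.
    fn count(&self, key: u64, val: u64, time: &Time) -> isize {
        // Keys are sorted, so we can binary search for the key.
        let k = match self.keys.binary_search_by(|(k, _)| k.cmp(&key)) {
            Ok(k) => k,
            Err(_) => return 0,
        };
        // The key's values start where the previous key's values ended.
        let v_lo = if k == 0 { 0 } else { self.keys[k - 1].1 };
        let v_hi = self.keys[k].1;
        let v = match self.values[v_lo..v_hi].binary_search_by(|(v, _)| v.cmp(&val)) {
            Ok(v) => v_lo + v,
            Err(_) => return 0,
        };
        // Likewise for the slice of history belonging to this value.
        let h_lo = if v == 0 { 0 } else { self.values[v - 1].1 };
        let h_hi = self.values[v].1;
        // Swing through the history, keeping differences at times <= `time`.
        self.histories[h_lo..h_hi]
            .iter()
            .filter(|(t, _)| less_equal(t, time))
            .map(|(_, d)| *d)
            .sum()
    }
}
```

The final scan is linear in the length of the history for one (key, val) pair, which is the cost this layout accepts in exchange for never merging by time.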

Is this representation any better than indexing by time? It isn't always; when we have few times we could be better off indexing by time and merging. However, when that assumption breaks down, this approach is substantially better (as it introduces no merging).

Another consideration is the redundancy of values and times. In the prior approach, we might repeat values if they change at multiple times; in the new approach we repeat times if there are multiple changes at the time. The sizes of values and times determine how much these redundancies cost; as values can be arbitrarily sized, and we imagine times have a small-ish fixed size (a few integers), I'm currently more comfortable with redundancy in times.

Dynamics

The Layer may be a fine representation of a static collection of tuples, but we are constantly adding updates to this collection. To deal with this, our trace will actually keep a list of these layers, of geometrically decreasing size. As we insert layers, we may need to do some merging to maintain the "geometrically decreasing" property, but so doing keeps at most a logarithmic number of layers to merge when we reconstruct collections.

    struct Trace<K, V, T> {
        // layers of geometrically decreasing size
        layers: Vec<Layer<K, V, T>>
    }

When we insert a new layer, we should (i) merge all layers smaller than the new layer, (ii) append the new layer, and (iii) merge layers so long as two are within a factor of two of each other. Merging two layers is logically pretty easy, as each of the layers is in sorted order. It took me a surprisingly large amount of code (170 lines), so perhaps I've done something badly (update: yes, I have; repeatedly).
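The three insertion steps can be sketched on a toy trace. This is one possible reading of the rule, not the code from the branch: layers are reduced to key-sorted lists of `(key, diff)` pairs, times are ignored, and the names `Spine` and `merge` are invented for the example.

```rust
/// A layer here is just a key-sorted list of (key, diff) pairs.
type Layer = Vec<(u64, isize)>;

/// Merges two key-sorted layers, summing diffs and dropping zeros.
fn merge(a: &[(u64, isize)], b: &[(u64, isize)]) -> Layer {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while i < a.len() && j < b.len() {
        if a[i].0 < b[j].0 {
            out.push(a[i]);
            i += 1;
        } else if b[j].0 < a[i].0 {
            out.push(b[j]);
            j += 1;
        } else {
            let sum = a[i].1 + b[j].1;
            if sum != 0 {
                out.push((a[i].0, sum));
            }
            i += 1;
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

struct Spine {
    /// Largest (oldest) layer first, smallest (newest) last.
    layers: Vec<Layer>,
}

impl Spine {
    fn new() -> Self {
        Spine { layers: Vec::new() }
    }

    fn insert(&mut self, layer: Layer) {
        if layer.is_empty() {
            return;
        }
        // (i) merge all layers smaller than the new layer.
        let mut small = Layer::new();
        while self.layers.last().map_or(false, |l| l.len() < layer.len()) {
            let l = self.layers.pop().unwrap();
            small = merge(&l, &small);
        }
        if !small.is_empty() {
            self.layers.push(small);
        }
        // (ii) append the new layer.
        self.layers.push(layer);
        // (iii) merge while the last two layers are within a factor of two.
        while self.layers.len() >= 2 {
            let n = self.layers.len();
            if self.layers[n - 2].len() >= 2 * self.layers[n - 1].len() {
                break;
            }
            let b = self.layers.pop().unwrap();
            let a = self.layers.pop().unwrap();
            let merged = merge(&a, &b);
            if !merged.is_empty() {
                self.layers.push(merged);
            }
        }
    }
}
```

With single-element insertions this behaves like a binary counter, so the number of layers stays logarithmic in the number of updates held.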

There are some aspects of the design that I find delightful. These aren't innovations you've never seen before, but I'm happy to have them nonetheless. Let's talk through them!

Reference counting layers

Rather than have each trace own its layers, as long as we aren't planning on changing the layer (we aren't) we can wrap each layer in a reference count and share it among multiple owners.

    struct Trace<K, V, T> {
        // layers of geometrically decreasing size
        layers: Vec<Rc<Layer<K, V, T>>>
    }

Why is this helpful? This means that each Trace is just a spine of pointers to some common immutable data, and we can share this immutable data with others. These "others" might be other operators, or iterators, or channels. The first one might make sense, as we have mentioned our plan of sharing immutable state between operators, but what do the others mean?

Iterators

Each differential dataflow operator works through a trace by grabbing a cursor, which it then uses to surf around through keys and get work done, and produce output. If the trace owns its data, that work needs to finish up before we return from the operator, as the cursor is only borrowing the layers of the trace. This can be a big problem if we have 100GB of output updates to produce and send, as these updates will languish in buffers before we get a chance to run the operators that consume them.

By putting the layers behind reference-counted pointers, the cursor no longer borrows the state, need not return it, and can be drained at whatever rate we find appropriate. This allows us to interrupt the production of data, return control to timely dataflow and to operators that can consume some of the data we've produced. This keeps our memory footprint smaller, and can dramatically improve both throughput and latency.
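A stripped-down sketch of the idea, assuming layers are plain sorted vectors and with hypothetical `Trace` and `Cursor` types that are much simpler than the real ones: the cursor clones the `Rc` handles, so it owns its view of the data and keeps working even if the trace later replaces or drops those layers.

```rust
use std::rc::Rc;

/// A layer, reduced to a sorted list of (key, diff) pairs for illustration.
type Layer = Vec<(u64, isize)>;

struct Trace {
    layers: Vec<Rc<Layer>>,
}

/// A cursor that owns shared handles to the layers it reads.
struct Cursor {
    layers: Vec<Rc<Layer>>,
    layer: usize,
    index: usize,
}

impl Trace {
    /// Cloning an `Rc` copies a pointer and bumps a count; no layer data is copied.
    fn cursor(&self) -> Cursor {
        Cursor { layers: self.layers.clone(), layer: 0, index: 0 }
    }
}

impl Iterator for Cursor {
    type Item = (u64, isize);

    /// Yields the contents of each layer in turn; callers may stop and resume at will.
    fn next(&mut self) -> Option<(u64, isize)> {
        while self.layer < self.layers.len() {
            if let Some(&item) = self.layers[self.layer].get(self.index) {
                self.index += 1;
                return Some(item);
            }
            self.layer += 1;
            self.index = 0;
        }
        None
    }
}
```

Because the cursor has no lifetime tied to the trace, an operator could stash it, yield control, and pick up where it left off the next time it is scheduled.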

Channels

Differential dataflow shares indexed state between operators through an arrange operator. The arrange operator takes differences as input, indexes them, and produces them as output. Consumers have access to the shared state, but still need to see the stream of differences to know what to respond to. We might think that those differences should be somewhere in the shared state, and they are, but it can be hard to find them if by the time you look they've already been merged with other layers. Currently, the operators just get the raw stream of differences and each need to re-organize these tuples.

Fortunately, the layers are exactly the differences the operators want to see, and rather than send the volume of differences down a channel, and re-organize them at each recipient, the arrange operator can (or should be able to) just send the reference-counted layer. Importantly, this only works because the arrange operator has already partitioned the differences across workers, and doesn't need to re-partition its outgoing stream of differences. And technically it doesn't work at the moment, because Rust and timely conspire to prevent you from sending thread-unsafe Rc types down channels, even though we "know" it will go to the same thread. To be fixed. Or cheated.

We will see a few other uses of reference counted layers in the next sections!

Compacting logical times

As the computation proceeds, some logical times become indistinguishable. After running for an hour, do we really care about the difference between the first and second round of input data? If we are doing a one-off iterative computation, do we care about what changed in iteration three vs iteration four? Probably not; we just want to know what the state looks like now. This is treated a bit more formally in the roadmap's section on compaction.

As timely dataflow computations proceed, timely informs the operators which times their inputs will never produce again. This information allows us to determine when two times, t1 and t2 , are indistinguishable to all future times:

t1 ~ t2 whenever t1 <= t' iff t2 <= t' forall { t' in future }

If no future time t' can tell t1 from t2 , and as a partial order the only way they interact is via the <= operator, then we no longer need to distinguish between them. This can be great, because we can now coalesce multiple differences at previously distinct times and occasionally cancel values and even keys out of the trace completely.

The logic to perform this compaction is simple enough: if the set of future times is bounded below by some set of times F , meaning for each future time t' there is some element f in F where f <= t' , then

advance(t, F) = meet_{f in F} join(t, f)

partitions all t into equivalence classes under the ~ relation above. We can replace each time t with advance(t, F) and we have an equivalent trace (with respect to a future F), with likely fewer distinct times.
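For a concrete feel, here is a sketch of advance for one particular lattice: `(u32, u32)` pairs under the product order, where join is the coordinate-wise maximum and meet is the coordinate-wise minimum. That choice of lattice, and the helper `compact` that advances and then consolidates a list of `(val, time, diff)` updates, are assumptions made for this example only.

```rust
type Time = (u32, u32);

/// Least upper bound under the product order.
fn join(a: Time, b: Time) -> Time {
    (a.0.max(b.0), a.1.max(b.1))
}

/// Greatest lower bound under the product order.
fn meet(a: Time, b: Time) -> Time {
    (a.0.min(b.0), a.1.min(b.1))
}

/// advance(t, F): the meet over f in F of join(t, f); `None` for an empty frontier.
fn advance(t: Time, frontier: &[Time]) -> Option<Time> {
    frontier.iter().map(|&f| join(t, f)).reduce(meet)
}

/// Advances every time, then sums diffs at equal (val, time) and drops zeros.
fn compact(updates: &mut Vec<(u64, Time, isize)>, frontier: &[Time]) {
    for update in updates.iter_mut() {
        if let Some(t) = advance(update.1, frontier) {
            update.1 = t;
        }
    }
    updates.sort();
    let mut out: Vec<(u64, Time, isize)> = Vec::new();
    for &(v, t, d) in updates.iter() {
        let n = out.len();
        if n > 0 && out[n - 1].0 == v && out[n - 1].1 == t {
            out[n - 1].2 += d;
        } else {
            out.push((v, t, d));
        }
    }
    out.retain(|u| u.2 != 0);
    *updates = out;
}
```

As an example, with F = {(5, 0)} the times (0, 0) and (1, 0) both advance to (5, 0), so an insertion at the first and a deletion at the second cancel once compacted.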

Operationally, each Trace also has a field

    // lower bound on future times.
    frontier: Vec<T>,

describing the current set F . We leave all layers as they are, because they are immutable and possibly shared (and Rust won't let us), but whenever we merge two layers we advance all times as part of the merge. Compaction happens as a byproduct of merging, and as long as we keep adding data to the trace we will keep compacting it. This does mean compaction is more about preventing a trace from growing without bound, rather than recovering resources when we stop using it.

Progressive merging

When you merge layers, especially geometrically sized layers, sometimes it takes quite a while. Some of these layers are really quite large. This doesn't happen very often, but it is still pretty annoying when it does happen. The standard response to this is to do "progressive merging", where you start the merge process as soon as you can, and every so often you do a bit of work on the merge. If you do things right, each merge finishes before that layer needs to be merged again.

Instead of keeping a list of Rc<Layer<K,V,T>>, let's describe each layer as either "ready to go", or as a merge in progress:

    pub enum LayerMerge<Key: Ord, Val: Ord, Time: Lattice+Ord> {
        Finished(Rc<Layer<Key, Val, Time>>),
        Merging(Rc<Layer<Key, Val, Time>>, Rc<Layer<Key, Val, Time>>, Merge<Key, Val, Time>),
    }

Here we either have a single Rc<Layer<_>>, in the Finished state, or a merge in progress, with the two source layers and however we plan on representing a merge (the Merge struct). Perhaps a cursor for each of the two reference-counted layers, and a layer we are currently populating:

    pub struct Merge<Key: Ord, Val: Ord, Time: Lattice+Ord> {
        cursor1: LayerCursor<Key, Val, Time>,
        cursor2: LayerCursor<Key, Val, Time>,
        result: Layer<Key, Val, Time>,
    }

Now whenever we would have performed a merge in the past we just start the merge, but continue to use the source layers until the merge completes.

To make sure we are done before we need to do another merge, whenever we add tuples to the trace we should merge at least that many tuples; because the sizes of the layers are decreasing, and because when we start a merge there are no smaller layers, this ensures that by the time we might need to merge again we have finished the work we need to do.

We might update our Trace struct's insert method to start like so:

    /// Inserts a new layer, merging to maintain few layers.
    pub fn insert(&mut self, layer: Rc<Layer<Key, Val, Time>>) {

        // advance each in-progress merge
        let units = layer.len();
        for layer in self.layers.iter_mut() {
            layer.do_work(units);
        }

        // .. continue with insertion logic ..
    }

where do_work(units) merges enough keys to have processed at least units tuples, and does nothing for the Finished variant of the enum.
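A toy version of do_work might look like the following. It is only a sketch under simplifying assumptions: a layer is a sorted `Vec<u64>`, the cursors are plain indices `i` and `j`, and work is metered per element rather than per key. The constructor `begin` is a hypothetical helper.

```rust
use std::rc::Rc;

/// Sorted keys, standing in for a full layer.
type Layer = Vec<u64>;

/// Progress of a merge: a position in each source and the output so far.
struct Merge {
    i: usize,
    j: usize,
    result: Layer,
}

enum LayerMerge {
    Finished(Rc<Layer>),
    Merging(Rc<Layer>, Rc<Layer>, Merge),
}

impl LayerMerge {
    /// Starts a merge; the source layers stay readable through their `Rc`s.
    fn begin(a: Rc<Layer>, b: Rc<Layer>) -> Self {
        let result = Vec::with_capacity(a.len() + b.len());
        LayerMerge::Merging(a, b, Merge { i: 0, j: 0, result })
    }

    /// Moves up to `units` elements into the result; a no-op when `Finished`.
    fn do_work(&mut self, units: usize) {
        let done = match &mut *self {
            LayerMerge::Finished(_) => return,
            LayerMerge::Merging(a, b, m) => {
                for _ in 0..units {
                    if m.i < a.len() && (m.j == b.len() || a[m.i] <= b[m.j]) {
                        m.result.push(a[m.i]);
                        m.i += 1;
                    } else if m.j < b.len() {
                        m.result.push(b[m.j]);
                        m.j += 1;
                    } else {
                        break;
                    }
                }
                m.i == a.len() && m.j == b.len()
            }
        };
        if done {
            // Swap in a placeholder so we can take ownership of the merged result.
            let old = std::mem::replace(self, LayerMerge::Finished(Rc::new(Vec::new())));
            if let LayerMerge::Merging(_, _, m) = old {
                *self = LayerMerge::Finished(Rc::new(m.result));
            }
        }
    }
}
```

Until the last call completes the merge, readers would keep consulting the two source layers, which is exactly why the sources are retained in the Merging variant.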

This figure plots the distributions of 1,000 update latencies to a degree-count computation. The dotted line merges layers immediately, whereas the other curves use progressive merging. The solid line does as little work as is needed to finish a merge in time, and the other two curves work twice and four times as quickly. The more solid lines do more work overall, but bring the tail latencies down.

This figure zooms in on the largest 100 of the 1,000 update latencies to a degree-count computation. The dotted line merges layers immediately, whereas the other curves use progressive merging. The solid line does as little work as is needed to finish a merge in time, and the other two curves work twice and four times as quickly. The more solid lines do more work overall, but bring the tail latencies down.

As we see in the figures, the down-side of progressive merging is that while merges occur we use more layers than if we had immediately merged. This means that operators will be sniffing through more layers when they look up state associated with keys and values, which costs. If we tweak some constants to merge more aggressively we reduce the overhead while starting to allow tail latencies to re-emerge. The nearly dotted line, which does work at four times the rate it needs to, adds a 19% throughput penalty but reduces the maximum latency (in these measurements) by a factor of 5.5x. Maybe worth?

On the positive side, our reference counted layers are really paying off here. The trace is totally usable as the merges happen, operators can mint new owned cursors from the shared index state, and these cursors stay valid even if the trace starts a merge. No weird contracts about which memory is valid when containing what, that you (read: I) invariably screw up, leading to great mysteries. This stuff was all bug-free as soon as it ran, once I fixed a copy/paste bug in the merge logic (Rust tries hard, but fundamentally can't prevent stupid people from using it badly).

Index building

Note: I haven't figured out the right way to do this yet.

Our layers store keys in sorted lists, and if an operator wants to seek to the data associated with a key, it needs to perform something akin to a binary search in each layer. There are some smarts here: we keep the cursor into the layers sorted by key, so that the operator only needs to search forward in those layers whose key is less or equal to the target, and we use exponential-probing search rather than binary search, but we still spend about half the time in bfs looking for keys.

One remedy for this is to keep an index, something like a HashMap<Key,_> , which points us at the data for each key. We could use a Vec<(usize,usize)> for each key, but this is horrible and we can do better. To maintain a multi-map from Key to some Loc representation of locations (e.g. an index for the layer and an index for the offset therein), we can fake out a linked list in one allocation:

    struct Cache<Key, Loc> {
        map: HashMap<Key, usize>,
        links: Vec<(Loc, usize)>,
    }

Here self.map[key] indexes into self.links to indicate the most recent location. Each entry in self.links has a location, and then the index in self.links of the next-most-recent location. Indices of usize::max_value() are taken to mean "stop". As we add and merge layers in a restricted fashion, always working with the smallest and most recent layers, maintaining this cache just means popping and pushing suffixes of self.links and updating self.map for each added or merged key.
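A minimal sketch of this linked-list-in-a-vector idea follows. Only insertion and lookup are shown; the popping of suffixes during merges is left out, and the method names `push` and `locations` are invented here. It uses `usize::MAX` as the "stop" marker, which is the same value as `usize::max_value()`.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Sentinel index meaning "no further locations".
const STOP: usize = usize::MAX;

struct Cache<Key, Loc> {
    map: HashMap<Key, usize>,
    links: Vec<(Loc, usize)>,
}

impl<Key: Hash + Eq, Loc: Copy> Cache<Key, Loc> {
    fn new() -> Self {
        Cache { map: HashMap::new(), links: Vec::new() }
    }

    /// Records `loc` as the most recent location for `key`.
    fn push(&mut self, key: Key, loc: Loc) {
        let next = self.links.len();
        // The new link points back at the previous head for this key, if any.
        let prev = self.map.insert(key, next).unwrap_or(STOP);
        self.links.push((loc, prev));
    }

    /// All locations recorded for `key`, most recent first.
    fn locations(&self, key: &Key) -> Vec<Loc> {
        let mut out = Vec::new();
        let mut at = self.map.get(key).copied().unwrap_or(STOP);
        while at != STOP {
            let (loc, next) = self.links[at];
            out.push(loc);
            at = next;
        }
        out
    }
}
```

All links live in one allocation, so following a key's chain costs a handful of indexed reads rather than pointer chasing through separately allocated nodes.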

The prior implementation does something like this, and it works well. We also can (and do) allow indices other than HashMap, most notably just a Vec<Loc> when the keys are dense unsigned integers.

I am keen to try this out, but it fights a bit with progressive merging. Progressive merging changes "where keys are" at somewhat unpredictable times, and seems to make life somewhat difficult for progressively maintaining the index. Alternately, we could just adopt this at which point progressive merging just has an even higher throughput cost for less of a tail latency win. Perhaps with more thinking they could work together.

Specialization

Note: This isn't done yet either, but seems easy to hack in.

Our first constraint is "compact representation in memory", and our current trace design is not compact. To represent something like the graph bfs uses, consisting of lots of (u32, u32) pairs each with (u32, u32) timestamps, the graph requires

    #nodes * size_of::<(u32, usize)>()
    #edges * (size_of::<(u32, usize)>() + size_of::<((u32, u32), isize)>())

which is 16 bytes for each node and 32 bytes for each edge. This is about 8x larger than how we know we could represent the graph, when we write it as an adjacency list (just 4 bytes for each edge, if we use u32 s).
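The arithmetic is easy to check with `std::mem::size_of`. The sketch below assumes a 64-bit target, where `usize` and `isize` are 8 bytes and padding rounds each tuple up to 16 bytes; the function names are made up for the example, and the adjacency-list figure counts one `u32` per node and per edge.

```rust
use std::mem::size_of;

/// Bytes the layered representation spends on a graph with the given counts.
fn layered_bytes(nodes: usize, edges: usize) -> usize {
    nodes * size_of::<(u32, usize)>()
        + edges * (size_of::<(u32, usize)>() + size_of::<((u32, u32), isize)>())
}

/// Bytes an adjacency list spends, with one u32 per node and one per edge.
fn adjacency_bytes(nodes: usize, edges: usize) -> usize {
    (nodes + edges) * size_of::<u32>()
}
```

For the 1m node, 10m edge graph used earlier this works out to roughly 336MB against 44MB, which is where the "about 8x" comes from.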

Our intuition tells us that a great deal of the data above is not useful, especially in the case of bfs . At any point in time, almost all of the graph edges are simply "present"; changes to the graph are brief and we quickly move past the moment when they might apply to one where they always apply. Almost all times are equivalent to the (0, 0) time, and almost all isize "changes" are simply +1 .

In fact, we can make a more general claim: differential dataflow computations that work on non-negative collections always have the property that the accumulation of differences equivalent to the "least" time are non-negative. These differences are exactly those you would accumulate to get the collection at the compacted time advance(t, F) and this collection needs to be non-negative.

Operationally, this means that if our trace pulls out all differences at times equivalent to the least time, we can throw away the time (they are all equivalent) and the isize diff (as they all accumulate to something positive, we just use the multiplicity of the record). We end up using an amount of memory like so,

    #nodes * size_of::<(Key, usize)>()
    #edges * size_of::<Val>()

where #nodes and #edges are just those in the collection at time advance(t, F) . However, I expect that this is often the bulk for many collections, and if not it doesn't seem especially harmful to have the option and not use it.

We could extend this specialization in principle: we could pick any other popular time and put all updates equivalent to that time in a special layer. As all updates have the same time, we could replace the usize offset in self.values with the isize differences from self.histories, as with a single time there is only a single difference, and just ditch the self.histories vector entirely.

Clear semantics

In differential dataflow, the output differences for each operator are a deterministic function of the input. For any given input sequence to the computation, for each collection in the computation, there is a well-defined set of differences at each logical time. As timely dataflow informs differential dataflow operators that certain times are "complete", all updates before these times become set in stone.

Timely dataflow indicates progress by advancing a "frontier" for each input stream. A frontier is a set of partially ordered times whose elements are mutually incomparable. Each differential dataflow operator sees a sequence of these frontiers, and between each can commit any updates whose times are no longer in the future of the frontier. If we bundle exactly these updates into a layer, then the prior and current frontiers describe the lower and upper envelope of times contained in the layer, as well as guarantee that no updates at times between the two exist in other layers.

    pub struct Description<Time: Lattice+Ord> {
        lower: Vec<Time>, // lower frontier of contained updates.
        upper: Vec<Time>, // upper frontier of contained updates.
    }

As we only happen to merge layers corresponding to contiguous time "intervals" (i.e. layer1.upper == layer2.lower ), when we merge the layers we can also merge their descriptions (adopting layer1.lower and layer2.upper ). Each layer contains exactly those updates with times between its lower and upper bounds, and no others.
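Merging descriptions is then little more than a check and two clones. The sketch below fixes times to `(u32, u32)` pairs and compares frontiers as plain vectors, which quietly assumes both are stored in the same canonical order; the `merge` method name is hypothetical.

```rust
type Time = (u32, u32);

#[derive(Clone, Debug, PartialEq)]
struct Description {
    lower: Vec<Time>, // lower frontier of contained updates.
    upper: Vec<Time>, // upper frontier of contained updates.
}

impl Description {
    /// Describes the merge of `self` with the layer that follows it.
    /// Returns `None` unless the two intervals are contiguous.
    fn merge(&self, next: &Description) -> Option<Description> {
        if self.upper == next.lower {
            Some(Description {
                lower: self.lower.clone(),
                upper: next.upper.clone(),
            })
        } else {
            None
        }
    }
}
```

Returning `None` for non-contiguous layers makes the "only merge adjacent intervals" rule something the code enforces rather than something we merely happen to do.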

Technically, because some layers may be the result of compaction, we will need to add another field indicating the frontier at the point of compaction, so that users understand that the times in the layer are only valid when compared with times in the future of the frontier. I just added a field:

since: Vec<Time>, // frontier used for update compaction.

Descriptions allow us to thin out the layers operators need to search when looking for keys at a given time (or interval of times). For example, a layer corresponding only to times between (3, 4) and (5, 6) , read "input rounds 3 to 5, iterations 4 to 6", can be ignored for all keys when we are working at times between (7, 2) and (9, 3) . This is analogous to the filtering benefits of the time-indexed layer representation, but without the mess when we have fine-grained keys (I hope).
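One way to phrase the filtering test, as a sketch assuming product-ordered `(u32, u32)` times: every update in a layer is at a time in the future of some element of the layer's lower frontier, so if no element of the lower frontier is less or equal to the time we are working at, nothing in the layer can contribute. The function name `may_contain` is invented for this example.

```rust
type Time = (u32, u32);

/// Product partial order on pairs.
fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// True when a layer with this lower frontier may hold updates at times `<= time`.
/// When this returns false the layer can be skipped for every key.
fn may_contain(lower: &[Time], time: &Time) -> bool {
    lower.iter().any(|l| less_equal(l, time))
}
```

For the example in the text, the lower bound (3, 4) is not less or equal to (7, 2) or to (9, 3), because the iteration coordinate 4 exceeds both 2 and 3, so the layer is skipped.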

These descriptions are also handy for several tasks outside the execution of operators. For example, by serializing these layers, we have the beginnings of a relatively simple fault-tolerance story: layers found intact can simply be mapped into memory (courtesy: Abomonation), and updates for times in existing layers can be discarded rather than re-processed. Serialized layers also leave behind enough information for interactive exploration of collections, either for post-hoc debugging, or looking at live collections as they evolve.

Next Steps

There are still lots of cool things to do with differential dataflow's internals.

We still have a few of the features mentioned above to play with. We can try out index building and see if we recover all of the performance of the prior implementation. I really want to try out the specialization and see if I can get the twitter_rv graph up and running.

The prior implementation happily shares data across multiple operators. That seems pretty easy to do here too, but there is a bit of surgery required to get everything back in place. We also need to roll our own smart reference counter, which tracks the frontiers of each of the users of the trace and derives lower bounds for compaction.

Our next major step is working through implementations of group and join for high-resolution timestamps. There is an implementation, commented out, in the current codebase, and it seems like this is all ready to try out and see what breaks next. The restructuring of state management was a crucial first step to support the large number of timestamps, though.

I'm hoping to write about each of these as they happen, and do a bit of measurement to see how things improve, or what sort of regression we eat in the name of scientific progress.
