Go with the dataflow

Dataflow programming is a style that describes your data processing as data flowing through a graph of operations, in contrast to the usual imperative processing style. It invites you to think of data operations as a collection of black boxes, linked together by a runtime which takes care of moving data from one box to the next.

This may sound a little bit like LINQ, which makes it easy to compose set-based operations such as filtering and mapping without worrying about how the data gets from one operator to the next. You might even be reminded of the PowerShell pipeline, which pipes objects between composable commandlets. However, dataflow goes beyond a sequence of operations, allowing for flows to branch and join as required. Dataflow is also typically asynchronous, allowing for data to arrive and begin processing at arbitrary times — in this it’s more like the Reactive Extensions than LINQ.

In .NET 4.5, you can do dataflow programming using the new System.Threading.Tasks.Dataflow DLL. Here’s a quick look.

Dataflow ‘Hello, world’

If there’s one thing everybody hates more than a Fibonacci program, it’s a ‘Hello, world’ program, right? Well, tough. Here it comes.

// Set up the dataflow
var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
var printer = new ActionBlock<string>(s => Console.WriteLine(s));
helloifier.LinkTo(printer);
 
// Send some data into the dataflow
helloifier.Post("world");
 
Console.WriteLine("Prepare to be wowed");
 
Console.ReadLine();  // keep the process alive so the background dataflow can finish

This program sets up an almost trivial dataflow, consisting of just two blocks, a transform block and an action block. The transform block receives input, performs a transformation on it, and passes the result to its output. It’s the dataflow equivalent of the LINQ Select operator. In this case the transformation is to stick “Hello” on the front of the input. The action block receives input, does something with it, and doesn’t produce any output. In this case the thing it does is print it to the console. The program links the output of the transform block to the input of the action block.

Next, the program posts some data into the dataflow — specifically into the transform block, since that’s where our flow begins. Post returns immediately, without waiting for the data to be processed, so the next statement can run. (You can also use SendAsync, which returns a Task you can await to know when the block has accepted your input — handy if you’re throwing a lot of data at a block.) Meanwhile the dataflow kicks off in the background. First the transform block runs, producing the output “Hello, world”; then this output is passed via the link to the action block, which dumps it to the console. So the output is:

Prepare to be wowed
Hello, world

Notice that the Console.WriteLine in the main program ran before the one in the action block.
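
As a rough sketch, the SendAsync variant looks like this, assuming the code runs inside an async method (the messages here are just made up for illustration):

// SendAsync returns a Task<bool> that completes once the block has accepted or
// declined the message, so a producer can await it rather than flooding an
// already busy block (most useful when the block has a BoundedCapacity set).
foreach (var name in new[] { "world", "moon", "stars" }) {
  bool accepted = await helloifier.SendAsync(name);
  if (!accepted) {
    Console.WriteLine("The block declined: " + name);
  }
}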

Dataflow is parallel

Obviously, dataflow isn’t terribly exciting for one message with trivial processing. Things start to hot up, though, when you have multiple data items or heavier processing needs. Separate blocks always run independently of one another, and you can also let a single block process several messages at once by setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism. In this example, I simulate CPU-bound operations in my transform and action blocks:

static void Busy() {
  for (int i = 0; i < 1000000000; ++i) { }
}
 
var bustagut = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };  // each block may process up to four messages at once
 
var helloifier = new TransformBlock<string, string>(
  s => { Busy(); return "Hello, " + s; },
  bustagut);
var printer = new ActionBlock<string>(
  s => { Busy(); Console.WriteLine(s); },
  bustagut);

When I pump a dozen or so messages into this on a quad-core machine, Task Manager shows me getting full value for money out of my CPU!
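
The driving code can be as simple as this sketch (the message count and the final ReadLine are my own arbitrary choices):

// Hypothetical driver loop: link the parallel blocks and pump a dozen messages through
helloifier.LinkTo(printer);
 
for (int i = 0; i < 12; ++i) {
  helloifier.Post("world " + i);
}
 
Console.ReadLine();  // keep the process alive while the blocks work through the backlog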

Dataflow is push-friendly

You’ll have noticed that I initiated the dataflow by calling Post (or SendAsync) to submit a data item into a block. This is different from LINQ’s “pull” model. In LINQ, data processing is initiated when I ask for a result. This means dataflow is well suited to applications where data items are coming in unpredictably — for example, handling items coming in from a network or message queue — or even, heaven forbid, a user interface.

TPL dataflow integrates with Reactive Extensions and other observable APIs via the AsObserver and AsObservable extension methods.
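
Here’s a rough sketch of what that bridging looks like, assuming a reference to Rx and re-using the helloifier block from the first example:

// AsObserver lets an observable sequence feed a block's input;
// AsObservable exposes a block's output as an IObservable<T>.
IObservable<string> names = new[] { "world", "moon", "stars" }.ToObservable();
names.Subscribe(helloifier.AsObserver());   // observable -> dataflow block
 
// Alternatively, consume the transform block's output with Rx instead of an ActionBlock:
// helloifier.AsObservable().Subscribe(Console.WriteLine);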

Dataflow is messaging

So far this doesn’t seem to do anything more than LINQ (or, at any rate, Parallel LINQ… or at any rate, some sort of bizarre crossbreed of Parallel LINQ and Reactive Extensions… sorry, I’ll come in again). Let’s build it up into more of a messaging system by adding in a broadcast block. Broadcast takes an input and sends it to potentially multiple outputs. So now we can take each data item and perform multiple operations on it in parallel — something not easily possible in LINQ.

var splitter = new BroadcastBlock<string>(s => s);
var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
var printer = new ActionBlock<string>(s => Console.WriteLine(s));
var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var emailer = new ActionBlock<string>(s => EmailToBoss(s));
 
splitter.LinkTo(helloifier);
splitter.LinkTo(businessifier);
helloifier.LinkTo(printer);
businessifier.LinkTo(emailer);

Now each input is both printed to the console and emailed. A simple use case for this might be a logging system where messages are stored locally in a CSV file and also uploaded to a Web service.

And once again, the two branches of the dataflow run in parallel. Each block processes messages independently, so the various transformations and actions can all go at their own pace.

Chunky versus chatty

So far I’ve considered data items in isolation. But when processing involves accessing a remote or expensive resource, processing each data item as it comes in may result in inefficient, chatty communication. It may be better to batch up a number of data items and submit them together. No problem — just reach for the batch block.

var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var batcher = new BatchBlock<string>(5);
var emailer = new ActionBlock<string[]>(s => EmailToBoss(s));  // Action now takes array of string
// ...
businessifier.LinkTo(batcher);
batcher.LinkTo(emailer);

Now the email method will be called only when the batch block has accumulated five data items, and all five will be passed to it in one go.

One complication with this is that a batch block doesn’t know when the stream of data has dried up and therefore won’t automatically flush an incomplete batch. For this to work we need to do two things: first, we need to tell the dataflow network when we have no more input; and second, we need to propagate this through the network to the batch block. To do the first, call the Complete() method on the input block; to do the second, specify a DataflowLinkOptions with PropagateCompletion = true on each link:

// Completion must propagate along every link between the entry point and the batch block
splitter.LinkTo(businessifier, new DataflowLinkOptions { PropagateCompletion = true });
businessifier.LinkTo(batcher, new DataflowLinkOptions { PropagateCompletion = true });
batcher.LinkTo(emailer, new DataflowLinkOptions { PropagateCompletion = true });
 
for (int i = 0; i < 13; ++i) {
  splitter.SendAsync("world " + i);
}
splitter.Complete();  // no more data to come

Another approach could be to have a watchdog that periodically calls BatchBlock.TriggerBatch — this would also ensure that batches are delivered in a timely way even if the rate of data items is very low.
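
Such a watchdog might look something like this sketch (the ten-second interval is an arbitrary choice):

// Hypothetical watchdog: flush the batch block on a timer so that a trickle of data
// still gets emailed in reasonably sized (if incomplete) batches.
var flushTimer = new System.Threading.Timer(
  _ => batcher.TriggerBatch(),   // flush whatever is queued, even if fewer than five items
  null,                          // no state
  TimeSpan.FromSeconds(10),      // first flush after ten seconds...
  TimeSpan.FromSeconds(10));     // ...and every ten seconds thereafter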

Add some dynamism to your dataflow

An interesting feature of dataflow compared to LINQ is that dataflow blocks can be linked and unlinked at run time. The LinkTo method returns an IDisposable which, when disposed, deletes the link. This means you can dynamically update your processing flow at run time depending on resource availability, pricing, varying output requirements, etc. You can even imagine giving users a simple diagramming tool for building their own analysis dataflows using transformation, aggregation and filtering blocks.
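
As a rough sketch, re-using the blocks from the broadcast example, a link can be torn down and replaced while the flow is running:

// Hypothetical rewiring at run time: start by printing the formal greetings to the
// console, then switch that branch over to the emailer when requirements change.
IDisposable consoleLink = businessifier.LinkTo(printer);
 
// ... later, perhaps in response to a configuration change ...
consoleLink.Dispose();           // removes the old link
businessifier.LinkTo(emailer);   // subsequent messages flow to the new target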

Conclusion

The examples of dataflow I’ve shown here are very trivial. But hopefully you can see that it provides a flexible way of processing asynchronous messages, one based on messaging principles rather than operating directly on sequences, and one that scales nicely to multiple cores. TPL dataflow is part of .NET 4.5, so if you want to have a play with it, just grab the Visual Studio 11 beta and give it a go!
