Go with the dataflow

Dataflow programming is a style that describes your data processing as data flowing through a graph of operations, in contrast to the usual imperative processing style. It invites you to think of data operations as a collection of black boxes, linked together by a runtime which takes care of moving data from one box to the next.

This may sound a little bit like LINQ, which makes it easy to compose set-based operations such as filtering and mapping without worrying about how the data gets from one operator to the next. You might even be reminded of the PowerShell pipeline, which pipes objects between composable commandlets. However, dataflow goes beyond a sequence of operations, allowing for flows to branch and join as required. Dataflow is also typically asynchronous, allowing for data to arrive and begin processing at arbitrary times — in this it’s more like the Reactive Extensions than LINQ.

In .NET 4.5, you can do dataflow programming using the new System.Threading.Tasks.Dataflow DLL. Here’s a quick look.

Dataflow ‘Hello, world’

If there’s one thing everybody hates more than a Fibonacci program, it’s a ‘Hello, world’ program, right? Well, tough. Here it comes.

// Set up the dataflow
var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
var printer = new ActionBlock<string>(s => Console.WriteLine(s));
helloifier.LinkTo(printer);
// Send some data into the dataflow
helloifier.Post("world");
Console.WriteLine("Prepare to be wowed");

This program sets up an almost trivial dataflow, consisting of just two blocks, a transform block and an action block. The transform block receives input, performs a transformation on it, and passes the result to its output. It’s the dataflow equivalent of the LINQ Select operator. In this case the transformation is to stick “Hello” on the front of the input. The action block receives input, does something with it, and doesn’t produce any output. In this case the thing it does is print it to the console. The program links the output of the transform block to the input of the action block.

Next, the program posts some data into the dataflow — specifically into the transform block, since that’s where our flow begins. Post is asynchronous, so it returns immediately, allowing the next statement to run. (You can also use SendAsync, which returns a Task object you can await to know when the block has consumed your input — this can be handy if you’re throwing a lot of data at a block.) Meanwhile the dataflow kicks off in the background. First the transform block runs, producing the output “Hello, world”; then this output is passed via the link to the action block, which dumps it to the console. So the output is:

Prepare to be wowed
Hello, world

Notice that the Console.WriteLine in the main program ran before the one in the action block.
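Because the blocks run in the background, a console program like this could exit before the dataflow has finished its work. Here’s a sketch of one way to wait for everything to drain, using the Complete method and the Completion task (both come up again later in this post):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class Program {
    public static void Run() {
        var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
        var printer = new ActionBlock<string>(s => Console.WriteLine(s));
        // PropagateCompletion pushes the 'no more data' signal down the link
        helloifier.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        helloifier.Post("world");
        helloifier.Complete();       // tell the flow there's no more input
        printer.Completion.Wait();   // block until the last message has been processed
    }
}
```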

Dataflow is parallel

Obviously, dataflow isn’t terribly exciting for one message with trivial processing. Things start to hot up though when you have multiple data items or heavier processing needs. In this case dataflow will automatically parallelise the processing. You can control this using the ExecutionDataflowBlockOptions.MaxDegreeOfParallelism setting. In this example, I simulate CPU-bound operations in my transform and action blocks:

static void Busy() {
  for (int i = 0; i < 1000000000; ++i) { }
}

var bustagut = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
var helloifier = new TransformBlock<string, string>(
  s => { Busy(); return "Hello, " + s; },
  bustagut);
var printer = new ActionBlock<string>(
  s => { Busy(); Console.WriteLine(s); },
  bustagut);
helloifier.LinkTo(printer);

When I pump a dozen or so messages into this on a quad-core machine, Task Manager shows me getting full value for money out of my CPU!

Dataflow is push-friendly

You’ll have noticed that I initiated the dataflow by calling Post (or SendAsync) to submit a data item into a block. This is different from LINQ’s “pull” model. In LINQ, data processing is initiated when I ask for a result. This means dataflow is well suited to applications where data items are coming in unpredictably — for example, handling items coming in from a network or message queue — or even, heaven forbid, a user interface.

TPL dataflow integrates with Reactive Extensions and other observable APIs via the AsObserver and AsObservable extension methods.
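To make that concrete, here’s a minimal sketch of AsObservable in action. The hand-rolled observer class is only there because the base class library doesn’t ship a Subscribe(Action<T>) helper; with Reactive Extensions installed you’d just use Rx’s Subscribe overloads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Minimal observer, standing in for what Rx would normally provide
class ListObserver<T> : IObserver<T> {
    public readonly List<T> Items = new List<T>();
    public readonly ManualResetEventSlim Done = new ManualResetEventSlim();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { Done.Set(); }
    public void OnCompleted() { Done.Set(); }
}

public static class Demo {
    public static List<string> Run() {
        var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
        var observer = new ListObserver<string>();
        helloifier.AsObservable().Subscribe(observer);  // block output -> observer

        helloifier.Post("world");
        helloifier.Complete();
        observer.Done.Wait();   // OnCompleted fires after the last OnNext
        return observer.Items;
    }
}
```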

Dataflow is messaging

So far this doesn’t seem to do anything more than LINQ (or, at any rate, Parallel LINQ… or at any rate, some sort of bizarre crossbreed of Parallel LINQ and Reactive Extensions… sorry, I’ll come in again). Let’s build it up into more of a messaging system by adding in a broadcast block. Broadcast takes an input and sends it to potentially multiple outputs. So now we can take each data item and perform multiple operations on it in parallel — something not easily possible in LINQ.

var splitter = new BroadcastBlock<string>(s => s);
var helloifier = new TransformBlock<string, string>(s => "Hello, " + s);
var printer = new ActionBlock<string>(s => Console.WriteLine(s));
var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var emailer = new ActionBlock<string>(s => EmailToBoss(s));

splitter.LinkTo(helloifier);
splitter.LinkTo(businessifier);
helloifier.LinkTo(printer);
businessifier.LinkTo(emailer);

Now each input is both printed to the console and emailed. A simple use case for this might be a logging system where messages are stored locally in a CSV file and also uploaded to a Web service.

And once again, the two branches of the dataflow run in parallel. Each block processes messages independently, so the various transformations and actions can all go at their own pace.

Chunky versus chatty

So far I’ve considered data items in isolation. But when processing involves accessing a remote or expensive resource, processing each data item as it comes in may result in inefficient, chatty communication. It may be better to batch up a number of data items and submit them together. No problem — just reach for the batch block.

var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var batcher = new BatchBlock<string>(5);
var emailer = new ActionBlock<string[]>(s => EmailToBoss(s));  // Action now takes an array of strings
businessifier.LinkTo(batcher);
batcher.LinkTo(emailer);
// ...

Now the email method will be called only when there are five pending data items waiting to be processed, and all five will be passed to the email method in one go.

One complication with this is that a batch block doesn’t know when the stream of data has dried up and therefore won’t automatically flush an incomplete batch. For this to work we need to do two things: first, we need to tell the dataflow network when we have no more input; and second, we need to propagate this through the network to the batch block. To do the first, call the Complete() method on the input block; to do the second, specify a DataflowLinkOptions with PropagateCompletion = true on each link:

businessifier.LinkTo(batcher, new DataflowLinkOptions { PropagateCompletion = true });
batcher.LinkTo(emailer, new DataflowLinkOptions { PropagateCompletion = true });
for (int i = 0; i < 13; ++i) {
  splitter.SendAsync("world " + i);
}
splitter.Complete();  // no more data to come

Another approach could be to have a watchdog that periodically calls BatchBlock.TriggerBatch — this would also ensure that batches are delivered in a timely way even if the rate of data items is very low.
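Here’s a sketch of that watchdog idea, using a plain System.Threading.Timer to call TriggerBatch periodically. The timing values and block names are arbitrary choices for the demo:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Demo {
    public static int Run() {
        var batcher = new BatchBlock<string>(5);
        int delivered = 0;
        var counter = new ActionBlock<string[]>(batch => Interlocked.Add(ref delivered, batch.Length));
        batcher.LinkTo(counter, new DataflowLinkOptions { PropagateCompletion = true });

        // Watchdog: flush whatever is pending every 100 ms, full batch or not
        using (var watchdog = new Timer(_ => batcher.TriggerBatch(), null, 100, 100)) {
            batcher.Post("a");
            batcher.Post("b");    // only 2 items -- well under the batch size of 5
            Thread.Sleep(300);    // give the watchdog time to fire
        }

        batcher.Complete();
        counter.Completion.Wait();
        return delivered;         // the partial batch was flushed by the timer
    }
}
```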

Add some dynamism to your dataflow

An interesting feature of dataflow compared to LINQ is that dataflow blocks can be linked and unlinked at run time. The LinkTo method returns an IDisposable which, when disposed, deletes the link. This means you can dynamically update your processing flow at run time depending on resource availability, pricing, varying output requirements, etc. You can even imagine giving users a simple diagramming tool for building their own analysis dataflows using transformation, aggregation and filtering blocks.
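A minimal sketch of run-time unlinking: dispose the object returned by LinkTo, and messages posted afterwards simply wait in the source block’s buffer until something else links up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Demo {
    public static (int receivedCount, int leftInBuffer) Run() {
        var source = new BufferBlock<int>();
        var received = new List<int>();
        var sink = new ActionBlock<int>(i => received.Add(i));

        IDisposable link = source.LinkTo(sink);
        source.Post(1);
        while (received.Count < 1) Thread.Sleep(10);  // wait for delivery

        link.Dispose();    // unlink at run time
        source.Post(2);    // nowhere to go: stays in the source's buffer
        Thread.Sleep(200);

        return (received.Count, source.Count);
    }
}
```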


The examples of dataflow I’ve shown here are very trivial. But hopefully you can see that it provides a flexible way of processing asynchronous messages, based on messaging principles rather than direct operations on sequences, and that it scales nicely to multiple cores. TPL dataflow is part of .NET 4.5, so if you want to have a play with it, just grab the Visual Studio 11 beta and give it a go!

Tagged as General

What else is new in C# 5?

The big new feature in C# 5 is asynchronous programming support, which I wrote about last week. However, the C# folks have also slipped in a couple of smaller features and I thought I’d round things out by mentioning those.

Method caller information

There’s a complete style guide to be written on Writing Enterprisey Code, but one of my favourite “enterprisey” tells, after the use of Visual Basic, is obsessively logging every function you pass through:

Function AddTwoNumbers(a As Integer, b As Integer) As Integer
  Logger.Trace("ArithmeticHelpers", "AddTwoNumbers", "Entering AddTwoNumbers")
  Dim result = OracleHelpers.ExecInteger("SELECT " & a & " + " & b)
  Logger.Trace("ArithmeticHelpers", "AddTwoNumbers", "Calling PrintPurchaseOrders")
  PrintPurchaseOrders()  ' IFT 12.11.96: don't know why this is needed but shipping module crashes if it is removed
  Logger.Trace("ArithmeticHelpers", "AddTwoNumbers", "Returned from PrintPurchaseOrders")
  Logger.Trace("ArithmeticHelpers", "AddTwoNumbers", "Exiting AddTwoNumbers")
  Return result
End Function

Although this code is efficient and clear by enterprisey standards, with C# 5 it can be even efficienter and clearer. C# 4 introduced optional parameters, which meant callers of a method could leave out the arguments and the compiler would fill in the default values:

public void WonderMethod(int a = 123, string b = "hello") { ... }
WonderMethod(456);  // compiles to WonderMethod(456, "hello")
WonderMethod();     // compiles to WonderMethod(123, "hello")

With C# 5, you can put a special attribute on an optional parameter and the compiler will fill in the value not with a constant but with information about the calling method. This means we can implement Logger.Trace so that it automagically picks up where it’s being called from:

public static void Trace(string message, [CallerFilePath] string sourceFile = "", [CallerMemberName] string memberName = "") {
  string msg = String.Format("{0}: {1}.{2}: {3}",
    DateTime.Now.ToString("yyyy-mm-dd HH:MM:ss.fff"),  // Lurking 'minutes'/'months' bug introduced during .NET port in 2003 and has not been noticed because nobody ever looks at the log files because they contain too much useless detail
    Path.GetFileName(sourceFile),
    memberName,
    message);
  Console.WriteLine(msg);
}

Now, if the caller calls Log.Trace("some message") the compiler will fill in the missing arguments not with the empty string, but with the file and member where the call happens:

// In file Validation.cs
public void ValidateDatabase() {
  Log.Trace("Entering method");
  // compiles to Log.Trace("Entering method", "Validation.cs", "ValidateDatabase")
  // ... do the validation ...
  Log.Trace("Exiting method");
}

Notice that the parameters to which you apply the attributes must be optional. If they aren’t optional, the C# compiler will require the calling code to provide them, and explicitly provided values always take precedence over the caller information.

Another example of how you can use this is in implementing INotifyPropertyChanged without needing either literal strings, expression magic or mystic weavers:

public class ViewModelBase : INotifyPropertyChanged {
  protected void Set<T>(ref T field, T value, [CallerMemberName] string propertyName = "") {
    if (!Object.Equals(field, value)) {
      field = value;
      OnPropertyChanged(propertyName);
    }
  }
  // usual INPC boilerplate
}

public class Widget : ViewModelBase {
  private int _sprocketSize;
  public int SprocketSize {
    get { return _sprocketSize; }
    set { Set(ref _sprocketSize, value); }  // Compiler fills in "SprocketSize" as propertyName
  }
}

For what it’s worth, you can also get the line number of the calling code using [CallerLineNumber]. This may be useful for diagnostic methods, but if you really need it, that may be a sign that the calling code is just a bit too enterprisey.
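For completeness, a tiny sketch of [CallerLineNumber]; the method name here is made up for illustration:

```csharp
using System;
using System.Runtime.CompilerServices;

public static class Diag {
    // The compiler substitutes the line number of the call site for the default
    public static int HereLine([CallerLineNumber] int line = 0) {
        return line;
    }
}
```

Calling Diag.HereLine() anywhere returns the line number of that call, with no reflection or stack-walking involved.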

Using loop variables in lambdas

Technically, this is a fix to a long-standing cause of confusion and suffering. But it makes C# that bit more usable, so I’m going to mention it anyway.

Since C# 3, it’s been quicker and easier to write anonymous functions than named ones, thanks to lambda syntax. Anonymous functions are widely used in LINQ, but they’re also used in many other cases where you want to quickly parameterise behaviour without investing in some humungous hierarchy of classes and interfaces and virtual functions. An important feature of anonymous functions is that they can capture variables from their local environment. Here’s an example:

public static IEnumerable<int> GetGreaterThan(IEnumerable<int> source, int n) {
  return source.Where(i => i > n);
}

Here, i => i > n is an anonymous function that captures the value of n. For example, if n is 17, then the function is i => i > 17.

In previous versions of C#, if you wrote a loop, you couldn’t use the loop variable in a lambda. Actually, it was rather worse than that. You could use the loop variable in a lambda, but it would give you the wrong results — it would use the value of the loop variable at the time the loop was exited, not at the time the variable was captured.

For example, here’s a function which returns a collection of ‘adder’ functions, one ‘adder’ for each addend in the input:

public static List<Func<int, int>> GetAdders(params int[] addends) {
  var funcs = new List<Func<int, int>>();
  foreach (int addend in addends) {
    funcs.Add(i => i + addend);
  }
  return funcs;
}

Let’s take it for a spin:

var adders = GetAdders(1, 2, 3, 4, 5);
foreach (var adder in adders) {
  Console.Write(adder(10) + " ");
}
// Printout: 15 15 15 15 15

Clearly this is horribly wrong! Every function in the returned collection has ended up capturing 5 as its addend. This is because they closed over the loop variable, addend, and the final value of the loop variable was 5.

To make this work in C# 3 and 4, you have to remember to copy the loop variable into a local variable (within the scope of the loop), and have your lambda close over the local variable:

foreach (var addend_ in addends) {
  var addend = addend_;  // DON'T GO NEAR THE LOOP VARIABLE
  funcs.Add(i => i + addend);
}

Because the functions are closing over a local variable rather than the loop variable, the value is now preserved and you get the correct results.

This isn’t an obscure edge case by the way — I’ve come up against it numerous times in my projects. A more realistic example from one project is building a function to perform filtering. The function is built up from a collection of Restriction objects specified by the user. The code loops over the Restriction objects and builds up a list of functions representing the clauses (e.g. Name Equals “BOB” becomes r => r["Name"] == "BOB"), then combines these functions into a final filter function which runs all of the clauses and checks they are all true. My first pass at this didn’t work because each clause function ended up closing over the same Restriction object — the last one in the collection.
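Here’s a sketch of that shape of code, with the loop-variable copy made explicit. The row and restriction types are illustrative stand-ins, not the actual project’s Restriction API — the point is just that each clause must close over its own restriction:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Demo {
    // Builds one filter function from many clause functions
    public static Func<IDictionary<string, string>, bool> BuildFilter(
            IEnumerable<KeyValuePair<string, string>> restrictions) {
        var clauses = new List<Func<IDictionary<string, string>, bool>>();
        foreach (var r in restrictions) {
            var restriction = r;  // local copy, so each clause gets its own (needed pre-C# 5)
            clauses.Add(row => row[restriction.Key] == restriction.Value);
        }
        // The final filter runs all of the clauses and checks they are all true
        return row => clauses.All(clause => clause(row));
    }
}
```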

In C# 5, this is fixed and you can close over loop variables and get the results you expect. If you like to take advantage of C#’s hybrid OO-functional nature, this removes a nasty bear trap that has been causing problems for years.
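One wrinkle worth knowing: the fix applies to foreach loops, where each iteration now gets a fresh variable. The for statement still declares a single variable shared across all iterations, so the old trap remains there. A quick demonstration (C# 5 or later):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Demo {
    public static (int[] fromForeach, int[] fromFor) Run() {
        var a = new List<Func<int>>();
        foreach (var i in new[] { 0, 1, 2 })
            a.Add(() => i);   // C# 5: fresh 'i' per iteration, so 0, 1, 2

        var b = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
            b.Add(() => i);   // one shared 'i', so every closure sees 3

        return (a.Select(f => f()).ToArray(), b.Select(f => f()).ToArray());
    }
}
```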

So that’s it for C# 5. From a language point of view, there’s not a whole lot of new stuff to learn, though the async and await keywords conceal a great deal of depth. Happy coding!

Tagged as General

Asynchronous programming in C# 5

One of the more radical design decisions in the forthcoming Windows Runtime is that no API may, even potentially, take more than 50 milliseconds to complete. Operations that could take longer than that will instead have a ‘kick off this operation’ API that returns immediately without waiting for the result of the operation. The reason for this is that Microsoft want Windows 8 Metro applications to be ‘fast and fluid’ — with the immediacy of touch-based UIs, even small hiccups in responsiveness are more obvious and jarring than with a mouse or keyboard. From a UI point of view, therefore, this is a very helpful design policy.

From a developer point of view, though, it makes life a bit tricky. When we read from a file or make a WCF service call, it’s usually because we want to do something with the result. When the result is guaranteed to be available when the file read or WCF service API returns, we can write our code in a top-to-bottom form which is easy to understand and easy to reason about.

string url = ReadUrlFromFile(filename);
string contentOfUrl = HttpGetFromUrl(url);

APIs like this are called synchronous or blocking. Synchronous APIs are easy to use and understand, but your entire program (well, the current thread) is unresponsive while you’re inside one. The API can’t return control to your code to do other tasks, because it can’t deliver the results yet.

The style of having a ‘kick off’ API which returns immediately is called asynchronous or non-blocking. Programming with asynchronous APIs is more difficult, because you can’t just return the results into a variable and keep going:

string url = BeginReadUrlFromFile(filename);  // Won't work -- file read hasn't completed when BeginRead returns
string contentOfUrl = BeginHttpGetFromUrl(url);  // Ditto

Instead, you have to put the code that uses the results into a callback, which the slow operation will invoke when it’s good and ready:

BeginReadUrlFromFile(filename, url => {
    BeginHttpGetFromUrl(url, contentOfUrl => {
        // finally, do something with contentOfUrl
    });
});

Even a simplified example like this looks pretty ugly. In real asynchronous code, with more operations being composed, more complex callbacks, conditional logic, early exits and error handling, well, it gets pretty ugly. And the asynchronous APIs in the real .NET Framework are uglier still, with IAsyncResult objects and paired EndXxx method calls cluttering up the shop.

And yet this is the way our users would like us to work, and the way we will have to work if we want to target the Windows Runtime.

The old solution: use F#

The brainy folk behind F# figured out a way to get the best of both worlds. F# includes a feature called asynchronous workflows, which are blocks of code introduced by async. In an asynchronous workflow, you can call asynchronous methods using a syntax that looks just like synchronous code:

async {
  let! url = BeginReadUrlFromFile filename
  let! contentOfUrl = BeginHttpGetFromUrl url
  // do something with contentOfUrl
}

The F# compiler automatically converts this nice readable, understandable code into the ghastly callback-style equivalent, thus giving you the ease of use of top-to-bottom coding with the responsive behaviour of asynchronous calls.

The new solution: use C# 5

Now, the equally brainy folk behind C# have implemented the same feature in C#. The next version of C#, which is included in Visual Studio 11 beta, introduces two new keywords, async and await.

The async keyword simply indicates that a method makes asynchronous calls. This is important for callers to know, because it means the method may return before it finishes — the method can yield back to its caller at any asynchronous call.

The await keyword indicates an asynchronous call where we want to keep writing top-to-bottom logic instead of writing out the callbacks by hand. Here’s how they fit together:

public async void ShowReferencedContent(string filename) {
  string url = await BeginReadFromFile(filename);
  string contentOfUrl = await BeginHttpGetFromUrl(url);
  MessageBox.Show(contentOfUrl);
}

This is much easier to write, read and sanity-check than the callback version, but it’s doing the same thing. (Actually, it’s quite a bit smarter than the callback version, because compilers don’t get bored and skip over error conditions or screw up early exit logic or ignore threading issues.)

What happens when we call this method? The first thing that happens is that BeginReadFromFile gets called, with the provided filename and the compiler-generated callback. BeginReadFromFile returns immediately, but the result isn’t available yet. So rather than assigning the result to the url variable — which is actually part of the callback — the method then exits and returns to the caller! The calling method resumes and keeps running its code, even though the called method hasn’t yet finished.

Then, at some later point, the file system completes the read operation. This means the result is now available, and the runtime schedules the callback. This doesn’t necessarily happen immediately — the exact timing depends on the synchronisation context. The callback runs, binds the url variable to the result of the file operation, and calls BeginHttpGetFromUrl. This also returns immediately, meaning the method exits again.

Finally, the HTTP operation completes and the second callback runs. This binds the contentOfUrl variable and, as in all bad demos, displays a message box with the result.

What if I want to return a value to the caller?

Async methods can exit before they’ve finished. So if an async method wants to return a result, it has to recognise that it might return to the caller before that result is available. For this reason, an async method that returns a value has to have a return type of Task<T> rather than a ‘proper’ value. A Task<T> represents a chunk of work which will eventually deliver a value, so a caller can examine the returned task to determine when the result becomes available. Here’s how an async method looks when returning a value:

public static async Task<string> GetReferencedContent(string filename) {
  string url = await BeginReadFromFile(filename);
  string contentOfUrl = await BeginHttpGetFromUrl(url);
  return contentOfUrl;
}

Notice that the return statement takes a string, even though the return type is Task<string>. Again, the compiler takes care of transforming the return statement to produce a Task.

Now a caller can call the GetReferencedContent method and either await it to get the string when it becomes available, or wait on it manually, or poll it for completion — whatever suits the way it intends to use the result.
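To illustrate those options, here’s a sketch with a stand-in async method; the real GetReferencedContent does file and HTTP work, which I’ve replaced with a short delay:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Demo {
    // Stand-in for GetReferencedContent: same Task<string> shape, no real I/O
    static async Task<string> GetContentAsync() {
        await Task.Delay(10);
        return "hello";
    }

    // Option 1: await it (usually the right choice)
    public static async Task<string> AwaitIt() {
        return await GetContentAsync();
    }

    // Option 2: block until the result is ready (avoid on UI threads)
    public static string WaitForIt() {
        Task<string> task = GetContentAsync();
        task.Wait();
        return task.Result;
    }

    // Option 3: poll for completion while doing other work
    public static string PollForIt() {
        Task<string> task = GetContentAsync();
        while (!task.IsCompleted)
            Thread.Sleep(1);  // stand-in for doing other useful work
        return task.Result;
    }
}
```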

Async-friendly APIs

If you’re familiar with asynchronous programming in .NET 4 and earlier, you’ll be used to paired Begin and End methods, such as WebRequest.BeginGetResponse and WebRequest.EndGetResponse. These still exist in .NET 4.5, but they don’t work with the await keyword. (Basically because a BeginXxx method requires an explicit method call inside the callback to get the result, and the compiler couldn’t depend on the EndXxx naming convention.) Instead, .NET 4.5 provides new methods which return Task objects. So instead of calling WebRequest.BeginGetResponse, you’ll call WebRequest.GetResponseAsync. Here’s an example where we finally use some real .NET 4.5 async APIs:

private static async Task<string> GetContent(string url) {
  WebRequest wr = WebRequest.Create(url);
  var response = await wr.GetResponseAsync();
  using (var stm = response.GetResponseStream())
  using (var reader = new StreamReader(stm)) {
    var content = await reader.ReadToEndAsync();
    return content;
  }
}

Note how similar this looks to the synchronous code using WebRequest.GetResponse() and TextReader.ReadToEnd(). Just stick Async on the end of the API name and stuff an await in front of the call, and you’re good to go!
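Incidentally, if you’re stuck with a library that only exposes the old Begin/End pattern, you don’t have to write the callbacks by hand: TaskFactory.FromAsync can wrap a Begin/End pair in a Task you can await. A sketch using Stream.BeginRead/EndRead against an in-memory stream:

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class Demo {
    public static async Task<int> Run() {
        var data = Encoding.ASCII.GetBytes("hello");
        using (var stream = new MemoryStream(data)) {
            var buffer = new byte[8];
            // Wrap the Begin/End pair in a Task so it can be awaited
            int bytesRead = await Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
            return bytesRead;
        }
    }
}
```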

Tagged as General

Final reminder: 2 days left on our January offer!

Just a quick note: there are only two days left on our big January 2012 offer, where you save hundreds of dollars and get one of the best .NET suites around.

5 reasons to take a look

1. Get 9 high quality .NET products.
2. Save at least $300 USD.
3. Get 12 months of new releases.
4. Get 12 months of nightly builds.
5. Get any new product Mindscape releases in the next 12 months.

Taking all that into account and considering that our Mindscape Mega Pack is already very competitively priced, you’d be crazy not to take up this offer before time runs out! The offer extends to team licenses and site licenses too.

This offer strictly ends on the 1st of February.

Tagged as General

Caliburn Micro Part 2: Data Binding and Events

In my previous blog post I showed you how to get started with using the Caliburn Micro Framework in a WPF application. Caliburn Micro helps us implement the application using the MVVM design pattern to get a clean separation between the view and the data model. In this blog post we’ll take a look at how Caliburn Micro assists us with data binding and events. We will be building on top of the application outlined in the previous blog post to add some simple user interaction and data display.

Data binding

We’ll start by getting the application to display a numerical value that is stored in the model. In the AppViewModel class created in the previous blog post, add a property called Count as seen in the code snippet below. The property value is stored in the _count field which we have given a default value of 50. You may recall from last time that we made the AppViewModel class extend the PropertyChangedBase provided by Caliburn Micro to perform property change notifications. Rather than implementing INotifyPropertyChanged in all of your models, you can simply call the NotifyOfPropertyChange method within the setter of your properties.

public class AppViewModel : PropertyChangedBase
{
  private int _count = 50;
  public int Count
  {
    get { return _count; }
    set
    {
      _count = value;
      NotifyOfPropertyChange(() => Count);
    }
  }
}

Next we modify the view to display this property value. In AppView.xaml, add a TextBlock to the grid as follows:

<Grid MinWidth="300" MinHeight="300" Background="LightBlue">
  <TextBlock Name="Count" Margin="20" FontSize="150" VerticalAlignment="Center" HorizontalAlignment="Center" />
</Grid>

Now run up the application and see that the TextBlock is displaying the default value of the Count property.

A Caliburn Micro app displaying some data

Wait… what? I don’t remember setting any binding on the TextBlock to get the Count property, and yet the TextBlock is displaying the correct data!

Notice that I’ve set the name of the TextBlock to be the same as the property we want to bind to. This is a convenient short cut that Caliburn Micro provides. For elements that display data such as TextBlock or TextBox, setting their name to match a property on the data model will automatically hook it up for you.

Handling events

Next, let’s work on adding a button that increments the displayed value. The quick and dirty way of doing this is to hook up the Click event of a button to an event handler in the code behind. However, when using the MVVM pattern it’s usually best (but not absolutely necessary) to avoid using the code behind where you can. So let’s look at how to handle the events Caliburn Micro style. First, add a method to the AppViewModel for incrementing the Count property like this:

public void IncrementCount()
{
  Count++;
}

Now add a button to the grid in AppView.xaml as follows:

<Grid MinWidth="300" MinHeight="300" Background="LightBlue">
  <RepeatButton Name="IncrementCount" Content="Up" Margin="15" VerticalAlignment="Top" />
  <TextBlock Name="Count" FontSize="150" VerticalAlignment="Center" HorizontalAlignment="Center" />
</Grid>

Running the application now and clicking the button will increment the count value as advertised. Once again you’ll notice that we don’t need to do much work to hook the click event of the button to the IncrementCount method. For certain user interface controls such as buttons, you can simply set the name of the control to the name of the method you want it hooked to, and Caliburn Micro will wire the appropriate event of the control to the specified method on the model. In the case of buttons, Caliburn Micro deals with the Click event. (You can also manually specify which event to hook up to which method, which we’ll look at next time.)

Clicking the button increments the value

Event guards

When Caliburn Micro automatically hooks up the Click event to the IncrementCount method, it also looks for a method or property called CanIncrementCount. By adding a CanIncrementCount method or property, you can include additional logic that determines whether the event is allowed to be handled based on the current state of the model. Let’s do this now by adding the following property to the AppViewModel:

public bool CanIncrementCount
{
  get { return Count < 100; }
}

Since this logic is based on the value of the Count property, we also need to raise property change notification for the CanIncrementCount property whenever the Count value changes. This is done by adding this line of code in the Count property setter:

NotifyOfPropertyChange(() => CanIncrementCount);
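Putting the pieces together, here’s what the wiring amounts to. This sketch uses plain INotifyPropertyChanged so it stands alone; in the real app, Caliburn Micro’s PropertyChangedBase and NotifyOfPropertyChange(() => ...) provide this plumbing for you:

```csharp
using System.ComponentModel;

// Plain-INPC stand-in for the Caliburn Micro version described above
public class AppViewModel : INotifyPropertyChanged {
    public event PropertyChangedEventHandler PropertyChanged;
    private int _count = 50;

    public int Count {
        get { return _count; }
        set {
            _count = value;
            Raise("Count");
            Raise("CanIncrementCount");  // the guard depends on Count, so notify it too
        }
    }

    public bool CanIncrementCount {
        get { return Count < 100; }
    }

    public void IncrementCount() {
        Count++;
    }

    private void Raise(string name) {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(name));
    }
}
```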

Run up the application once more and increment the value to 100. Once the limit has been reached, the button will become disabled and prevent the user from further incrementing the value.

Button is disabled when count is 100

In this tutorial we have looked at a few ways that Caliburn Micro takes some of the work off our shoulders. Now you can more quickly use data binding, property change notifications, event handlers and event guards, while at the same time implementing a sturdy MVVM application that is easier to test and maintain.

You can download the full Visual Studio 2010 solution for this tutorial here. Use it to practice your new skills by adding a button to decrement the value.

In the next instalment of this blog series, we’ll take a look at more advanced scenarios of event handling with Caliburn Micro, specifically with passing event parameters. See you then.

Tagged as General, WPF

