1 TPL Dataflow Basics
1.1 Blocks
In dataflow, blocks (or nodes) are entities that may send and receive data and are the basic unit of composition. The TPL Dataflow Library comes with a handful of predefined blocks. While they’re very basic, they should cover 99% of your needs. Using these predefined blocks, you can build your own application-specific blocks.
Think of the predefined blocks as being equivalent to keywords in C#. You build C# applications by using the keywords. The predefined blocks similarly define the basic operations of dataflow programs that you use to build your dataflow application.
The predefined blocks offered by the TPL Dataflow library fall into three groups: blocks that process data (execution blocks), blocks that buffer or store data (buffer blocks), and blocks that group data together into collections (grouping blocks). In the following sections we’ll examine each category of block and discover how they work with simple code examples.
1.1.1 Execution Blocks
Execution blocks process data much as a method accepts data and possibly returns a value. At creation you pass either a Func or an Action that defines what the execution block will do with the data.
All execution blocks contain an internal buffer that defaults to an unbounded capacity.
1.1.1.1 ActionBlock<T>
An ActionBlock<T> has a single input and no output. It is used when you need to do something with the input data but won’t need to pass it along to other blocks. It is the equivalent of the Action<T> delegate. In dataflow, this type of block is often called a “sink” because the data sinks into it like a black hole, never to emerge again.
At creation, an ActionBlock<T> accepts an Action<T> that is called when data arrives at the input.
An internal buffer is present on the input of an ActionBlock<T>. The buffer defaults to an unbounded capacity, but this can be changed by using DataflowBlockOptions, mentioned later.
ActionBlock<T> Example 1
Basic usage, Block threading, Post()
    using System;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        static class ActionBlockExample1
        {
            public static void Run()
            {
                // The block calls this action once for every value it receives.
                var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

                // Post() hands each value to the block and returns right away.
                for (int i = 0; i < 10; i++)
                {
                    actionBlock.Post(i);
                }

                Console.WriteLine("Done");
            }
        }
    }
This example shows the basic usage of an ActionBlock<T> and how to send data to all types of blocks that accept inputs.
The Post() method sends data synchronously to a block and returns true if the data was successfully accepted. If the block refuses the data, the method returns false and does not attempt to resend it.
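The two outcomes of Post() can be seen in a small sketch. It assumes two ways of making a block refuse data that this example has not introduced: a BoundedCapacity of 2 set through the block options, and a call to Complete(). The class name PostResultSketch and the gate that keeps the action busy are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class PostResultSketch
    {
        // Returns what Post() reported for three values offered to a block
        // with room for two, and for one value offered after Complete().
        public static bool[] Run()
        {
            // The gate keeps the action busy so accepted values stay in the block.
            var gate = new ManualResetEventSlim(false);
            var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };
            var block = new ActionBlock<int>(n => gate.Wait(), options);

            bool first = block.Post(0);    // accepted
            bool second = block.Post(1);   // accepted, the block is now full
            bool third = block.Post(2);    // refused, and Post() does not retry

            gate.Set();                    // let the block work through its values
            block.Complete();              // the block accepts no further input
            bool afterComplete = block.Post(3);

            block.Completion.Wait();
            return new[] { first, second, third, afterComplete };
        }
    }
}
```

Run() returns true, true, false, false. Code that cannot afford to lose a value should check the return value of Post() rather than assume the data was accepted.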
In this example the numbers 0 through 9 are pushed to actionBlock. The block takes each value and calls the Action<T> that was given at creation. Since our action simply prints the received data, the console shows “Done” and the numbers 0 through 9, each on its own line.
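Notice how “Done” was printed first. This is because actionBlock runs its action on a separate thread, in parallel with the main thread: Post() returns as soon as a value has been buffered, so the main thread typically reaches the final WriteLine() before the block has printed anything.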
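Because the block works in the background, a program that wants to be sure every posted value has been handled needs to wait for the block. A minimal sketch, assuming the Complete() method and Completion task that every dataflow block exposes (neither is used in the example above); the class name WaitForCompletionSketch is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class WaitForCompletionSketch
    {
        public static List<string> Run()
        {
            var lines = new List<string>();

            // With the default options the block handles one value at a time,
            // so the list is never modified by two threads at once.
            var actionBlock = new ActionBlock<int>(n => lines.Add(n.ToString()));

            for (int i = 0; i < 10; i++)
            {
                actionBlock.Post(i);
            }

            actionBlock.Complete();          // signal that no more data will arrive
            actionBlock.Completion.Wait();   // wait until every buffered value is processed

            lines.Add("Done");
            foreach (var line in lines)
            {
                Console.WriteLine(line);
            }
            return lines;
        }
    }
}
```

With the wait in place, “Done” is always the last line, after the numbers 0 through 9.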
ActionBlock<T> Example 2
Basic usage with a delay
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        class ActionBlockExample2
        {
            public static void Run()
            {
                var actionBlock = new ActionBlock<int>(n =>
                {
                    // Simulate a long-running action.
                    Thread.Sleep(1000);
                    Console.WriteLine(n);
                });

                for (int i = 0; i < 10; i++)
                {
                    actionBlock.Post(i);
                }

                Console.WriteLine("Done");
            }
        }
    }
This example is almost identical to example 1 except we are now sleeping for one second before printing the value to the console to simulate a long running action in the block. The output is the same as example 1.
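While a slow action is busy, the values that have already been posted wait in the block’s input buffer. The following sketch makes that visible through the InputCount property of ActionBlock<T>. It is an illustration under assumptions: the class name InputBufferSketch is hypothetical, and a gate stands in for the one-second sleep so that the action stays busy for as long as needed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class InputBufferSketch
    {
        // Returns how many values were waiting in the input buffer
        // while the action was still busy with the first one.
        public static int Run()
        {
            var gate = new ManualResetEventSlim(false);

            // The action waits on the gate, standing in for slow work.
            var actionBlock = new ActionBlock<int>(n => gate.Wait());

            for (int i = 0; i < 10; i++)
            {
                actionBlock.Post(i);
            }

            // At most one value has been handed to the action so far;
            // the others are still queued inside the block.
            int waiting = actionBlock.InputCount;

            gate.Set();                      // release the action
            actionBlock.Complete();
            actionBlock.Completion.Wait();   // wait for the buffer to drain

            return waiting;                  // 9 or 10, depending on timing
        }
    }
}
```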
1.1.1.2 TransformBlock<T1,T2>
A TransformBlock<T1,T2> is very similar to an ActionBlock<T> except it also has an output that you can connect to other blocks (linking blocks will be covered in a later section). It is equivalent to a Func<T1,T2> in that it returns a result. Similar to an ActionBlock<T>, it takes a function at creation that operates on the input data.
This block contains two buffers, one on the input and one on the output, but it is best to think of it as having a single buffer. Two buffers are needed to ensure that results are emitted in the same order as the inputs arrived: the output buffer restores the original ordering when values finish processing out of order. This is an implementation detail that you should know about but don’t normally need to worry about.
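The ordering guarantee is easiest to see when values really do finish out of order. The sketch below assumes the MaxDegreeOfParallelism option (covered under Execution Block Options) to let four values be processed at once, and uses the synchronous Receive() method to read the results. The class name TransformOrderSketch and the delays are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class TransformOrderSketch
    {
        public static int[] Run()
        {
            // Earlier values sleep longer, so later values finish first.
            Func<int, int> fn = n =>
            {
                Thread.Sleep((4 - n) * 100);
                return n * n;
            };

            var opts = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
            var tfBlock = new TransformBlock<int, int>(fn, opts);

            for (int i = 0; i < 4; i++)
            {
                tfBlock.Post(i);
            }

            // Results still come out in the order the inputs were posted.
            var results = new int[4];
            for (int i = 0; i < 4; i++)
            {
                results[i] = tfBlock.Receive();
            }
            return results;   // 0, 1, 4, 9
        }
    }
}
```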
TransformBlock<T1,T2> Example 1
Receive()
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        class TransformBlockExample1
        {
            public static void Run()
            {
                // Square the input after a one-second delay.
                Func<int, int> fn = n =>
                {
                    Thread.Sleep(1000);
                    return n * n;
                };

                var tfBlock = new TransformBlock<int, int>(fn);

                for (int i = 0; i < 10; i++)
                {
                    tfBlock.Post(i);
                }

                for (int i = 0; i < 10; i++)
                {
                    // Receive() waits until the next result is available.
                    int result = tfBlock.Receive();
                    Console.WriteLine(result);
                }

                Console.WriteLine("Done");
            }
        }
    }
In this example we create a TransformBlock<T1,T2> with a function that squares the input value after a one-second wait to simulate a long-running process.
To extract data from a TransformBlock<T1,T2> (or any block with an output) you use the Receive() method, which operates synchronously. If no data is available, the calling thread is suspended until data arrives. We highlight that fact in this example.
Executing this code should display the squares 0, 1, 4, 9, 16, 25, 36, 49, 64 and 81, one per line, followed by “Done”.
The first for loop passes the numbers 0 through 9 to tfBlock. The function that we passed in at creation time then squares each value and sends the result to the output, where we Receive() them.
Notice that for this example “Done” is only printed after all the output values have been printed. This is because the Receive() method operates synchronously in the same thread as the for loops.
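The blocking behaviour of Receive() can be measured directly. This is only a sketch: the class name BlockingReceiveSketch and the 500 ms delay are assumptions chosen for illustration, and the exact time reported will vary from run to run.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class BlockingReceiveSketch
    {
        // Returns the number of milliseconds the caller spent inside Receive().
        public static long Run()
        {
            var tfBlock = new TransformBlock<int, int>(n =>
            {
                Thread.Sleep(500);   // simulate slow work
                return n * n;
            });

            tfBlock.Post(3);

            var sw = Stopwatch.StartNew();
            int result = tfBlock.Receive();   // this thread waits here until 9 is ready
            sw.Stop();

            Console.WriteLine("Received {0} after about {1} ms", result, sw.ElapsedMilliseconds);
            return sw.ElapsedMilliseconds;
        }
    }
}
```

The elapsed time is roughly the length of the simulated work, which is the time the calling thread spent suspended.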
TransformBlock<T1,T2> Example 2
ReceiveAsync(), Task.Result
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        class TransformBlockExample2
        {
            public static void Run()
            {
                Func<int, int> fn = n =>
                {
                    Thread.Sleep(1000);
                    return n * n;
                };

                var tfBlock = new TransformBlock<int, int>(fn);

                for (int i = 0; i < 10; i++)
                {
                    tfBlock.Post(i);
                }

                // ReceiveAsync returns a Task
                for (int i = 0; i < 10; i++)
                {
                    Task<int> resultTask = tfBlock.ReceiveAsync();
                    // Reading Result will wait until it has a value ready
                    int result = resultTask.Result;
                    Console.WriteLine(result);
                }

                Console.WriteLine("Done");
            }
        }
    }
This example shows how to receive data asynchronously, from any block with an output, using the aptly named ReceiveAsync() method. Since it operates asynchronously, the method does not return a value directly as Receive() does. Instead, ReceiveAsync() returns a Task<T> that represents the receive operation. Reading the Result property of the returned Task forces the program to wait until data becomes available, essentially making it a synchronous operation like the previous example, with the same console output. The next example shows how to create a completely asynchronous receive.
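Before moving on, the difference between obtaining the Task and reading its Result can be shown in isolation. In this sketch the class name PendingTaskSketch is hypothetical, and a gate (an assumption standing in for the one-second sleep) holds the value back so that the task is guaranteed to be unfinished when it is first inspected.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class PendingTaskSketch
    {
        public static bool Run()
        {
            var gate = new ManualResetEventSlim(false);
            var tfBlock = new TransformBlock<int, int>(n =>
            {
                gate.Wait();   // hold the value back until the caller allows it
                return n * n;
            });

            tfBlock.Post(7);

            // ReceiveAsync() returns at once, before any output exists.
            Task<int> resultTask = tfBlock.ReceiveAsync();
            bool completedEarly = resultTask.IsCompleted;   // false: nothing to receive yet

            gate.Set();                       // let the block produce its output
            int result = resultTask.Result;   // waits here until the task has a value

            Console.WriteLine("{0} {1}", completedEarly, result);
            return !completedEarly && result == 49;
        }
    }
}
```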
TransformBlock<T1,T2> Example 3
ReceiveAsync(), Task.ContinueWith()
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        class TransformBlockExample3
        {
            public static void Run()
            {
                Func<int, int> fn = n =>
                {
                    Thread.Sleep(1000);
                    return n * n;
                };

                var tfBlock = new TransformBlock<int, int>(fn);

                for (int i = 0; i < 10; i++)
                {
                    tfBlock.Post(i);
                }

                // Runs once a receive task has finished and prints its value.
                Action<Task<int>> whenReady = task =>
                {
                    int n = task.Result;
                    Console.WriteLine(n);
                };

                for (int i = 0; i < 10; i++)
                {
                    Task<int> resultTask = tfBlock.ReceiveAsync();
                    // When 'resultTask' is done,
                    // call 'whenReady' with the Task
                    resultTask.ContinueWith(whenReady);
                }

                Console.WriteLine("Done");
            }
        }
    }
If we modify the previous example slightly, we can receive data from blocks asynchronously. Adding a continuation with the ContinueWith() method allows our main thread to proceed without having to wait for data to be available to read.
A continuation is just something that will be done after the Task is completed. In this case our continuation is the whenReady action, which prints the result to the console.
When run, the example again prints “Done” first, followed by the squares as they become available, because the main thread doesn’t have to wait to receive data.
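In C# 5 and later, the same non-blocking receive can also be written with async and await instead of an explicit continuation: the compiler turns the code after each await into the continuation. The sketch below is an alternative formulation, not part of the original example; the class name AwaitReceiveSketch and the method name RunAsync are hypothetical, and the delay is omitted to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class AwaitReceiveSketch
    {
        public static async Task<List<int>> RunAsync()
        {
            var tfBlock = new TransformBlock<int, int>(n => n * n);

            for (int i = 0; i < 10; i++)
            {
                tfBlock.Post(i);
            }

            var results = new List<int>();
            for (int i = 0; i < 10; i++)
            {
                // await gives the thread back to the caller instead of blocking it;
                // the rest of the loop body runs when the value is ready.
                int result = await tfBlock.ReceiveAsync();
                results.Add(result);
            }
            return results;
        }
    }
}
```

A caller can await RunAsync() itself, or attach a continuation to the returned task just as the example above does with ReceiveAsync().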
1.1.1.3 Block Configuration
All of the predefined blocks in the TPL Dataflow library can be configured by passing an options object to the block’s constructor. Execution blocks use the ExecutionDataflowBlockOptions class, grouping blocks use the GroupingDataflowBlockOptions class, and buffering blocks use the DataflowBlockOptions class. ExecutionDataflowBlockOptions and GroupingDataflowBlockOptions both inherit from the DataflowBlockOptions class (described in the Grouping Blocks section).
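As a rough illustration of how the three options classes relate, the sketch below creates one block of each category with its matching options object. BufferBlock<T> and BatchBlock<T> are used here only as representatives of the buffering and grouping categories, BoundedCapacity stands in for an option shared through the base class, and the class name BlockOptionsSketch and the values chosen are assumptions.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class BlockOptionsSketch
    {
        // Returns how many of the three options objects carry the shared setting.
        public static int Run()
        {
            // BoundedCapacity is declared on the base class, so all three have it.
            var bufferOpts = new DataflowBlockOptions { BoundedCapacity = 100 };
            var execOpts = new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 100,
                MaxDegreeOfParallelism = 2   // only execution blocks have this one
            };
            var groupOpts = new GroupingDataflowBlockOptions { BoundedCapacity = 100 };

            // Each kind of block takes its options as a constructor argument.
            var buffer = new BufferBlock<int>(bufferOpts);
            var action = new ActionBlock<int>(n => Console.WriteLine(n), execOpts);
            var batch = new BatchBlock<int>(10, groupOpts);

            // The derived options classes can be used wherever the base class is expected.
            DataflowBlockOptions[] all = { bufferOpts, execOpts, groupOpts };
            int bounded = 0;
            foreach (var opts in all)
            {
                if (opts.BoundedCapacity == 100) bounded++;
            }
            return bounded;   // 3
        }
    }
}
```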
1.1.1.4 Execution Block Options
In addition to the options provided by its base class, ExecutionDataflowBlockOptions also includes the options MaxDegreeOfParallelism and SingleProducerConstrained.
ExecutionDataflowBlockOptions Example 1
MaxDegreeOfParallelism
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        class ExecutionDataflowBlockOptionsExample1
        {
            public static void Run()
            {
                var generator = new Random();
                Action<int> fn = n =>
                {
                    int delay;
                    // Random is not thread-safe, and two actions may run at once.
                    lock (generator)
                    {
                        delay = generator.Next(1000);
                    }
                    Thread.Sleep(delay);
                    Console.WriteLine(n);
                };

                // Allow the block to process up to two values at the same time.
                var opts = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 2
                };

                var actionBlock = new ActionBlock<int>(fn, opts);

                for (int i = 0; i < 10; i++)
                {
                    actionBlock.Post(i);
                }

                Console.WriteLine("Done");
            }
        }
    }
Blocks can be configured to operate on more than one piece of data at a time. By default, values are processed one at a time. The MaxDegreeOfParallelism option lets the block process up to that many values in parallel.
This example is a modification of ActionBlock Example 2. We added a random delay to actionBlock to more closely approximate a real-world situation. Running this example shows how the output order of values differs from the input order due to the different delays. Because the delays are random, the exact order changes from run to run and from machine to machine.
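One way to check that the limit is respected is to count how many actions are running at the same moment. The sketch below is a hypothetical variation on the example, not part of it: the class name ParallelismLimitSketch, the counters and the fixed 100 ms delay are all assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class ParallelismLimitSketch
    {
        // Returns the largest number of actions that were running at once.
        public static int Run()
        {
            int running = 0;   // actions executing right now
            int peak = 0;      // highest value 'running' has reached
            var sync = new object();

            Action<int> fn = n =>
            {
                lock (sync)
                {
                    running++;
                    if (running > peak) peak = running;
                }
                Thread.Sleep(100);   // simulate work
                lock (sync)
                {
                    running--;
                }
            };

            var opts = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
            var actionBlock = new ActionBlock<int>(fn, opts);

            for (int i = 0; i < 10; i++)
            {
                actionBlock.Post(i);
            }

            actionBlock.Complete();
            actionBlock.Completion.Wait();
            return peak;   // never more than 2
        }
    }
}
```

The option is an upper bound, so the peak can be lower than the configured value if the thread pool happens to run the actions one after another.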
ExecutionDataflowBlockOptions Example 2
SingleProducerConstrained
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    namespace TPLDataflowByExample
    {
        // http://blogs.msdn.com/b/pfxteam/archive/2011/09/27/10217461.aspx
        class ExecutionDataflowBlockOptionsExample2
        {
            public static void Benchmark1()
            {
                var sw = new Stopwatch();
                const int ITERS = 6000000;
                var are = new AutoResetEvent(false);

                var ab = new ActionBlock<int>(i => { if (i == ITERS) are.Set(); });
                while (true)
                {
                    sw.Restart();
                    for (int i = 1; i <= ITERS; i++) ab.Post(i);
                    are.WaitOne();
                    sw.Stop();
                    Console.WriteLine("Messages / sec: {0:N0}",
                        (ITERS / sw.Elapsed.TotalSeconds));
                }
            }

            public static void Benchmark2()
            {
                var sw = new Stopwatch();
                const int ITERS = 6000000;
                var are = new AutoResetEvent(false);

                var ab = new ActionBlock<int>(i => { if (i == ITERS) are.Set(); },
                    new ExecutionDataflowBlockOptions
                    {
                        SingleProducerConstrained = true
                    });
                while (true)
                {
                    sw.Restart();
                    for (int i = 1; i <= ITERS; i++) ab.Post(i);
                    are.WaitOne();
                    sw.Stop();
                    Console.WriteLine("Messages / sec: {0:N0}",
                        (ITERS / sw.Elapsed.TotalSeconds));
                }
            }
        }
    }
The SingleProducerConstrained option is an optimization for situations where only a single producer at a time feeds data to a block. The creator of the TPL Dataflow library, Stephen Toub, explains its usage:
Dataflow blocks by default are usable by any number of threads concurrently. While flexible, this also places more synchronization requirements, and therefore cost, on the blocks than might otherwise be necessary. If a block is only ever going to be used by a single producer at a time, meaning only one thread at a time will be using methods like Post, OfferMessage, and Complete on the block, this property may be set to true to inform the block that it need not apply extra synchronization. For blocks that observe this property, you can significantly reduce synchronization overheads by setting this property to true. Right now, only ActionBlock pays attention to this property, but more blocks could in the future as necessary.
(from http://blogs.msdn.com/b/pfxteam/archive/2011/09/27/10217461.aspx)
Using his code (above) for a performance comparison, he measured an ActionBlock<T> throughput of 10,942,715 messages per second without the SingleProducerConstrained option (Benchmark1()), and 37,456,691 with the option set (Benchmark2()).
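The benchmark methods above repeat their measurement forever. For a quick comparison on your own machine, a single-pass variant can be sketched as follows. It is an adaptation rather than the original benchmark: the class name SingleProducerSketch, the Measure helper and the message count are assumptions, and it waits for the block’s Completion task instead of using an AutoResetEvent. The figures it prints depend on the hardware and runtime, so they should not be expected to match the numbers quoted above.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflowByExample
{
    static class SingleProducerSketch
    {
        // Posts 'count' messages from one thread and returns messages per second.
        public static double Measure(bool singleProducer, int count)
        {
            var opts = new ExecutionDataflowBlockOptions
            {
                SingleProducerConstrained = singleProducer
            };

            // The block handles one message at a time, so a plain counter is enough.
            long processed = 0;
            var ab = new ActionBlock<int>(i => { processed++; }, opts);

            var sw = Stopwatch.StartNew();
            for (int i = 0; i < count; i++) ab.Post(i);   // only this thread posts
            ab.Complete();
            ab.Completion.Wait();
            sw.Stop();

            if (processed != count) throw new InvalidOperationException("lost messages");
            return count / sw.Elapsed.TotalSeconds;
        }

        public static void Run()
        {
            const int count = 1000000;
            Console.WriteLine("Default:                   {0:N0}", Measure(false, count));
            Console.WriteLine("SingleProducerConstrained: {0:N0}", Measure(true, count));
        }
    }
}
```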