the pipeline pattern uses parallel tasks & concurrent queues to process a sequence
of input values
each task implements a stage of the pipeline
queues act as buffers
outputs occur in the same order as inputs
composed of a series of producer/consumer stages
use when there are too many dependencies for a parallel loop
The Basics
buffers are usually based on the BlockingCollection<T> type
pipeline with 4 stages:
input -> stage 1 (read strings) -> buffer 1 -> stage 2 (correct casing) -> buffer 2
-> stage 3 (create sentences) -> buffer 3 -> stage 4 (write sentences) -> output
each stage reads from a dedicated input and writes to a particular output
all stages can execute at the same time because concurrent queues buffer shared
inputs and outputs
a stage can add its value to its output buffer as long as there is room
if there is no room the stage blocks until buffer space becomes available
blocking can also occur on input: a stage whose input buffer is empty blocks until a value arrives
the BlockingCollection<T> method CompleteAdding signals 'EOF' (no more values will be added)
the consumer can then shut down once the queue is empty
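A minimal runnable sketch of these buffer mechanics, assuming one producer and one consumer of integers; the class name BoundedBufferDemo and its Run method are illustrative, not from the original. The producer blocks on a full bounded buffer, CompleteAdding plays the 'EOF' role, and GetConsumingEnumerable ends once the buffer is both complete and empty.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative two-stage producer/consumer over a bounded BlockingCollection<T>.
public static class BoundedBufferDemo
{
    public static List<int> Run(int itemCount, int bufferSize)
    {
        var buffer = new BlockingCollection<int>(bufferSize);
        var received = new List<int>();   // written only by the consumer task

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < itemCount; i++)
                {
                    // Blocks while the buffer already holds bufferSize items.
                    buffer.Add(i);
                }
            }
            finally
            {
                // Signal "EOF" even if the loop throws.
                buffer.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);

        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the buffer is empty; the loop ends after
            // CompleteAdding once every remaining item has been taken.
            foreach (var item in buffer.GetConsumingEnumerable())
            {
                received.Add(item);
            }
        }, TaskCreationOptions.LongRunning);

        Task.WaitAll(producer, consumer);
        buffer.Dispose();
        return received;
    }
}
```

For example, BoundedBufferDemo.Run(100, 4) returns 0 through 99 in order, even though the producer can never get more than four items ahead of the consumer.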
implementation of the pipeline
int seed = ...
int bufferSize = ...
var buffer1 = new BlockingCollection<string>(bufferSize);
var buffer2 = new BlockingCollection<string>(bufferSize);
var buffer3 = new BlockingCollection<string>(bufferSize);
// LongRunning asks the default scheduler for a dedicated thread per stage
var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var stage1 = f.StartNew(() => ReadStrings(buffer1, ...));
var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
var stage4 = f.StartNew(() => WriteSentences(buffer3));
Task.WaitAll(stage1, stage2, stage3, stage4);
the first stage of the pipeline includes a sequential loop that writes to the output buffer
the loop populates the output buffer with values
the values come from the PhraseSource method, which returns an ordinary single-threaded instance
of IEnumerable<string>
the call to CompleteAdding is made in a finally block so that downstream stages are released even
if the loop throws
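static void ReadStrings(BlockingCollection<string> output, int seed)
{
    try
    {
        foreach (var phrase in PhraseSource(seed))
        {
            stage1AdditionalWork();   // additional per-item work for this stage
            output.Add(phrase);       // blocks while the output buffer is full
        }
    }
    finally
    {
        // always signal "EOF", even if the loop throws
        output.CompleteAdding();
    }
}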
the implementations of stages 2 & 3 follow the same structure as this code
void DoStage(BlockingCollection<string> input, BlockingCollection<string> output)
{
    try
    {
        // blocks while input is empty; ends once input is marked complete and drained
        foreach (var item in input.GetConsumingEnumerable())
        {
            var result = ...
            output.Add(result);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}
the last stage writes values to a stream
static void WriteSentences(BlockingCollection<string> input)
{
    using (StreamWriter outfile = new StreamWriter(...))
    {
        foreach (var sentence in input.GetConsumingEnumerable())
        {
            var printSentence = ...
            outfile.WriteLine(printSentence);
        }
    }
}
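The four stages above can be assembled into one runnable sketch. It fills in things the original leaves elided, all of them assumptions: the input is an in-memory word list instead of PhraseSource(seed), "correct casing" lowercases each word, "create sentences" groups a fixed number of words into a capitalized sentence ending in a period, and output goes to any TextWriter instead of a file. StringPipeline and its helpers are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical four-stage pipeline: read -> correct case -> create sentences -> write.
public static class StringPipeline
{
    public static void Run(IEnumerable<string> words, int wordsPerSentence,
                           TextWriter writer, int bufferSize = 4)
    {
        var buffer1 = new BlockingCollection<string>(bufferSize);
        var buffer2 = new BlockingCollection<string>(bufferSize);
        var buffer3 = new BlockingCollection<string>(bufferSize);
        var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                TaskContinuationOptions.None);

        var stage1 = f.StartNew(() => ReadStrings(words, buffer1));
        var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
        var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3, wordsPerSentence));
        var stage4 = f.StartNew(() => WriteSentences(buffer3, writer));
        Task.WaitAll(stage1, stage2, stage3, stage4);
    }

    static void ReadStrings(IEnumerable<string> source, BlockingCollection<string> output)
    {
        try { foreach (var word in source) output.Add(word); }
        finally { output.CompleteAdding(); }
    }

    static void CorrectCase(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        try
        {
            foreach (var word in input.GetConsumingEnumerable())
                output.Add(word.ToLowerInvariant());
        }
        finally { output.CompleteAdding(); }
    }

    static void CreateSentences(BlockingCollection<string> input,
                                BlockingCollection<string> output, int wordsPerSentence)
    {
        try
        {
            var words = new List<string>();
            foreach (var word in input.GetConsumingEnumerable())
            {
                words.Add(word);
                if (words.Count == wordsPerSentence)
                {
                    output.Add(MakeSentence(words));
                    words.Clear();
                }
            }
            // Emit a final, shorter sentence if words are left over.
            if (words.Count > 0) output.Add(MakeSentence(words));
        }
        finally { output.CompleteAdding(); }
    }

    static string MakeSentence(List<string> words)
    {
        var text = string.Join(" ", words);
        if (text.Length == 0) return ".";
        return char.ToUpperInvariant(text[0]) + text.Substring(1) + ".";
    }

    static void WriteSentences(BlockingCollection<string> input, TextWriter writer)
    {
        foreach (var sentence in input.GetConsumingEnumerable())
            writer.WriteLine(sentence);
    }
}
```

For example, StringPipeline.Run(new[] { "The", "QUICK", "brown", "Fox", "jumps" }, 2, Console.Out) prints "The quick.", "Brown fox." and "Jumps." on separate lines. Like the original fragments, this sketch has no cancellation or error handling, so a stage that throws can leave the others blocked.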
Performance Characteristics
when stages of a pipeline don't take the same amount of time, the speed of the pipeline
is approximately the same as the speed of its slowest stage
the most efficient pipelines have stages of near equal speed
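A worked estimate of why the slowest stage sets the pace, assuming each stage i takes a constant time t_i per item and buffers never fill: the first item passes through all k stages, and each later item leaves one slowest-stage interval after the previous one.

```latex
T_{\text{pipe}}(n) = \sum_{i=1}^{k} t_i + (n-1)\,\max_{1 \le i \le k} t_i,
\qquad
T_{\text{seq}}(n) = n \sum_{i=1}^{k} t_i

t = (10, 20, 40, 10)\,\text{ms},\ n = 100:\quad
T_{\text{pipe}} = 80 + 99 \cdot 40 = 4040\,\text{ms},\quad
T_{\text{seq}} = 8000\,\text{ms},\quad \text{speedup} \approx 1.98

t = (20, 20, 20, 20)\,\text{ms},\ n = 100:\quad
T_{\text{pipe}} = 80 + 99 \cdot 20 = 2060\,\text{ms},\quad \text{speedup} \approx 3.88
```

Both cases do 80 ms of work per item, but only the balanced pipeline comes close to the 4x limit of four stages.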
Canceling a Pipeline
observe the cancellation token in two places: at the start of each loop iteration and in the call to Add
void DoStage(BlockingCollection<string> input, BlockingCollection<string> output, CancellationToken token)
{
    try
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested)
                break;
            var result = ...
            output.Add(result, token);   // throws OperationCanceledException if canceled while blocked
        }
    }
    catch (OperationCanceledException)
    {
        // cancellation was requested while blocked in Add; fall through to finally
    }
    finally
    {
        output.CompleteAdding();
    }
}
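the natural place to check for cancellation is at the beginning of the loop
pass the token to the Add method of the output BlockingCollection<T> to avoid a possible deadlock:
without it, a stage blocked on a full output buffer whose consumer has already stopped waits forever
if type T implements IDisposable, stages must call Dispose on any value they are holding when
cancellation occurs, e.g. in the load stage of an image pipeline: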
int count = 0;
int clockOffset = Environment.TickCount;
var token = cts.Token;
ImageInfo info = null;
try
{
    foreach (var fileName in fileNames)
    {
        if (token.IsCancellationRequested)
            break;
        info = LoadImage(fileName, sourceDir, count, clockOffset);
        original.Add(info, token);
        count += 1;
        info = null;   // ownership has passed to the next stage
    }
}
catch (Exception e)
{
    // in case of exception, signal shutdown to other pipeline tasks
    cts.Cancel();
    if (!(e is OperationCanceledException))
        throw;
}
finally
{
    original.CompleteAdding();
    // dispose an image that was loaded but never handed to the next stage
    if (info != null) info.Dispose();
}
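The deadlock that passing the token to Add avoids can be reproduced in a small runnable sketch. The names CancelDemo and Run are hypothetical; the consumer takes one item and then "fails" by requesting cancellation, while the producer is stuck on a full buffer of capacity 1.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: a producer blocked on a full buffer is released by the token.
public static class CancelDemo
{
    // Returns how many items the producer managed to add before stopping.
    public static int Run()
    {
        var buffer = new BlockingCollection<int>(boundedCapacity: 1);
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        int added = 0;

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 1000; i++)
                {
                    if (token.IsCancellationRequested) break;
                    // Throws OperationCanceledException if canceled while blocked.
                    buffer.Add(i, token);
                    added++;
                }
            }
            catch (OperationCanceledException) { /* expected on cancellation */ }
            finally { buffer.CompleteAdding(); }
        }, TaskCreationOptions.LongRunning);

        var consumer = Task.Factory.StartNew(() =>
        {
            buffer.Take();   // consume one item, then stop draining
            cts.Cancel();    // tell the other stage to shut down
        }, TaskCreationOptions.LongRunning);

        // With a plain Add(i) this wait could hang: nobody drains the full buffer.
        Task.WaitAll(producer, consumer);
        cts.Dispose();
        return added;
    }
}
```

Because the buffer holds one item and only one is ever taken, the producer adds at most two items before it blocks; the token is what lets Task.WaitAll return.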
the queues must be cleaned up after an error or cancellation: any images still in them must be disposed
static void RunPipelined(IEnumerable<string> fileNames, string sourceDir, int queueLength, Action<ImageInfo> displayFn, CancellationTokenSource cts)
{
    // Data pipes
    var originalImages = new BlockingCollection<ImageInfo>(queueLength);
    var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
    var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
    try
    {
        var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
        Action<ImageInfo> updateStatisticsFn = info =>
        {
            info.QueueCount1 = originalImages.Count;
            info.QueueCount2 = thumbnailImages.Count;
            info.QueueCount3 = filteredImages.Count;
        };
        // Start pipelined tasks
        var loadTask = f.StartNew(() => LoadPipelinedImages(fileNames, sourceDir, originalImages, cts));
        var scaleTask = f.StartNew(() => ScalePipelinedImages(originalImages, thumbnailImages, cts));
        var filterTask = f.StartNew(() => FilterPipelinedImages(thumbnailImages, filteredImages, cts));
        var displayTask = f.StartNew(() => DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), displayFn, updateStatisticsFn, cts));
        Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
    }
    finally
    {
        // in case of exception or cancellation, there might be bitmaps
        // that need to be disposed.
        DisposeImagesInQueue(originalImages);
        DisposeImagesInQueue(thumbnailImages);
        DisposeImagesInQueue(filteredImages);
    }
}
static void DisposeImagesInQueue(BlockingCollection<ImageInfo> queue)
{
    if (queue != null)
    {
        queue.CompleteAdding();
        foreach (var info in queue)
        {
            info.Dispose();
        }
    }
}
Handling Pipeline Exceptions
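exceptions are similar to cancellations
the difference is that, by default, the other stages receive no notification when an unhandled
exception occurs in one stage
to coordinate pipeline shutdown when an exception occurs in a stage, use a special instantiation
of the CancellationTokenSource type (a linked token source)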
static void DoPipeline(CancellationToken token)
{
    // canceled by the external token or by any stage calling cts.Cancel()
    using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(token))
    {
        var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
        var stage1 = f.StartNew(() => DoStage1(..., cts));
        var stage2 = f.StartNew(() => DoStage2(..., cts));
        var stage3 = f.StartNew(() => DoStage3(..., cts));
        var stage4 = f.StartNew(() => DoStage4(..., cts));
        Task.WaitAll(stage1, stage2, stage3, stage4);
    }
}
the CancellationTokenSource.CreateLinkedTokenSource method creates a handle that lets you respond
to an external cancellation request and also initiate & respond to an internal cancellation
request
pass the linked CancellationTokenSource (not just its token) to the stages so that any stage can call Cancel
static void DoStage(... , CancellationTokenSource cts)
{
    var token = cts.Token;
    try
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            if (token.IsCancellationRequested)
                break;
            var result = ...
            output.Add(result, token);
        }
    }
    catch (Exception e)
    {
        // cancel the linked source so the other stages shut down too
        cts.Cancel();
        if (!(e is OperationCanceledException))
            throw;
    }
    finally
    {
        ...
    }
}
after all pipeline tasks stop, Task.WaitAll throws an AggregateException whose InnerExceptions
hold the exceptions rethrown by the failing stages
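A runnable sketch of this shutdown sequence with two stages. ExceptionDemo, Produce and Consume are hypothetical names, and the "bad item 5" failure is invented for illustration: stage 2 throws, cancels the linked source, stage 1 (blocked on a full buffer) stops, and Task.WaitAll surfaces the original exception inside an AggregateException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative exception handling with a linked CancellationTokenSource.
public static class ExceptionDemo
{
    public static Exception Run(CancellationToken externalToken)
    {
        var buffer = new BlockingCollection<int>(boundedCapacity: 2);
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken))
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var stage1 = f.StartNew(() => Produce(buffer, cts));
            var stage2 = f.StartNew(() => Consume(buffer, cts));
            try
            {
                Task.WaitAll(stage1, stage2);
                return null;
            }
            catch (AggregateException ae)
            {
                // Only stage 2 faulted; stage 1 swallowed its OperationCanceledException.
                return ae.InnerExceptions[0];
            }
        }
    }

    static void Produce(BlockingCollection<int> output, CancellationTokenSource cts)
    {
        var token = cts.Token;
        try
        {
            for (int i = 0; i < 1000; i++)
            {
                if (token.IsCancellationRequested) break;
                output.Add(i, token);   // released by cancellation if the buffer is full
            }
        }
        catch (Exception e)
        {
            cts.Cancel();
            if (!(e is OperationCanceledException)) throw;
        }
        finally { output.CompleteAdding(); }
    }

    static void Consume(BlockingCollection<int> input, CancellationTokenSource cts)
    {
        var token = cts.Token;
        try
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                if (token.IsCancellationRequested) break;
                if (item == 5) throw new InvalidOperationException("bad item 5");
            }
        }
        catch (Exception e)
        {
            cts.Cancel();   // notify the other stage
            if (!(e is OperationCanceledException)) throw;
        }
    }
}
```

Calling ExceptionDemo.Run(CancellationToken.None) returns the InvalidOperationException thrown by stage 2.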
Load Balancing using Multiple Producers
BlockingCollection<T> allows a consumer to read values from more than one producer
use the static TakeFromAny method to implement load-balancing strategies in some scenarios
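A sketch of one consumer reading from two producers' buffers, assuming the two producers stand in for duplicated copies of a slow stage (TakeFromAnyDemo is a hypothetical name). It uses TryTakeFromAny, the non-throwing variant of TakeFromAny, with a timeout: TakeFromAny throws once every collection is complete, whereas TryTakeFromAny returns -1, so the loop checks IsCompleted to tell a timeout from the end of input.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative load balancing: take from whichever producer's buffer has data.
public static class TakeFromAnyDemo
{
    public static List<int> Run(int itemsPerProducer)
    {
        var buffers = new[]
        {
            new BlockingCollection<int>(boundedCapacity: 4),
            new BlockingCollection<int>(boundedCapacity: 4)
        };

        var producers = new Task[buffers.Length];
        for (int p = 0; p < buffers.Length; p++)
        {
            var output = buffers[p];
            int offset = p * itemsPerProducer;   // producer p emits a distinct range
            producers[p] = Task.Factory.StartNew(() =>
            {
                try
                {
                    for (int i = 0; i < itemsPerProducer; i++) output.Add(offset + i);
                }
                finally { output.CompleteAdding(); }
            }, TaskCreationOptions.LongRunning);
        }

        var results = new List<int>();
        while (true)
        {
            // Index of the buffer that supplied an item, or -1 on timeout
            // or once every buffer is completed (marked done and empty).
            int index = BlockingCollection<int>.TryTakeFromAny(buffers, out int item, 100);
            if (index >= 0) { results.Add(item); continue; }
            if (buffers[0].IsCompleted && buffers[1].IsCompleted) break;
        }

        Task.WaitAll(producers);
        return results;
    }
}
```

Items from the two producers arrive interleaved, so the consumer sees every value exactly once but not in input order.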
Pipelines & Streams
blocking collections & streams have some similarities
the pipeline pattern can be used with library methods that read from & write to streams
can create a stream whose underlying implementation relies on tasks and blocking
collections
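A sketch of such a stream, assuming a single writer task and a single reader task: Write copies each chunk into a bounded BlockingCollection<byte[]>, and Read blocks until a chunk arrives. The class BlockingStream and its CompleteWriting method are hypothetical; CompleteWriting plays the role of CompleteAdding so the reader sees end of stream.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;

// Illustrative single-writer/single-reader stream backed by a blocking collection.
public sealed class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks;
    private byte[] _current = Array.Empty<byte>();
    private int _offset;

    public BlockingStream(int boundedCapacity) =>
        _chunks = new BlockingCollection<byte[]>(boundedCapacity);

    // Producer side: copy the bytes so the caller may reuse its buffer.
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        _chunks.Add(chunk);   // blocks while the collection is full
    }

    // Signals end of stream to the reader.
    public void CompleteWriting() => _chunks.CompleteAdding();

    // Consumer side: returns 0 only after CompleteWriting and all data is read.
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (_offset == _current.Length)
        {
            // Blocks until a chunk arrives; returns false once the
            // collection is marked complete and empty.
            if (!_chunks.TryTake(out _current, System.Threading.Timeout.Infinite))
            {
                _current = Array.Empty<byte>();
                return 0;
            }
            _offset = 0;
        }
        int n = Math.Min(count, _current.Length - _offset);
        Buffer.BlockCopy(_current, _offset, buffer, offset, n);
        _offset += n;
        return n;
    }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

A task can wrap the stream in a StreamWriter while another task reads it with StreamReader.ReadToEnd; the bounded collection keeps the writer from running arbitrarily far ahead, just like a pipeline buffer.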
Asynchronous Pipelines
in an asynchronous pipeline, tasks aren't created until data becomes available
the AsyncCall type is a queue that a producer puts data into
if no task is currently processing the queue, a new task is created
when the task empties the queue, the task exits
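A sketch of the same idea, not AsyncCall's actual API: the class AsyncStage and its Post method are hypothetical. Post enqueues an item and starts a processing task only if none is running; the task exits when it finds the queue empty, re-checking once after clearing its flag so an item posted at that moment is not stranded.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative asynchronous stage: a task exists only while there is data.
public sealed class AsyncStage<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly Action<T> _handler;   // assumed not to throw in this sketch
    private int _running;                  // 1 while a processing task is active

    public AsyncStage(Action<T> handler) => _handler = handler;

    public void Post(T item)
    {
        _queue.Enqueue(item);
        // Only the caller that flips 0 -> 1 creates a task.
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            Task.Run(() => Drain());
    }

    private void Drain()
    {
        while (true)
        {
            while (_queue.TryDequeue(out T item))
                _handler(item);

            // Full fence: clear the flag before looking at the queue again.
            Interlocked.Exchange(ref _running, 0);
            // An item may have arrived after the last TryDequeue while the flag
            // was still 1, so its Post started nothing; reclaim the flag and loop.
            if (_queue.IsEmpty || Interlocked.CompareExchange(ref _running, 1, 0) != 0)
                return;
        }
    }
}
```

If the handler throws, the flag stays at 1 and no further task starts, so a real stage would need the cancellation and exception handling described in the earlier sections.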
Anti-Patterns
Thread Starvation
a pipeline requires all of its tasks to execute concurrently
if there are not enough threads, blocking collections can fill up & block indefinitely
to guarantee a thread for each task use the LongRunning task creation option
Infinite Blocking Collection Waits
avoid by calling CancellationTokenSource.Cancel when an exception occurs in a pipeline
stage, so that stages waiting in Add(item, token) on a full blocking collection are released
Forgetting GetConsumingEnumerable()
use the GetConsumingEnumerable() method to iterate a blocking collection
iterating the collection through its IEnumerable<T> interface enumerates a snapshot of the
underlying queue and removes nothing, rather than consuming values from the queue itself
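A small sketch contrasting the two loops on a collection that is already marked complete; EnumerationDemo is a hypothetical name. The plain foreach leaves every item in place, while GetConsumingEnumerable removes each one.

```csharp
using System.Collections.Concurrent;

// Illustrative comparison of snapshot vs consuming enumeration.
public static class EnumerationDemo
{
    public static (int seen, int left) IterateSnapshot()
    {
        var queue = new BlockingCollection<int> { 1, 2, 3 };
        queue.CompleteAdding();
        int seen = 0;
        // IEnumerable<T>: walks a snapshot and removes nothing.
        foreach (var item in queue) seen++;
        return (seen, queue.Count);
    }

    public static (int seen, int left) IterateConsuming()
    {
        var queue = new BlockingCollection<int> { 1, 2, 3 };
        queue.CompleteAdding();
        int seen = 0;
        // Removes each item as it is returned; would block (not stop)
        // on an empty collection that is not yet marked complete.
        foreach (var item in queue.GetConsumingEnumerable()) seen++;
        return (seen, queue.Count);
    }
}
```

Both loops see three items, but only the consuming one leaves the collection empty; in a running pipeline the snapshot loop would also end early instead of waiting for values still to come.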
Using Other Producer/Consumer Collections
BlockingCollection<T> stores values in a ConcurrentQueue<T> by default, but a custom storage
mechanism can be used in its place
the storage type must implement IProducerConsumerCollection<T>
other implementations include the ConcurrentBag<T> (unordered) & ConcurrentStack<T> (LIFO)
types
generally FIFO ordering is used in pipelines
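A minimal sketch of swapping the storage, using the BlockingCollection<T> constructor that takes an IProducerConsumerCollection<T> and a bounded capacity; OrderingDemo and Drain are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Illustrative: the same three adds come out FIFO or LIFO depending on storage.
public static class OrderingDemo
{
    public static List<int> Drain(IProducerConsumerCollection<int> store)
    {
        var buffer = new BlockingCollection<int>(store, boundedCapacity: 10);
        for (int i = 1; i <= 3; i++) buffer.Add(i);
        buffer.CompleteAdding();
        return new List<int>(buffer.GetConsumingEnumerable());
    }
}
```

Drain(new ConcurrentQueue<int>()) returns 1, 2, 3, while Drain(new ConcurrentStack<int>()) returns 3, 2, 1, which is why a stack would break the "outputs in input order" property of a pipeline.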
Design Notes
many patterns, including the pipeline pattern, do not automatically scale with the number of cores
this is a limitation unless additional parallelism is introduced within a pipeline stage
for high parallelism, the stages' execution times should all be similar
a pipeline is gated by its slowest stage
buffer size is important
too small & stages block frequently waiting for buffer space, stalling the pipeline
buffers should be large enough to absorb variability in pipeline flow, but no larger