Introducing the .NET Framework 4.0 Task Parallel Library BlockingCollection

Introduction to the BlockingCollection

In my article on the Task Parallel Library Task class, I explained the role of the Task class and how a class called BlockingCollection mediates between executing tasks. However, I never elaborated on all the ways it can be used. This article tackles some of these ways. I’ll explain how to use the BlockingCollection class and provide a stepping stone to applying it in a Parallel Computing solution.

To gain a fundamental understanding of a data structure, it often helps to describe the problem it solves or the need it fulfills. I’ve always thought of the stages in an algorithm as a type of assembly line, so I chose a manufacturing analogy to frame the problem a BlockingCollection solves.

An Assembly Line Analogy

Like all data structures in the .NET Framework, the BlockingCollection has a special purpose. Using a clothing manufacturing assembly line analogy, if Tasks are the assembly line “workers”, then the BlockingCollection is the protocol workers use to pass materials between stages in the assembly line.

In a clothing manufacturing assembly line each stage is responsible for some portion of the assembled product. No two stages can be working on, for example, the same shirt at the same time. The button-sewing stage cannot be performed during the pattern-cutting stage.

As the shirt is passed between stages there is a handoff protocol. One stage relinquishes ownership of the shirt and another stage takes ownership. There may be many variations in the handoff protocol. For example, is the shirt passed to the right or the left? Where are shirts “stacked” when one stage completes, but the next stage isn’t ready to receive the shirt? How are the shirts stacked, latest on top or bottom?

In the .NET Parallel Computing world, tasks, rather than workers, are assembled to process data rather than shirts. Similar problems arise between stages. Faster stages need a place to “stack” processed data to be consumed in the next stage.
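The “latest on top or bottom” question has a direct counterpart in code. The following is a minimal sketch, not taken from the sample projects: it assumes the BlockingCollection constructor that accepts a backing collection, and the class name HandoffOrderSketch and method FillAndDrain are hypothetical names I made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class HandoffOrderSketch
{
    // Adds shirts 1..3, marks the pile complete, then drains it in handoff order.
    public static List<int> FillAndDrain(BlockingCollection<int> pile)
    {
        for (int shirt = 1; shirt <= 3; shirt++)
        {
            pile.Add(shirt);
        }
        pile.CompleteAdding();

        var order = new List<int>();
        foreach (int shirt in pile.GetConsumingEnumerable())
        {
            order.Add(shirt);
        }
        return order;
    }

    public static void Main()
    {
        // With no backing collection specified, a ConcurrentQueue is used:
        // the oldest shirt is handed off first.
        List<int> fifo = FillAndDrain(new BlockingCollection<int>());
        Console.WriteLine("Queue-backed: " + string.Join(",", fifo)); // 1,2,3

        // Backed by a ConcurrentStack: the latest shirt is on top.
        List<int> lifo = FillAndDrain(
            new BlockingCollection<int>(new ConcurrentStack<int>()));
        Console.WriteLine("Stack-backed: " + string.Join(",", lifo)); // 3,2,1
    }
}
```

The samples in this article rely on the default queue ordering, so the stack-backed variant is only there to show that the stacking order is a choice.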

This pattern is often called pipelining or producer/consumer in the Parallel Computing resources at the end of this article. I’m going to show how to assemble a pipeline of tasks with the BlockingCollection using some sample projects.

Sample Overview

BlockingCollection controls the producer/consumer handoff by applying three fundamental ideas:

Handoffs can be simple. A stage simply waits until data from a prior stage appears. Once the prior stage in a handoff makes data available, the succeeding stage consumes the data and passes data on to the next stage. This behavior is captured in the GetConsumingEnumerable, Add, and Take methods on the BlockingCollection. This idea is illustrated in the Sample.WithBlocking project.

Stages consume resources and may need to constrain resource usage by controlling the capacity of a BlockingCollection. When a stage’s pile reaches its limit, the stages feeding it must pause until the pile shrinks. Going back to the shirt manufacturing analogy, at any particular stage in the manufacturing process the shirt pile size has finite limits. Some method may be required to halt the whole assembly line if any stage exceeds the pile size limit. Prior stages need to pause, but they also need to be signaled when the pile size shrinks. This idea is illustrated in the Sample.WithBounding project.

Stages can be dynamic. When a particular stage is waiting for a prior stage to complete, it may perform some other action and then return to waiting. Also, waiting too long may signal trouble in the whole system. Some method for determining when a timeout has occurred is often needed to coordinate stages. This idea is captured in the Sample.WithoutBlocking project.
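The second and third ideas can be tried out on a single thread before any tasks are involved. What follows is a rough sketch rather than code from the sample projects: the pile limit of two, the 100 millisecond timeout, and the names BoundedHandoffSketch and TryHandoffs are all assumptions chosen for illustration. It uses the bounded-capacity constructor together with the TryAdd and TryTake overloads that accept a timeout.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedHandoffSketch
{
    // Returns the outcome of each attempted handoff, in order.
    public static bool[] TryHandoffs()
    {
        // Hypothetical pile limit of two items (the boundedCapacity argument).
        using (var pile = new BlockingCollection<int>(2))
        {
            TimeSpan wait = TimeSpan.FromMilliseconds(100);
            int item;

            pile.Add(1);
            pile.Add(2);

            // The pile is full, so this attempt gives up after the timeout
            // instead of blocking the way Add would.
            bool addedWhileFull = pile.TryAdd(3, wait);

            // Removing one item makes room, so the next attempt succeeds.
            bool tookOne = pile.TryTake(out item, wait);
            bool addedAfterTake = pile.TryAdd(3, wait);

            // Drain the two remaining items, then poll the empty pile.
            pile.TryTake(out item, wait);
            pile.TryTake(out item, wait);
            bool tookFromEmpty = pile.TryTake(out item, wait);

            return new[] { addedWhileFull, tookOne, addedAfterTake, tookFromEmpty };
        }
    }

    public static void Main()
    {
        bool[] results = TryHandoffs();
        Console.WriteLine("Add while full:  " + results[0]); // False
        Console.WriteLine("Take one item:   " + results[1]); // True
        Console.WriteLine("Add after take:  " + results[2]); // True
        Console.WriteLine("Take from empty: " + results[3]); // False
    }
}
```

A false return value is the point where a stage could do other work or decide that it has waited too long.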

There are some common features in each sample I need to review before delving into the details.

Common Sample Features

As you will notice later in the article, all the samples follow the same pattern, with variations based on the BlockingCollection methods being employed. The basic pattern is captured in Figure 1 below.

Figure 1: Program Flow

Integers are pulled from an array in the first stage and passed between stages. Pausing simulates work inside the Task. Output includes time intervals so a developer can observe when something executes in relation to something else. Stages that have completed call CompleteAdding. The Task.WaitAll method blocks until all stages have executed.

Each stage executes within its own Task class.

Turning now to the sample project, I’m starting with Sample.WithBlocking.

Sample.WithBlocking

The sample code from the WithBlocking project appears below.

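  //Requires the System, System.Collections.Concurrent, System.Threading and System.Threading.Tasks namespaces
  int[] items = { 1, 2, 3, 4, 5, 6, 7, 8 };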
  var startTime = DateTime.Now;

  Console.WriteLine("Starting...");

  var stage1 = new BlockingCollection<int>();
  var stage2 = new BlockingCollection<int>();

  //Pulls numbers from the array
  var task0 = Task.Factory.StartNew(() =>
  {
      foreach (int i in items)
      {
          stage1.Add(i);
          Console.WriteLine("Add:{0} Count={1}", i, stage1.Count);
      }

      Console.WriteLine("Loaded all items into stage 1...");
      stage1.CompleteAdding();

  });


  //Reads and passes data onto next stage
  var task1 = Task.Factory.StartNew(() =>
  {
                  
      foreach(var i in stage1.GetConsumingEnumerable())
      {
          stage2.Add(i);

          Console.WriteLine("Stage 1 Process: {0} elapsed {1}", i,DateTime.Now.Subtract(startTime).TotalSeconds);

          //Pause 1 second simulating work
          Thread.Sleep(new TimeSpan(0, 0, 1));
      }
      Console.WriteLine("Emptied all items from Stage 1...");
      stage2.CompleteAdding();
  });


  //Reads and prints data
  var task2 = Task.Factory.StartNew(() =>
  {
      int i = -1;
      while (!stage2.IsCompleted)
      {
          try
          {
              i = stage2.Take();
          }
          catch (InvalidOperationException) //Take throws exception when completed
          {
              Console.WriteLine("Emptied all items from Stage 2...");
              break;
          }
          Console.WriteLine("Stage 2 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);

          //Pause a little over half a second (600 ms) to simulate work
          Thread.Sleep(new TimeSpan(0, 0, 0, 0, 600));
      }

  });


  Task.WaitAll(task0, task1, task2);
  Console.WriteLine("Completed.  Press any key to quit...");
  Console.ReadKey();

WithBlocking demonstrates two ways to consume a BlockingCollection.

Task1 uses GetConsumingEnumerable and a foreach loop. Employing GetConsumingEnumerable is pretty common in the samples you’ll see among the resources at the end of this article.
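To compare the two consuming styles side by side, here is a small sketch that reduces each one to a method. It is not part of the WithBlocking project: the class ConsumeStylesSketch and its methods are hypothetical, and to keep the result predictable the collection is filled and marked complete before it is consumed, whereas in the sample the consumers run concurrently with the producer.

```csharp
using System;
using System.Collections.Concurrent;

public static class ConsumeStylesSketch
{
    // Builds a collection holding 1..8 that is already marked complete.
    public static BlockingCollection<int> FilledStage()
    {
        var stage = new BlockingCollection<int>();
        for (int i = 1; i <= 8; i++)
        {
            stage.Add(i);
        }
        stage.CompleteAdding();
        return stage;
    }

    // Style used by task1: the foreach loop ends once the collection
    // is marked complete and empty.
    public static int SumWithEnumerable(BlockingCollection<int> stage)
    {
        int sum = 0;
        foreach (int i in stage.GetConsumingEnumerable())
        {
            sum += i;
        }
        return sum;
    }

    // Style used by task2: check IsCompleted, call Take, and guard against
    // the collection completing between the check and the call.
    public static int SumWithTake(BlockingCollection<int> stage)
    {
        int sum = 0;
        while (!stage.IsCompleted)
        {
            try
            {
                sum += stage.Take();
            }
            catch (InvalidOperationException)
            {
                break;
            }
        }
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("foreach sum: " + SumWithEnumerable(FilledStage())); // 36
        Console.WriteLine("Take sum:    " + SumWithTake(FilledStage()));       // 36
    }
}
```

Both methods see the same items. The foreach version simply leaves the completion check and the exception handling to the enumerator.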
