Introducing the .NET Framework 4.0 Task Parallel Library BlockingCollection

Introduction to the BlockingCollection

In my article on the Task Parallel Library Task class, I explained the role of the Task class and how a class called BlockingCollection mediates between executing tasks. However, I never elaborated on all the ways it can be used. This article tackles some of these ways. I'll explain how to use the BlockingCollection class and provide a stepping stone to applying it in a Parallel Computing solution.

To gain a fundamental understanding of a data structure, it often helps to describe the problem it solves or the need it fulfills. I've always thought of the stages in an algorithm as a type of assembly line, so I chose a manufacturing analogy to frame the problem a BlockingCollection solves.

An Assembly Line Analogy

Like all data structures in the .NET Framework, the BlockingCollection has a special purpose. Using a clothing manufacturing assembly line analogy, if Tasks are the assembly line "workers", then the BlockingCollection is the protocol workers use to pass materials between stages in the assembly line.

In a clothing manufacturing assembly line, each stage is responsible for some portion of the assembled product. No two stages can work on the same shirt at the same time; for example, buttons cannot be sewn on while the pattern is still being cut.

As the shirt is passed between stages there is a handoff protocol. One stage relinquishes ownership of the shirt and another stage takes ownership. There may be many variations in the handoff protocol. For example, is the shirt passed to the right or the left? Where are shirts "stacked" when one stage completes, but the next stage isn't ready to receive the shirt? How are the shirts stacked, latest on top or bottom?

In the .NET Parallel Computing world, tasks, rather than workers, are assembled to process data rather than shirts. Similar problems arise between stages. Faster stages need a place to "stack" processed data to be consumed in the next stage.

The Parallel Computing resources at the end of this article often call this behavior pattern pipelining or producer/consumer. I'm going to show how to assemble a pipeline of tasks with the BlockingCollection using some sample projects.

Sample Overview

BlockingCollection controls the producer/consumer handoff by applying three fundamental ideas:

Handoffs can be simple. A stage waits until data from the prior stage appears. Once the prior stage makes data available, the succeeding stage consumes it and passes its own results on to the next stage. This behavior is captured in the GetConsumingEnumerable, Add, and Take methods on the BlockingCollection. This idea is illustrated in the Sample.WithBlocking project.

Stages consume resources and may need to constrain resource usage by limiting the capacity of a BlockingCollection. When the collection between two stages reaches its limit, the stage feeding that collection must pause until the next stage consumes some of its contents, and as that stage's own input backs up, the stages before it may pause too. Going back to the shirt manufacturing analogy: at any particular stage in the manufacturing process, the shirt pile has a finite size limit. Some method is required to halt the prior stages when a pile reaches that limit. Those stages need to pause, but they also need to be signaled when the pile shrinks. This idea is illustrated in the Sample.WithBounding project.

Stages can be dynamic. When a particular stage is waiting for a prior stage, it may perform some other action and then return to waiting. Also, waiting too long may signal trouble in the whole system, so some way of detecting a timeout is often needed to coordinate stages. This idea is captured in the Sample.WithoutBlocking project.

There are some common features in each sample I need to review before delving into the details.

Common Sample Features

As you will notice later in the article, all the samples follow the same pattern, with variations based on the BlockingCollection methods being employed. The basic pattern is captured in Figure 1 below.

Figure 1: Program Flow

Integers are pulled from an array in the first stage and passed between stages. Calls to Thread.Sleep simulate work inside each Task. Output includes elapsed times so a developer can observe when something executes in relation to something else. When a stage has finished adding data, it calls CompleteAdding on the collection it writes to. The Task.WaitAll method blocks until all stages have executed.

Each stage executes within its own Task.
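The pattern in Figure 1 can be factored into a reusable stage. The sketch below is not taken from the sample projects: StartStage is a hypothetical helper name, and the code uses C# 9 top-level statements for brevity (in a .NET 4.0 project, put the statements in Main and make StartStage a static method). Each stage reads its input with GetConsumingEnumerable, writes to its output collection, and calls CompleteAdding in a finally block so the next stage is released even if the work throws.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not part of the BlockingCollection API): one pipeline stage
// that transforms every item from input and hands the result to output.
Task StartStage<TIn, TOut>(BlockingCollection<TIn> input,
                           BlockingCollection<TOut> output,
                           Func<TIn, TOut> work)
{
    return Task.Factory.StartNew(() =>
    {
        try
        {
            // Blocks while input is empty; ends once input is complete and drained
            foreach (TIn item in input.GetConsumingEnumerable())
                output.Add(work(item));
        }
        finally
        {
            // Always release the next stage, even if work throws
            output.CompleteAdding();
        }
    });
}

var numbers = new BlockingCollection<int>();
var squares = new BlockingCollection<int>();
var labels = new BlockingCollection<string>();

Task squareStage = StartStage(numbers, squares, n => n * n);
Task labelStage = StartStage(squares, labels, n => "[" + n + "]");

// First stage: pull integers from an array, as in the samples
foreach (int value in new[] { 1, 2, 3, 4 })
    numbers.Add(value);
numbers.CompleteAdding();

Task.WaitAll(squareStage, labelStage);
string result = string.Join(" ", labels.GetConsumingEnumerable());
Console.WriteLine(result); // [1] [4] [9] [16]
```

Because each stage has a single consumer reading a FIFO queue, item order is preserved from one stage to the next.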

Turning now to the sample projects, I'm starting with Sample.WithBlocking.

Sample.WithBlocking

The sample code from the WithBlocking project appears below.

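  using System;
  using System.Collections.Concurrent;
  using System.Threading;
  using System.Threading.Tasks;

  int[] items = { 1, 2, 3, 4, 5, 6, 7, 8 };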
  var startTime = DateTime.Now;
  
  Console.WriteLine("Starting...");
  
  var stage1 = new BlockingCollection<int>();
  var stage2 = new BlockingCollection<int>();
  
  //Pulls numbers from the array
  var task0 = Task.Factory.StartNew(() =>
  {
      foreach (int i in items)
      {
          stage1.Add(i);
          Console.WriteLine("Add:{0} Count={1}", i, stage1.Count);
      }
  
      Console.WriteLine("Loaded all items into stage 1...");
      stage1.CompleteAdding();
  
  });
  
  
  //Reads and passes data onto next stage
  var task1 = Task.Factory.StartNew(() =>
  {
                  
      foreach(var i in stage1.GetConsumingEnumerable())
      {
          stage2.Add(i);
  
          Console.WriteLine("Stage 1 Process: {0} elapsed {1}", i,DateTime.Now.Subtract(startTime).TotalSeconds);
  
          //Pause 1 second simulating work
          Thread.Sleep(new TimeSpan(0, 0,1));
      }
      Console.WriteLine("Emptied all items from Stage 1...");
      stage2.CompleteAdding();
  });
  
  
  //Reads and prints data
  var task2 = Task.Factory.StartNew(() =>
  {
      int i = -1;
      while (!stage2.IsCompleted)
      {
          try
          {
              i = stage2.Take();
          }
          catch (InvalidOperationException) //Take throws exception when completed
          {
              Console.WriteLine("Emptied all items from Stage 2...");
              break;
          }
          Console.WriteLine("Stage 2 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);
  
          //Pause a little over half a second to simulate work
          Thread.Sleep(new TimeSpan(0, 0, 0,0,600));
      }
  
  });
  
  
  Task.WaitAll(task0, task1, task2);
  Console.WriteLine("Completed.  Press any key to quit...");
  Console.ReadKey();

WithBlocking demonstrates two ways to consume a BlockingCollection.

Task1 uses GetConsumingEnumerable with a foreach loop. Employing GetConsumingEnumerable is common in the samples you'll find among the resources at the end of this article.
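GetConsumingEnumerable removes each item as it is enumerated, so several tasks can share one collection and each item is delivered to exactly one of them. The following sketch is not part of the sample projects and uses C# 9 top-level statements for brevity; it runs two consumers against the same collection and checks that all 100 items were consumed exactly once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var source = new BlockingCollection<int>();
var consumed = new ConcurrentBag<int>();

// Two consumers enumerate the same collection; each item goes to only one of them
Task[] consumers = Enumerable.Range(0, 2)
    .Select(_ => Task.Factory.StartNew(() =>
    {
        foreach (int value in source.GetConsumingEnumerable())
            consumed.Add(value);
    }))
    .ToArray();

for (int value = 1; value <= 100; value++)
    source.Add(value);
source.CompleteAdding(); // lets both foreach loops end once the collection is empty

Task.WaitAll(consumers);
Console.WriteLine($"{consumed.Count} items consumed, {consumed.Distinct().Count()} distinct");
// 100 items consumed, 100 distinct
```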

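Take, in Task2, is the second way to consume a BlockingCollection. Like GetConsumingEnumerable, Take blocks until data is available in the collection. The try/catch block is required because Take throws an InvalidOperationException when the collection has been marked complete and is empty, including when CompleteAdding is called while Take is already waiting. Checking IsCompleted before calling Take is not enough on its own, because the collection can be marked complete right after the check.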
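A small sketch of both cases follows. It is not from the sample projects and uses C# 9 top-level statements for brevity; the 200 ms Sleep is only there to make it likely that the waiting task is already blocked inside Take (the result is the same either way, because Take also throws immediately on a completed, empty collection).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var stage = new BlockingCollection<int>();
stage.Add(42);
stage.CompleteAdding();

// Items added before CompleteAdding can still be taken
int first = stage.Take();

// The collection is now complete and empty: Take throws instead of blocking forever
bool threw = false;
try { stage.Take(); }
catch (InvalidOperationException) { threw = true; }

// A consumer blocked in Take is woken with the same exception when CompleteAdding is called
var idle = new BlockingCollection<int>();
Task<string> waiter = Task.Factory.StartNew(() =>
{
    try { return "took " + idle.Take(); }
    catch (InvalidOperationException) { return "InvalidOperationException"; }
});
Thread.Sleep(200); // give the waiter time to block inside Take
idle.CompleteAdding();

Console.WriteLine($"first={first} threw={threw} waiter={waiter.Result}");
// first=42 threw=True waiter=InvalidOperationException
```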

As in all the samples, the lock statements and wait handles a developer normally uses to add or remove data in a concurrency-safe way are absent. Like all the concurrency-safe collections in the .NET Framework, BlockingCollection handles locking and signaling internally.
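To see that no external locking is needed, the sketch below (not from the sample projects; C# 9 top-level statements for brevity) lets Parallel.For add 1,000 integers from several thread-pool threads at once, then drains the collection and checks that nothing was lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var results = new BlockingCollection<int>();

// Parallel.For adds from several threads at once; no lock is needed around Add
Parallel.For(0, 1000, n => results.Add(n));
results.CompleteAdding();

int added = results.Count;
long sum = 0;
foreach (int n in results.GetConsumingEnumerable())
    sum += n;

Console.WriteLine($"added={added} sum={sum}"); // added=1000 sum=499500
```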

All samples use the default BlockingCollection constructor, but BlockingCollection also provides constructors that take an IProducerConsumerCollection<T> parameter, which lets a developer change its internal storage representation. The default constructor uses a ConcurrentQueue<T>. The other constructors allow a developer to choose whatever internal data structure fits their needs, as long as it implements IProducerConsumerCollection<T> in a concurrency-safe way. .NET Framework 4.0 ships with three such collections: ConcurrentQueue<T>, ConcurrentBag<T>, and ConcurrentStack<T>.
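Returning to the "latest on top or bottom" question from the analogy, the sketch below (not from the sample projects; C# 9 top-level statements for brevity) wraps a ConcurrentStack<T> and compares it with the default queue. The BlockingCollection API is identical; only the order in which items come out changes.

```csharp
using System;
using System.Collections.Concurrent;

// Same BlockingCollection API, different internal storage
var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());
var fifo = new BlockingCollection<int>(); // default storage: ConcurrentQueue<int>

foreach (int n in new[] { 1, 2, 3 })
{
    lifo.Add(n);
    fifo.Add(n);
}
lifo.CompleteAdding();
fifo.CompleteAdding();

string stackOrder = string.Join(",", lifo.GetConsumingEnumerable());
string queueOrder = string.Join(",", fifo.GetConsumingEnumerable());
Console.WriteLine($"stack: {stackOrder}  queue: {queueOrder}"); // stack: 3,2,1  queue: 1,2,3
```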

One other interesting constructor parameter is a bounding value, the bounded capacity. The Sample.WithBounding project demonstrates BlockingCollection bounding.

Sample.WithBounding

Code from the WithBounding sample appears below.

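  using System;
  using System.Collections.Concurrent;
  using System.Threading;
  using System.Threading.Tasks;

  int[] items = { 1, 2, 3, 4, 5, 6, 7, 8 };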
  var startTime = DateTime.Now;
  
  Console.WriteLine("Starting...");
  
  var stage1 = new BlockingCollection<int>(2);
  var stage2 = new BlockingCollection<int>();
  
  //Pulls numbers from the array
  var task0 = Task.Factory.StartNew(() =>
  {
      foreach (int i in items)
      {
          stage1.Add(i);
          Console.WriteLine("Add:{0} Count={1}", i, stage1.Count);
      }
  
      Console.WriteLine("Loaded all items into stage 1...");
      stage1.CompleteAdding();
  
  });
  
  
  //Reads and passes data onto next stage
  var task1 = Task.Factory.StartNew(() =>
  {
  
      foreach (var i in stage1.GetConsumingEnumerable())
      {
          stage2.Add(i);
  
          Console.WriteLine("Stage 1 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);
  
          //Pause 2 seconds simulating work
          Thread.Sleep(new TimeSpan(0, 0, 2));
      }
      Console.WriteLine("Emptied all items from Stage 1...");
      stage2.CompleteAdding();
  });
  
  
  //Reads and prints data
  var task2 = Task.Factory.StartNew(() =>
  {
      int i = -1;
      while (!stage2.IsCompleted)
      {
          try
          {
              i = stage2.Take();
          }
          catch (InvalidOperationException)
          {
              Console.WriteLine("Emptied all items from Stage 2...");
              break;
          }
          Console.WriteLine("Stage 2 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);
  
          //Pause a little over half a second to simulate work
          Thread.Sleep(new TimeSpan(0, 0, 0, 0, 600));
      }
  
  });
  
  
  Task.WaitAll(task0, task1, task2);
  Console.WriteLine("Completed.  Press any key to quit...");
  Console.ReadKey();

Earlier in my manufacturing analogy I mentioned material piling up in a stage of the assembly line requiring the prior stage to halt until the pile was consumed. Bounding exists to define the size of the pile.

You'll get some interesting results when executing this sample. Steps in Task1 and Task2 will complete before Task0 finishes loading the stage1 BlockingCollection. The stage1 BlockingCollection is constrained to holding 2 integers at a time, so integers must be consumed before more can be added. Thus, the Add method in Task0 blocks until Task1 consumes an integer.
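The sketch below isolates that behavior. It is not from the sample projects and uses C# 9 top-level statements for brevity. A collection bounded to 2 items rejects a third TryAdd immediately, while a plain Add on another task stays blocked until Take frees a slot.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var pile = new BlockingCollection<int>(boundedCapacity: 2);
pile.Add(1);
pile.Add(2);

// The pile is full: TryAdd without a timeout gives up immediately
bool thirdAccepted = pile.TryAdd(3);

// Add, by contrast, waits on another task until a slot frees up
Task producer = Task.Factory.StartNew(() => pile.Add(3));
Thread.Sleep(500);
bool finishedWhileFull = producer.IsCompleted; // still blocked inside Add

int taken = pile.Take(); // frees one slot, releasing the producer
producer.Wait();

Console.WriteLine($"capacity={pile.BoundedCapacity} thirdAccepted={thirdAccepted} " +
                  $"finishedWhileFull={finishedWhileFull} taken={taken} count={pile.Count}");
// capacity=2 thirdAccepted=False finishedWhileFull=False taken=1 count=2
```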

Bounding is also used to introduce artificial latency into the WithoutBlocking sample.

Sample.WithoutBlocking

Code from the WithoutBlocking sample appears below.

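  using System;
  using System.Collections.Concurrent;
  using System.Threading;
  using System.Threading.Tasks;

  int[] items = { 1, 2, 3, 4, 5, 6, 7, 8 };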
  var startTime = DateTime.Now;
  
  Console.WriteLine("Starting...");
  
  var stage1 = new BlockingCollection<int>();
  var stage2 = new BlockingCollection<int>(1);//Required to force the TryAdd delay
  
  //Pulls numbers from the array
  var task0 = Task.Factory.StartNew(() =>
  {
      foreach (int i in items)
      {
          stage1.Add(i);
          Console.WriteLine("Add:{0} Count={1}", i, stage1.Count);
      }
  
      Console.WriteLine("Loaded all items into stage 1...");
      stage1.CompleteAdding();
  
  });
  
  
  //Reads and passes data onto next stage
  var task1 = Task.Factory.StartNew(() =>
  {
      int i = -1;
      while (!stage1.IsCompleted)
      {
          if (stage1.TryTake(out i,100))
          {
              Console.WriteLine("Stage 1 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);
  
              while (!stage2.TryAdd(i, new TimeSpan(0, 0, 0,0,300))) 
              { 
                  Console.WriteLine("Attempt to add {0} expired elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds); 
              }
  
              //Pause (in milliseconds) to simulate work;
              //change the value to anywhere from 100 to 2000
              //to see the impact
              Thread.Sleep(new TimeSpan(0, 0, 0,0,2000));
          }
      }
      Console.WriteLine("Emptied all items from Stage 1...");
      stage2.CompleteAdding();
  });
  
  
  //Reads and prints data
  var task2 = Task.Factory.StartNew(() =>
  {
      int i = -1;
      while (!stage2.IsCompleted)
      {
          if (stage2.TryTake(out i,300))
          {
              Console.WriteLine("Stage 2 Process: {0} elapsed {1}", i, DateTime.Now.Subtract(startTime).TotalSeconds);
              //Pause a little over half a second to simulate work
              Thread.Sleep(new TimeSpan(0, 0, 0, 0, 600));
          }
          else
          {
              Console.WriteLine("Stage 2 Wait timeout exceeded at {0}",  DateTime.Now.Subtract(startTime).TotalSeconds);
          }
      }
  
  });
  
  
  Task.WaitAll(task0, task1, task2);
  Console.WriteLine("Completed.  Press any key to quit...");
  Console.ReadKey();

One of the major differences between WithoutBlocking and WithBlocking is the use of TryAdd and TryTake. Their parameters reveal timeout values expressed either as an int number of milliseconds or as a TimeSpan.

WithoutBlocking tasks block only for a short interval before continuing execution. The sample code uses timeouts of 100 and 300 milliseconds, passed either as an int or as a TimeSpan. Because TryTake and TryAdd report a timeout by returning false, and TryTake also returns false rather than throwing once the collection is complete and empty, WithoutBlocking omits the try/catch code.
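The three outcomes WithoutBlocking relies on can be seen in isolation in the sketch below (not from the sample projects; C# 9 top-level statements for brevity): a timeout on an empty collection, an immediate success with the TimeSpan overload, and a false result, with no exception, once the collection is complete and empty. The elapsed time printed for the timeout will vary slightly from run to run.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

var stage = new BlockingCollection<int>();

// Nothing arrives: TryTake waits up to 300 ms, then returns false
var clock = Stopwatch.StartNew();
bool timedOut = !stage.TryTake(out _, 300);
clock.Stop();

// An item is ready: the TimeSpan overload returns at once
stage.Add(7);
bool gotItem = stage.TryTake(out int item, TimeSpan.FromMilliseconds(300));

// Complete and empty: TryTake returns false instead of throwing like Take
stage.CompleteAdding();
bool afterComplete = stage.TryTake(out _, 300);

Console.WriteLine($"timedOut={timedOut} (~{clock.ElapsedMilliseconds} ms) " +
                  $"gotItem={gotItem} item={item} afterComplete={afterComplete}");
```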

This is an interesting sample to tinker with. Changing the Sleep values in Task1 and Task2 has some interesting effects and really demonstrates shifting bottlenecks in code.

Conclusion

BlockingCollection serves as a mediator between Task classes that need to share data. BlockingCollection is concurrency safe and often employed in a producer/consumer pattern. A developer needing a Parallel Computing solution should look at the BlockingCollection.

Resources

Patterns for Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4
ParallelExtensionsExtras Tour - #10 - Pipeline
BlockingCollection<T> Class
Parallel Computing Developer Center
Pipelines
Blocking Collection Overview

About the Author

Jeffrey Juday

Jeff is a software developer specializing in enterprise application integration solutions utilizing BizTalk, SharePoint, WCF, WF, and SQL Server. Jeff has been developing software with Microsoft tools for more than 15 years in a variety of industries including: military, manufacturing, financial services, management consulting, and computer security. Jeff is a Microsoft BizTalk MVP. Jeff spends his spare time with his wife Sherrill and daughter Alexandra.
