Understanding the Task Parallel Library IProducerConsumerCollection

An earlier article reviewed the role Task Parallel Library (TPL) BlockingCollection plays in a variety of Producer/Consumer scenarios. BlockingCollection stores and retrieves data from an underlying source that has implemented IProducerConsumerCollection.

There are a substantial number of TPL Concurrent collections implementing IProducerConsumerCollection; so most developers will have no need to implement a new Concurrent collection. Most likely a developer will implement IProducerConsumerCollection to more naturally integrate some data source with the BlockingCollection. The following paragraphs will explain the scenarios and mechanisms around implementing IProducerConsumerCollections.

Scenarios

Developers seldom implement an interface without supporting some compelling scenarios. Following are such IProducerConsumerCollection scenarios.

As stated earlier a BlockingCollection stores its data in a data structure that has implemented IProducerConsumerCollection. A developer may want to tie a BlockingCollection to some underlying infrastructure like, for example, Microsoft Message Queuing (MSMQ) or files on the file system. TPL Producer/Consumer patterns are often implemented with the BlockingCollection. To preserve the pattern semantics, but swap out the underlying sources a developer can swap between, for example, an IProducerConsumerCollection built on MSMQ or an IProducerConsumerCollection built on the file system.

These are fairly complicated scenarios not easily replicated in a short article. So, as you'll see later in the article I did something far more modest. Also, remember, implementation will only be limited by your imagination. Data has many sources. If the retrieval and add semantics can be surfaced like a collection then chances are good that the source can be an IProducerConsumerCollection.

With these potential scenarios in mind, before proceeding with an interface implementation; a BlockingCollection review is in order.

BlockingCollection Revisited

A complete BlockingCollection review is beyond the scope of this article. However the IProducerConsumerCollection implementation will be meaningless without some BlockingCollection context. Following is some BlockingCollection code.

var collection = new BlockingCollection<string>();
 
Task.Factory.StartNew(() =>
    {
        collection.Add("Val 1 BlockingCollection");
        collection.Add("Val 2 BlockingCollection");
        collection.Add("Val 3 BlockingCollection");
 
        collection.CompleteAdding();
    }
);

Console.WriteLine("Iterate BlockingCollection");
foreach (var val in collection.GetConsumingEnumerable())
{
    Console.WriteLine(val);
    Console.WriteLine("Blocking collection count " + collection.Count.ToString());
}
 

The Add method puts data into the BlockingCollection. When a developer utilizes the GetConsumingEnumerable method, data is removed from the BlockingCollection with each loop iteration.

BlockingCollection also supports Take and TryTake methods. TryTake returns false if there is no data in the underlying collection. Take blocks until data exists. Following is sample code utilizing the TryTake method on the BlockingCollection.

var collection = new BlockingCollection<string>(new ProducerConsumerCollection<string>());
 
collection.Add("Val 1 TryTakeBlockingCollection");
collection.Add("Val 2 TryTakeBlockingCollection");
collection.Add("Val 3 TryTakeBlockingCollection");
 
collection.CompleteAdding();
 
Console.WriteLine("Iterate BlockingCollection");
 
var val = "";
while (collection.TryTake(out val))
{
    Console.WriteLine(val);
}
 
 

CompleteAdding signals a consumer that data will no longer be emitted from the BlockingCollection.

A variation on the prior sample is to create a "bounded" BlockingCollection. The following line changes the prior sample to a bounded sample.

var
collection = new BlockingCollection<string>(new ProducerConsumerCollection<string>(),1);

When bounded, a BlockingCollection blocks any Adds when the boundary is reached. Taking from the bounded BlockingCollection frees space and Adds continue. A TryAdd will return false when a boundary is reached.

Properly implementing an IProducerConsumerCollection can be divided into two larger topics. The first topic centers on BlockingCollection storage and retrieval.

Interface - BlockingCollection

Following is the IProducerConsumerCollection interface contract.

    public interface IProducerConsumerCollection<T> : 
IEnumerable<T>, ICollection, IEnumerable
    {
        void CopyTo(T[] array, int index);
        bool TryAdd(T item);
        bool TryTake(out T item);
    }
 

There are two parts to the contract: a .NET collection portion and a portion that directly serves the BlockingCollection. Code directly serving the BlockingCollection appears below.

class ProducerConsumerCollection<T> : IProducerConsumerCollection<T>
{
    private IProducerConsumerCollection<T> _queue = new ConcurrentQueue<T>();
 
    public void CopyTo(T[] array, int index)
    {
        Console.WriteLine("Called CopyTo...");
        _queue.CopyTo(array, index);
    }
 
    public T[] ToArray()
    {
        Console.WriteLine("Called ToArray...");
        return _queue.ToArray();
    } 

    public bool TryAdd(T item)
    {
        Console.WriteLine("Called TryAdd " + item.ToString());
        return _queue.TryAdd(item);
    }

    public bool TryTake(out T item)
    {
        var ret = _queue.TryTake(out item);
        var val = "";

        if (item == null) { val = "NULL"; }
        else
        { val = item.ToString(); }
 

        Console.WriteLine("Called TryTake returned " + val + " returning " + ret.ToString());
 
        return ret;
    }


}
 

BlockingCollection is a concurrent friendly data structure. So, any underlying storage mechanism must be concurrent friendly. The implementation wraps the ConcurrentQueue class. As stated earlier, this article aims for simplicity. This implementation is somewhat trivial.

Notice how the implementation mimics the TryTake and TryAdd portion of the BlockingCollection class. Also notice that a Counting property is not part of the interface. As you'll see later in the article the Property is there, but the BlockingCollection handles all the counting. IProducerConsumerCollection only serves as storage. The second IProducerConsumerCollection portion supports all the standard .NET Collection interfaces.

Interface - Collection

Following is the IProducerConsumerCollection implementation supporting the standard .NET Collection interfaces.

class ProducerConsumerCollection<T> : IProducerConsumerCollection<T>
{
    private IProducerConsumerCollection<T> _queue = new ConcurrentQueue<T>();
 
    public IEnumerator<T> GetEnumerator()
    {
        Console.WriteLine("GetEnumerator T..");
        return _queue.GetEnumerator();
    }
 
    IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        Console.WriteLine("GetEnumerator..");
        return _queue.GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        _queue.CopyTo(array, index);
    }
 
    public int Count
    {
        get 
        {
            Console.WriteLine("Called get on Count...");
            return _queue.Count; 
        }
    }

    public bool IsSynchronized
    {
        get { return _queue.IsSynchronized; }
    }

    public object SyncRoot
    {
        get { return _queue.SyncRoot; }
    }

}
 

Supporting all the standard .NET Collection interfaces means that things like the standard Extension functions and LINQ are automatically supported. Data can be moved into and out of the collection using, for example, a LINQ Select expression. This also means that TPL Data Parallel mechanisms can consume the collection. Data can also be moved and copied to Arrays like in the following sample.

ProducerConsumerCollection<string> _prodConsumer = new ProducerConsumerCollection<string>();
 
_prodConsumer.TryAdd("Val 1 Array");
_prodConsumer.TryAdd("Val 2 Array");
_prodConsumer.TryAdd("Val 3 Array");
 
var array = _prodConsumer.ToArray();
 
for (int n = 0; n < array.Length; ++n)
{
    Console.WriteLine(array[n]);
}
 

Of course implementing any interface simply means that the method exists on the class. A method can be implemented, but throw an exception if support makes no sense for the underlying storage mechanism.

Conclusion

The BlockingCollection class supports TPL Producer/Consumer pattern scenarios. BlockingCollection stores its data in collections that implement the IProducerConsumerCollection interface. Though IProducerConsumerCollection implementations exist for all the common storage data structures; there are situations requiring a custom IProducerConsumerCollection implementation.



About the Author

Jeffrey Juday

Jeff is a software developer specializing in enterprise application integration solutions utilizing BizTalk, SharePoint, WCF, WF, and SQL Server. Jeff has been developing software with Microsoft tools for more than 15 years in a variety of industries including: military, manufacturing, financial services, management consulting, and computer security. Jeff is a Microsoft BizTalk MVP. Jeff spends his spare time with his wife Sherrill and daughter Alexandra.

Related Articles

Comments

  • There are no comments yet. Be the first to comment!

Leave a Comment
  • Your email address will not be published. All fields are required.

Top White Papers and Webcasts

  • Where the business performance of their mobile app portfolios are concerned, most companies are flying blind. While traditional application portfolios are held to all kinds of ROI measure, the investment plan for mobile apps -- increasingly the more crucial bet -- is made by guesswork and dart-throwing. This interactive e-book investigates how mobile is driving the need for app and portfolio measures unlike any we saw in the days of web. Good mobile analytics must deliver leading indicators of user experience …

  • Savvy enterprises are discovering that the cloud holds the power to transform IT processes and support business objectives. IT departments can use the cloud to redefine the continuum of development and operations—a process that is becoming known as DevOps. Download the Executive Brief DevOps: Why IT Operations Managers Should Care About the Cloud—prepared by Frost & Sullivan and sponsored by IBM—to learn how IBM SmartCloud Application services provide a robust platform that streamlines …

Most Popular Programming Stories

More for Developers

Latest Developer Headlines

RSS Feeds