An earlier article reviewed the role that the Task
Parallel Library (TPL) BlockingCollection plays in a variety of
Producer/Consumer scenarios. BlockingCollection stores and retrieves data from
an underlying source that implements IProducerConsumerCollection.
The framework already ships three concurrent collections that implement
IProducerConsumerCollection: ConcurrentQueue, ConcurrentStack and
ConcurrentBag. Most developers will therefore have no need to implement a new
concurrent collection. A developer is more likely to implement
IProducerConsumerCollection in order to integrate some other data source
naturally with the BlockingCollection. The following paragraphs explain the
scenarios and mechanisms around implementing IProducerConsumerCollection.
Scenarios
Developers seldom implement an interface without a compelling scenario to
support. A few such scenarios for IProducerConsumerCollection follow.
As stated earlier, a BlockingCollection stores its data in a
data structure that implements IProducerConsumerCollection. A developer
may want to tie a BlockingCollection to some underlying infrastructure such as
Microsoft Message Queuing (MSMQ) or files on the file system. TPL
Producer/Consumer patterns are often implemented with the BlockingCollection.
To preserve the pattern semantics while changing the underlying source, a
developer can swap between, for example, an IProducerConsumerCollection built
on MSMQ and an IProducerConsumerCollection built on the file system.
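The swap can be sketched with two collections that already exist in the framework. This is only an illustration under stated assumptions: BackingStoreDemo and ProduceThenConsume are hypothetical names, and ConcurrentQueue and ConcurrentStack stand in for the MSMQ and file system sources mentioned above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class BackingStoreDemo
{
    // Hypothetical helper: fills a BlockingCollection built on the given
    // store, then drains it and returns the items in consumption order.
    public static List<string> ProduceThenConsume(IProducerConsumerCollection<string> store)
    {
        var results = new List<string>();

        using (var collection = new BlockingCollection<string>(store))
        {
            collection.Add("A");
            collection.Add("B");
            collection.Add("C");
            collection.CompleteAdding();

            // The consuming code is identical whatever the store is.
            foreach (var item in collection.GetConsumingEnumerable())
            {
                results.Add(item);
            }
        }

        return results;
    }
}
```

Passing a ConcurrentQueue yields A, B, C, while passing a ConcurrentStack yields C, B, A. Only the constructor argument changes; the producer and consumer code stays the same.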
Sources like MSMQ and the file system make for fairly complicated scenarios
that are not easily replicated in a short article, so the implementation shown
later in this article is far more modest. Still, implementations are limited
only by your imagination. Data has many sources. If a source's add and
retrieval semantics can be surfaced like a collection, then chances are good
that the source can be an IProducerConsumerCollection.
With these potential scenarios in mind, a BlockingCollection review is in
order before proceeding with an interface implementation.
BlockingCollection Revisited
A complete BlockingCollection review is beyond the scope of
this article. However, the IProducerConsumerCollection implementation will be
meaningless without some BlockingCollection context. Following is some
BlockingCollection code.
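// Requires: using System; using System.Collections.Concurrent; using System.Threading.Tasks;
var collection = new BlockingCollection<string>();

// Producer: adds three items on a background task, then signals completion.
Task.Factory.StartNew(() =>
{
    collection.Add("Val 1 BlockingCollection");
    collection.Add("Val 2 BlockingCollection");
    collection.Add("Val 3 BlockingCollection");
    collection.CompleteAdding();
});

Console.WriteLine("Iterate BlockingCollection");

// Consumer: each iteration removes one item from the collection.
foreach (var val in collection.GetConsumingEnumerable())
{
    Console.WriteLine(val);
    Console.WriteLine("Blocking collection count " + collection.Count.ToString());
}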
The Add method puts data into the BlockingCollection. When
a developer iterates over GetConsumingEnumerable, each loop iteration removes
one item from the BlockingCollection. The loop waits for more data until
CompleteAdding has been called and the collection is empty.
BlockingCollection also supports Take and TryTake methods. TryTake returns
false if there is no data in the underlying collection. Take
blocks until data exists. Following is sample code utilizing the TryTake
method on the BlockingCollection.
var collection = new BlockingCollection<string>(new ProducerConsumerCollection<string>());

collection.Add("Val 1 TryTakeBlockingCollection");
collection.Add("Val 2 TryTakeBlockingCollection");
collection.Add("Val 3 TryTakeBlockingCollection");
collection.CompleteAdding();

Console.WriteLine("Iterate BlockingCollection");

var val = "";
while (collection.TryTake(out val))
{
    Console.WriteLine(val);
}
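CompleteAdding signals that no more data will be added to the
BlockingCollection. Consumers can still remove whatever items remain. Once
the collection is empty, a consuming loop ends instead of waiting for data.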
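The interaction between Take and CompleteAdding can be sketched as follows. CompleteAddingDemo and TakeThrowsWhenCompleted are hypothetical names used only for this illustration, and the default BlockingCollection storage is used rather than the custom collection.

```csharp
using System;
using System.Collections.Concurrent;

static class CompleteAddingDemo
{
    // Returns true if Take throws InvalidOperationException once the
    // collection is both marked complete and empty.
    public static bool TakeThrowsWhenCompleted()
    {
        using (var collection = new BlockingCollection<string>())
        {
            collection.Add("only item");
            collection.CompleteAdding();

            // Items added before CompleteAdding can still be taken.
            string first = collection.Take();
            Console.WriteLine("Took: " + first);

            // IsCompleted is true when adding is complete and nothing is left.
            Console.WriteLine("IsCompleted: " + collection.IsCompleted);

            try
            {
                // Nothing is left and nothing will ever arrive, so Take
                // throws instead of blocking forever.
                collection.Take();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }
}
```

This is why consumers that call Take directly usually check IsCompleted or catch the exception, while GetConsumingEnumerable and TryTake end quietly.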
A variation on the TryTake sample above is to create a "bounded"
BlockingCollection. The following line changes that sample to a bounded
sample.
var collection = new BlockingCollection<string>(new ProducerConsumerCollection<string>(), 1);
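When bounded, a BlockingCollection blocks any Add once the
boundary is reached. Taking from the bounded BlockingCollection frees space
and Adds continue. A TryAdd called without a timeout returns false when the
boundary is reached. Note that with a bound of 1, the three sequential Add
calls in the TryTake sample would stall on the second Add unless another
thread is taking items, so the producer should run on its own task as in the
first sample.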
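The bounded behavior can be observed without blocking by using TryAdd. In this sketch BoundedDemo and TryAddAgainstBound are hypothetical names, and a plain ConcurrentQueue is assumed as the storage.

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedDemo
{
    // Returns the results of three TryAdd calls against a capacity of 1,
    // with one Take between the second and third call.
    public static bool[] TryAddAgainstBound()
    {
        using (var collection = new BlockingCollection<string>(new ConcurrentQueue<string>(), 1))
        {
            bool first = collection.TryAdd("Val 1");   // fits within the bound
            bool second = collection.TryAdd("Val 2");  // bound reached, rejected

            collection.Take();                         // frees one slot

            bool third = collection.TryAdd("Val 3");   // fits again
            return new[] { first, second, third };
        }
    }
}
```

The three results are true, false, true. Replacing the second TryAdd with Add would block the calling thread until some other thread took an item.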
Properly implementing an IProducerConsumerCollection can be
divided into two larger topics. The first topic centers on BlockingCollection
storage and retrieval.
Interface – BlockingCollection
Following is the IProducerConsumerCollection interface
contract.
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake(out T item);
}
There are two parts to the contract: a .NET collection
portion and a portion that directly serves the BlockingCollection. Code
directly serving the BlockingCollection appears below.
// Requires: using System; using System.Collections.Concurrent;
// Only the members that directly serve the BlockingCollection are shown here.
class ProducerConsumerCollection<T> : IProducerConsumerCollection<T>
{
    // ConcurrentQueue supplies the thread-safe storage.
    private IProducerConsumerCollection<T> _queue = new ConcurrentQueue<T>();

    public void CopyTo(T[] array, int index)
    {
        Console.WriteLine("Called CopyTo...");
        _queue.CopyTo(array, index);
    }

    public T[] ToArray()
    {
        Console.WriteLine("Called ToArray...");
        return _queue.ToArray();
    }

    public bool TryAdd(T item)
    {
        // String concatenation tolerates a null item; item.ToString() would not.
        Console.WriteLine("Called TryAdd " + item);
        return _queue.TryAdd(item);
    }

    public bool TryTake(out T item)
    {
        var ret = _queue.TryTake(out item);
        var val = item == null ? "NULL" : item.ToString();

        Console.WriteLine("Called TryTake returned " + val + " returning " + ret.ToString());
        return ret;
    }
}
BlockingCollection is a thread-safe data structure, so any underlying storage
mechanism must be thread-safe as well. The implementation wraps the
ConcurrentQueue class and writes a message to the console on each call. As
stated earlier, this article aims for simplicity, so the implementation is
deliberately trivial.
Notice how the implementation mirrors the TryTake and TryAdd
portion of the BlockingCollection class. Also notice that a Count property
is not declared on IProducerConsumerCollection itself. As you'll see later in
the article, the property is there because it is inherited from ICollection,
but the BlockingCollection handles all the counting.
IProducerConsumerCollection only serves as storage. The second
IProducerConsumerCollection portion supports all the standard .NET Collection
interfaces.
Interface – Collection
Following is the IProducerConsumerCollection implementation
supporting the standard .NET Collection interfaces.
// Requires: using System; using System.Collections; using System.Collections.Generic;
// Same class as the previous listing; only the collection-interface members are shown.
class ProducerConsumerCollection<T> : IProducerConsumerCollection<T>
{
    private IProducerConsumerCollection<T> _queue = new ConcurrentQueue<T>();

    public IEnumerator<T> GetEnumerator()
    {
        Console.WriteLine("GetEnumerator T..");
        return _queue.GetEnumerator();
    }

    IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        Console.WriteLine("GetEnumerator..");
        return _queue.GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        _queue.CopyTo(array, index);
    }

    public int Count
    {
        get
        {
            Console.WriteLine("Called get on Count...");
            return _queue.Count;
        }
    }

    public bool IsSynchronized
    {
        get { return _queue.IsSynchronized; }
    }

    public object SyncRoot
    {
        get { return _queue.SyncRoot; }
    }
}
Supporting all the standard .NET collection interfaces means
that things like the standard extension methods and LINQ are automatically
supported. Data in the collection can be queried and projected using, for
example, a LINQ Select expression. In this implementation, enumerating reads
the data without removing it. This also means that TPL
Data Parallel mechanisms such as Parallel.ForEach can enumerate the
collection. Data can also be copied to arrays, as in the following sample.
ProducerConsumerCollection<string> _prodConsumer = new ProducerConsumerCollection<string>();

_prodConsumer.TryAdd("Val 1 Array");
_prodConsumer.TryAdd("Val 2 Array");
_prodConsumer.TryAdd("Val 3 Array");

var array = _prodConsumer.ToArray();
for (int n = 0; n < array.Length; ++n)
{
    Console.WriteLine(array[n]);
}
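The LINQ and Data Parallel support mentioned above can be sketched in the same way. To keep the sketch self-contained it uses a ConcurrentQueue through the IProducerConsumerCollection interface instead of the article's class; EnumerationDemo and QueryWithoutConsuming are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class EnumerationDemo
{
    // Returns { items projected by LINQ, items visited by Parallel.ForEach,
    // items still stored afterwards }.
    public static int[] QueryWithoutConsuming()
    {
        IProducerConsumerCollection<string> store = new ConcurrentQueue<string>();
        store.TryAdd("Val 1");
        store.TryAdd("Val 2");
        store.TryAdd("Val 3");

        // LINQ works through IEnumerable<T>: it reads items, it does not take them.
        int projected = store.Select(v => v.ToUpperInvariant()).Count();

        // Parallel.ForEach also only enumerates the store.
        int visited = 0;
        Parallel.ForEach(store, v => Interlocked.Increment(ref visited));

        // Count comes from ICollection; all three items are still present.
        return new[] { projected, visited, store.Count };
    }
}
```

All three values are 3, which shows that enumeration and TryTake are separate paths: only TryTake removes data from the storage.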
Of course, implementing an interface only means that the
members exist on the class. A member can be implemented but throw an
exception, typically NotSupportedException, if support makes no sense for the
underlying storage mechanism.
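A minimal sketch of that approach follows, assuming a store for which lock-based synchronization through SyncRoot makes no sense. NoSyncRootCollection is a hypothetical class written for this illustration, not part of the framework. ConcurrentQueue treats its own explicit ICollection.SyncRoot the same way.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical store: every member works except SyncRoot.
class NoSyncRootCollection<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    // Members that serve the BlockingCollection.
    public bool TryAdd(T item) { _queue.Enqueue(item); return true; }
    public bool TryTake(out T item) { return _queue.TryDequeue(out item); }
    public T[] ToArray() { return _queue.ToArray(); }
    public void CopyTo(T[] array, int index) { _queue.CopyTo(array, index); }

    // Standard collection members.
    public void CopyTo(Array array, int index) { ((ICollection)_queue).CopyTo(array, index); }
    public IEnumerator<T> GetEnumerator() { return _queue.GetEnumerator(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
    public int Count { get { return _queue.Count; } }
    public bool IsSynchronized { get { return false; } }

    // The member exists to satisfy the interface, but is not supported.
    public object SyncRoot
    {
        get { throw new NotSupportedException("SyncRoot is not supported by this collection."); }
    }
}
```

In this sketch a BlockingCollection built on NoSyncRootCollection still adds and takes items normally; only code that explicitly asks for SyncRoot sees the exception.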
Conclusion
The BlockingCollection class supports TPL Producer/Consumer
pattern scenarios. BlockingCollection stores its data in collections that
implement the IProducerConsumerCollection interface. Though
IProducerConsumerCollection implementations already exist for common storage
data structures such as queues, stacks and bags, there are situations
requiring a custom IProducerConsumerCollection implementation.