Extending Microsoft's Concurrency and Coordination Runtime with MSMQ

The Concurrency and Coordination Runtime (CCR) is part of the CCR and DSS Toolkit, which you can download if you have an MSDN license. CCR was originally built for coordinating and prioritizing workloads in robotics software.

My earlier CCR article introduced CCR and demonstrated how it can be an effective alternative to the .NET Framework Asynchronous Programming Model (APM). I shared some of the basic CCR classes and explained how CCR could be employed in lieu of the .NET Framework ThreadPool and synchronization classes.

This article explores a more advanced scenario utilizing Microsoft Message Queuing (MSMQ) and CCR to coordinate work across processes on multiple computers.

CCR Overview

CCR works a lot like the ThreadPool; developers queue delegate functions to CCR, and CCR manages the execution of a delegate on a set of threads.

You'll find a more complete introduction to CCR in my prior article, but as a brief review, here are the major CCR classes (see Figure 1).

Figure 1. CCR Classes:
The figure shows the main CCR classes.

The Dispatcher class maintains a group of threads and a collection of DispatcherQueues. A Dispatcher services its DispatcherQueues in round-robin style, executing a delegate from a DispatcherQueue on an available thread. Typically, a Dispatcher is configured to create a thread for each core on the hosting computer.

Arbiter and Port are the other main CCR classes. A developer works through the Arbiter class to configure how the Dispatcher executes a delegate. Through an Arbiter, for example, a developer can configure a function to run exclusively, thus addressing the shared-state aspect of concurrency. Arbiter methods also bind operations to a particular DispatcherQueue.

Port is a sort of entry point into the CCR. After a developer has created a Dispatcher and its DispatcherQueues, and enlisted an Arbiter to lay out how a delegate should be executed, everything is initiated through a Port. Ports are generic classes: posting an instance of the Port's type through the Post method sets the whole process in motion.
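To make the relationship between these four classes concrete, here is a minimal sketch. It assumes a reference to the Microsoft.Ccr.Core assembly from the toolkit; the pool name, queue name, and string payload are placeholders of my own and are not part of the sample application.

```csharp
using System;
using Microsoft.Ccr.Core;   // CCR assembly from the CCR and DSS Toolkit

class CcrOverviewSketch
{
    static void Main()
    {
        // A thread count of 0 lets CCR pick a default for the machine.
        using (Dispatcher dispatcher = new Dispatcher(0, "Sketch pool"))
        using (DispatcherQueue queue =
            new DispatcherQueue("Sketch queue", dispatcher))
        {
            // The Port is the entry point; its type parameter is the
            // kind of item that can be posted to it.
            Port<string> port = new Port<string>();

            // The Arbiter describes how the delegate runs. Here it is a
            // persistent receiver (true), so the handler runs for every
            // item posted. Activate binds it to the DispatcherQueue.
            Arbiter.Activate(queue,
                Arbiter.Receive(true, port, delegate(string text)
                {
                    Console.WriteLine("Handled: " + text);
                }));

            // Posting queues the handler for execution on a CCR thread.
            port.Post("hello");

            // Keep the process alive long enough for the handler to run.
            Console.ReadLine();
        }
    }
}
```

The handler runs asynchronously on a Dispatcher thread, which is why the sketch waits on Console.ReadLine before disposing the Dispatcher.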

Queues Everywhere

Combining CCR with MSMQ may not be an obvious match, so I thought I should share a bit about how I arrived at the approach.

The sample arose from a search for a lightweight, BizTalk-like application to coordinate some integration work across several Windows machines. The integration work consisted of basic operations such as moving files when they appear in a directory and propagating information across some databases. BizTalk would have worked for most of these cases, but it would have been overkill, and we didn't want to invest in another BizTalk license. Essentially, I needed a stripped-down version of the BizTalk Messaging Engine, without SQL Server and the other parts of BizTalk. I also needed to build something fairly quickly.

The concept I envisioned would work as shown in Figure 2.

Figure 2. Application Concept:
This concept serves to provide a lightweight integration with MSMQ.

Naturally, I looked at WCF and WF, but wanted something lighter than WF. The work arrangements I was looking at were simple and I needed something I could configure, program, and execute quickly. WCF and MSMQ seemed like a logical choice. I've always thought of the BizTalk Messaging Engine as, among other things, a host for multiple queues. In some cases I needed a message to survive a reboot, and in other cases I needed unread messages to automatically disappear after a short duration. MSMQ handles all these scenarios. MSMQ's message size limitations were not a factor.

Then I started looking at performance, throughput, message prioritization, timed responses, aggregating execution results, and controlling a workload. I decided I would need something multi-threaded, and thought I'd be using .NET signaling and locking. To summarize, I needed to handle concurrency using the .NET Framework.

I found CCR while searching for concurrency options in the .NET Framework. Robots need mechanisms for managing tasks and prioritizing a workload. Robots must also be responsive. In essence, what I needed to do was somewhat robotic; rather than coordinating servomotors, I needed to coordinate Windows services. All the components seemed like a natural fit. I settled on CCR, but there was still one other technology decision to make.

MSMQ WCF or System.Messaging

There are two development approaches to MSMQ in the .NET Framework: WCF MSMQ Bindings, and the classes in the System.Messaging namespace. By design, the WCF MSMQ Bindings hide much of MSMQ from the developer, which is great when you want interchangeable bindings, but not good if you want more granular control over MSMQ. I needed the extra control.

Below is an overview of the main System.Messaging classes utilized in the sample.

  • The MessageQueue class controls access to MSMQ queues. I leveraged the APM functions BeginPeek, BeginReceive, EndPeek, and EndReceive in the sample. All of these functions are thread safe. The Send function handles transmitting messages to a queue. The MessageQueue class also handles queue creation.
  • The Message class contains the message payload and exposes delivery options. For example, Message can handle messages that should survive a reboot and messages that should disappear from the queue if they have not been delivered after some duration.

MSMQ also works across computers. Messages sent to public queues on another computer can be configured for guaranteed delivery.
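The two delivery options mentioned above map to the Recoverable and TimeToBeReceived properties of the Message class. The sketch below is an illustration only: the queue path, the payload text, and the 30-second lifetime are assumptions of mine, and the code needs MSMQ installed on the local machine to run.

```csharp
using System;
using System.IO;
using System.Messaging;
using System.Text;

class DeliveryOptionsSketch
{
    static void Main()
    {
        // Hypothetical private queue on the local machine.
        string path = @".\private$\sketchqueue";
        if (!MessageQueue.Exists(path))
        {
            MessageQueue.Create(path);
        }

        using (MessageQueue queue = new MessageQueue(path))
        {
            // Recoverable messages are written to disk, so they
            // survive a reboot of the machine hosting the queue.
            Message durable = new Message();
            durable.BodyStream =
                new MemoryStream(Encoding.UTF8.GetBytes("keep me"));
            durable.Recoverable = true;
            queue.Send(durable);

            // TimeToBeReceived caps how long the message may wait to
            // be read; MSMQ discards it once the interval elapses.
            Message shortLived = new Message();
            shortLived.BodyStream =
                new MemoryStream(Encoding.UTF8.GetBytes("read me soon"));
            shortLived.TimeToBeReceived = TimeSpan.FromSeconds(30);
            queue.Send(shortLived);
        }
    }
}
```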

At this point I've introduced MSMQ and explained why I coupled MSMQ to CCR. Now I'm going to explain how the sample works.

MSMQ Gateway

Below is the entry point for the CCR_MSMQ console application.

static void Main(string[] args)
{
    // A thread count of 0 lets CCR pick a default for the machine.
    Dispatcher dispatcher = new Dispatcher(0, "Test pool");
    DispatcherQueue queue = new DispatcherQueue(
       "Test queue", dispatcher);
    MSMQGateway gateway = new MSMQGateway(queue);
    Port<MessageQueueMediatorVisitor> port = new 
        Port<MessageQueueMediatorVisitor>();
    // GotMessage runs for every message posted to the port.
    Arbiter.Activate(queue,
        Arbiter.Receive(true, port, GotMessage)
    );
    gateway.AddQueueIn(@".\testdurabledispin", port);
    _port = gateway.AddQueueOut(@".\testdurabledispout");
}

The most interesting part of this application is the MSMQGateway class. MSMQGateway relates the MSMQ Queues to CCR Ports. AddQueueIn ties a port to a Queue for incoming messages and AddQueueOut returns a Port for outgoing messages. As you may have observed, interacting with MSMQ is completely abstracted away by MSMQGateway, replaced by interaction with the CCR.

Ordinarily, CCR can only be leveraged from within an application. With connectivity to MSMQ, CCR can be leveraged across processes, including processes on other computers. For example, CCR supports something called a Join. A Join executes a delegate only once all the Ports in the Join have received messages. With MSMQ in the mix along with a CCR Join, an application could create a Joined response across three different computers and only execute a piece of code when all three responses have been received.
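A Join can be sketched with Arbiter.JoinedReceive. To keep the example short it joins two Ports rather than three, and it posts to them locally instead of through MSMQ; the port names and string payloads are illustrative assumptions, not code from the sample.

```csharp
using System;
using Microsoft.Ccr.Core;   // CCR assembly from the CCR and DSS Toolkit

class JoinSketch
{
    static void Main()
    {
        using (Dispatcher dispatcher = new Dispatcher(0, "Join pool"))
        using (DispatcherQueue queue =
            new DispatcherQueue("Join queue", dispatcher))
        {
            // In the gateway scenario each Port would be fed by a
            // queue that receives a reply from a different computer.
            Port<string> replyFromA = new Port<string>();
            Port<string> replyFromB = new Port<string>();

            // One-time join (false): the handler runs only after an
            // item is available on both Ports.
            Arbiter.Activate(queue,
                Arbiter.JoinedReceive<string, string>(
                    false, replyFromA, replyFromB,
                    delegate(string a, string b)
                    {
                        Console.WriteLine("Both replied: " + a + ", " + b);
                    }));

            replyFromA.Post("files moved");
            // Nothing has run yet; the join is still waiting on B.
            replyFromB.Post("database updated");

            Console.ReadLine();
        }
    }
}
```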

Next, you'll see how Ports get tied to Queues, starting with incoming messages.

Incoming Messages

The AddQueueIn function on the MSMQGateway class appears below.

public void AddQueueIn(string pathToQueue, 
   Port<MessageQueueMediatorVisitor> port)
{
    //This ties everything together, relates the MessageQueue to a
    //particular port
    MessageQueue queue = null;
    MessageQueueMediator mediator;
    queue = MSMQHelper.GetQ(pathToQueue);
    mediator = new MessageQueueMediator(queue);
    queue.PeekCompleted += AsyncMSMQCallBackFactory.Create(
       port, mediator);
}

Most of the code stores the objects that tie MSMQ to the CCR. The callback itself comes from the Create method on the AsyncMSMQCallBackFactory class:

public static PeekCompletedEventHandler Create(
   Port<MessageQueueMediatorVisitor> portResult,
   MessageQueueMediator mediator)
{
   AsyncCallbackMethodAdapter builder =
      new AsyncCallbackMethodAdapter(portResult,mediator);
   return builder.PeekCompletedCallBack;
}

//This class wires the ports to the callback method
private sealed class AsyncCallbackMethodAdapter
{
   private Port<MessageQueueMediatorVisitor> _portResultPeek;
   private MessageQueueMediator _mediator = null;

   internal AsyncCallbackMethodAdapter(
      Port<MessageQueueMediatorVisitor> portResult, 
      MessageQueueMediator mediator)
   {
      _portResultPeek = portResult;
      _mediator = mediator;
   }
   //PeekCompletedCallBack is also a member of this class
}

Create returns a function matching the PeekCompletedEventHandler delegate. There are two ways to read messages from a MessageQueue: Peeking, which gets the message without removing it, and Receiving, which gets the message and removes it from the queue. Doing a Peek and posting the Peeked message to the port ensures that exceptions during message handling don't result in message loss. I'll explain how the message is removed in a moment. The PeekCompletedEventHandler function returned by Create, with the exception handling code removed, appears below.

internal void PeekCompletedCallBack(
   Object source, PeekCompletedEventArgs asyncResult)
{
   try
   {
      // Connect to the queue.
      MessageMapper mapper = null;
      MessageQueue mq = (MessageQueue)source;
      // EndPeek completes and returns the message.
      Message m = mq.EndPeek(asyncResult.AsyncResult);
      mapper = new MessageMapper(m);
      Console.WriteLine("PeekCompletedCallBack " + 
         mapper.Body.MessageId + " " + mapper.Body.CreatedDateTime);
      MessageQueueMediatorVisitor visitor = new 
         MessageQueueMediatorVisitor(_mediator, mapper.Body);
      // Hand the peeked message to the CCR port.
      _portResultPeek.Post(visitor);
   }
   catch (MessageQueueException ex)
   {
      // Exception handling code removed.
   }
}

As you'll see throughout the sample application, a MessageMapper class serializes and deserializes the MessageBody class. To maximize flexibility, I opted to handle my own serialization, so MessageMapper uses the BodyStream property on the message. The MessageMapper Body property appears below.

public MessageBody Body
{
   get
   {
      MessageBody body;
      body = MSMQHelper.DeSerializeBody(
         _message.BodyStream); // argument lost in the listing; assumed
      return body;
   }
   set
   {
      Stream stream = null;
      stream = MSMQHelper.SerializeBody(value);
      _message.BodyStream = stream;
   }
}
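The article does not show how MSMQHelper serializes a body, so the following is only one way a pair of helpers like SerializeBody and DeSerializeBody could be written. It assumes XML serialization through XmlSerializer and a stand-in MessageBody class with just the two properties the listings mention; the property types are guesses.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Stand-in for the sample's MessageBody; property types are assumed.
public class MessageBody
{
    public string MessageId { get; set; }
    public DateTime CreatedDateTime { get; set; }
}

// Hypothetical equivalent of the MSMQHelper serialization functions.
public static class BodySerializerSketch
{
    private static readonly XmlSerializer Serializer =
        new XmlSerializer(typeof(MessageBody));

    // Writes the body to a stream suitable for Message.BodyStream.
    public static Stream SerializeBody(MessageBody body)
    {
        MemoryStream stream = new MemoryStream();
        Serializer.Serialize(stream, body);
        stream.Position = 0; // rewind so readers start at the beginning
        return stream;
    }

    // Rebuilds the body from a stream such as Message.BodyStream.
    public static MessageBody DeSerializeBody(Stream stream)
    {
        return (MessageBody)Serializer.Deserialize(stream);
    }
}
```

Working with a Stream rather than the Message.Body property keeps the choice of wire format in the application's hands, which is the flexibility the text above is after.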

MessageQueueMediator handles MessageQueues on behalf of the MSMQGateway. Typically, a class like MessageQueueMediator would be internal and therefore hidden inside an assembly. Somehow, a delegate processing a MessageBody class must indicate that the message has been successfully processed so that the MessageQueueMediator can move on to the next message. MessageQueueMediatorVisitor packages the MessageBody class and controls access to the MessageQueueMediator class.

As you can see in the code below, the MessageQueueMediatorVisitor's CompleteReceive function indicates to the MessageQueueMediator that the message has been processed. MessageQueueMediator calls the MessageQueue class's Receive function and then starts another Peek.

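// On MessageQueueMediatorVisitor (body missing from the listing)
public void CompleteReceive() { /* ... */ }

// On MessageQueueMediator
public void ReceiveCompleted()
{
   Console.WriteLine("From Mediator ReceiveCompleted ");
   _queue.Receive();//clear the message out
}
public void ContinueWithMessages() { /* body missing from the listing */ }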
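The peek, process, receive, peek-again cycle described above can be sketched with System.Messaging alone. This is not the sample's mediator code: the queue path and handler name are my own, the message is processed inline rather than posted to a CCR Port, and MSMQ must be installed locally for it to run.

```csharp
using System;
using System.Messaging;

class PeekThenReceiveSketch
{
    static void Main()
    {
        // Hypothetical private queue on the local machine.
        string path = @".\private$\sketchqueue";
        if (!MessageQueue.Exists(path))
        {
            MessageQueue.Create(path);
        }

        MessageQueue queue = new MessageQueue(path);
        queue.PeekCompleted += OnPeekCompleted;
        queue.BeginPeek();   // start waiting for the first message

        Console.ReadLine();
    }

    static void OnPeekCompleted(object source, PeekCompletedEventArgs e)
    {
        MessageQueue queue = (MessageQueue)source;
        try
        {
            // EndPeek returns the message but leaves it on the queue.
            Message peeked = queue.EndPeek(e.AsyncResult);
            Console.WriteLine("Processing message " + peeked.Id);

            // Processing would happen here. If it throws, the message
            // is still on the queue because it was only peeked.

            queue.Receive();   // success: remove the message
            queue.BeginPeek(); // then wait for the next one
        }
        catch (Exception ex)
        {
            // Simplification: stop peeking after a failure rather than
            // retrying the same message indefinitely.
            Console.WriteLine("Processing failed: " + ex.Message);
        }
    }
}
```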

Receiving messages is a bit complicated; sending is much easier.


The MSMQGateway.AddQueueOut function appears below.

public Port<MessageBody> AddQueueOut(string pathToQueue)
{
   //This ties everything together, 
   //relates the MessageQueue to a
   //particular port, with a handler 
   //supplied by a component
   //inside of the application
   MessageQueue queue = null;
   Port<MessageBody> port = new Port<MessageBody>();
   MessageQueueMediator mediator;
   queue = MSMQHelper.GetQ(pathToQueue);
   mediator = new MessageQueueMediator(queue);
   //The Activate line was lost in the listing; _dispatcherQueue is an
   //assumed name for the DispatcherQueue passed to the constructor
   Arbiter.Activate(_dispatcherQueue,
      Arbiter.Receive(true, port, mediator.SendMessage)
   );
   return port;
}

The code is very similar to AddQueueIn except that it configures a port for sending a message.

MessageQueueMediator also handles sending a message. The SendMessage function appears below.

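public void SendMessage(MessageBody body)
{
   System.Messaging.Message msg = new Message();
   MessageMapper mapper = new MessageMapper(msg);
   mapper.Body = body;
   // The listing is truncated after MessageId; the rest is assumed
   // from the matching output in PeekCompletedCallBack.
   Console.WriteLine("From Mediator Sending Message " + 
      body.MessageId + " " + body.CreatedDateTime);
   _queue.Send(msg);
}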

Like a receive operation, sending leverages the MessageMapper class. MessageMapper serializes the message so its bytes can be inserted into the BodyStream of the Message class.


Originally built for robotics, CCR is a library for coordinating tasks and handling concurrency in .NET Framework applications. CCR's capabilities can be extended to coordinate processes across multiple machines, but doing so requires a distributed transport. Fortunately, MSMQ makes a fine transport choice.


About the Author

Jeffrey Juday

Jeff is a software developer specializing in enterprise application integration solutions utilizing BizTalk, SharePoint, WCF, WF, and SQL Server. Jeff has been developing software with Microsoft tools for more than 15 years in a variety of industries including: military, manufacturing, financial services, management consulting, and computer security. Jeff is a Microsoft BizTalk MVP. Jeff spends his spare time with his wife Sherrill and daughter Alexandra.


