Extending Microsoft’s Concurrency and Coordination Runtime with MSMQ

Concurrency and Coordination Runtime (CCR) is part of the CCR and DSS Toolkit. If you have an MSDN license you can download the CCR and DSS Toolkit. CCR was originally built for coordinating and prioritizing workloads in Robotics software.


My earlier CCR article introduced CCR, and demonstrated how CCR can be an effective option for the .NET Framework Asynchronous Programming Model (APM). I shared some of the basic CCR classes and explained how CCR could be employed in lieu of the .NET Framework ThreadPool and synchronization classes.


This article explores a more advanced scenario utilizing Microsoft Message Queuing (MSMQ) and CCR to coordinate work across processes on multiple computers.


CCR Overview


CCR works a lot like the ThreadPool; developers queue delegate functions to CCR, and CCR manages the execution of a delegate on a set of threads.


You’ll find a more complete introduction to CCR in my prior article, but as a brief review, here are the major CCR classes (see Figure 1).


Figure 1. CCR Classes:
The figure shows the main CCR classes.

The Dispatcher class maintains a group of threads and a collection of DispatcherQueues. Dispatchers operate on their Dispatcher Queues in round-robin style, executing a delegate function from the DispatcherQueue on an available thread. Typically, a Dispatcher is configured to create a thread for each core on the hosting computer.


Arbiter and Port are the other main CCR classes. A developer works through an Arbiter class to configure how a delegate function is executed by the Dispatcher. Through an Arbiter, for example, a developer can configure a function to run exclusively, thus addressing the shared-state aspect of concurrency. Arbiter methods also bind operations to a particular Dispatcher Queue.


Port is a sort of entry point in the CCR. After a developer has created a Dispatcher, Dispatcher Queues, and enlisted an Arbiter to lay out how a delegate should be executed, everything is initiated through a Port. Ports are generic classes. The Port Post method receives a predefined class that, in turn, initiates the whole process.


Queues Everywhere


Combining CCR with MSMQ may not be an obvious match, so I thought I should share a bit about how I arrived at the approach.


The sample arose from a search for a Lightweight BizTalk-like application to coordinate some integration work across some Windows machines. The integration work consisted of basic operations such as moving files when they appear in a directory, and propagating information across some databases. BizTalk would have worked for most of these cases, but it would have been overkill, and we didn’t want to invest in another BizTalk license. Essentially I needed a stripped-down version of the BizTalk Messaging Engine-without SQL Server and other parts of BizTalk. I also needed to build something fairly quickly.


The concept I envisioned would work as shown in Figure 2.



Figure 2. Application Concept:
This concept serves to provide a lightweight integration with MSMQ.

Naturally, I looked at WCF and WF, but wanted something lighter than WF. The work arrangements I was looking at were simple and I needed something I could configure, program, and execute quickly. WCF and MSMQ seemed like a logical choice. I’ve always thought of the BizTalk Messaging Engine as, among other things, a host for multiple queues. In some cases I needed a message to survive a reboot, and in other cases I needed unread messages to automatically disappear after a short duration. MSMQ handles all these scenarios. MSMQ’s message size limitations were not a factor.


Then I started looking at performance, throughput, message prioritization, timed responses, aggregating execution results, and controlling a workload. I decided I would need something multi-threaded, and thought I’d be using .NET signaling and locking. To summarize, I needed to handle concurrency using the .NET Framework.


I found CCR while doing a search on concurrency and .NET Framework options. Robots need mechanisms for managing tasks and prioritizing a workload. Robots must also be responsive. In essence, what I needed to do was somewhat robotic, only, rather than coordinating servomotors, I needed to coordinate Windows services. All the components seemed like a natural fit. I settled on CCR, but there was still one other technology decision to make.


MSMQ WCF or System.Messaging


There are two development approaches to MSMQ in the .NET Framework: WCF MSMQ Bindings, and the classes in the System.Messaging namespace. By design, the WCF MSMQ Bindings hide much of MSMQ from the developer, which is great when you want interchangeable bindings, but not good if you want more granular control over MSMQ. I needed the extra control.


Below is an overview of the main System.Messaging classes utilized in the sample.



  • The MessageQueue class controls access to MSMQ queues. I leveraged the APM functions BeginPeek, BeginReceive, EndPeek, and EndReceive in the sample. All these functions were thread safe. The Send function handles transmitting messages to a queue. The MessageQueue class also handles queue creation.
  • The Message class contains the message payload and exposes delivery options. For example, Message can handle messages that should survive a reboot and messages that should disappear from the queue if they have not been delivered after some duration.

MSMQ also works across computers. Messages sent to public queues on another computer can be configured for guaranteed delivery.


At this point I’ve introduced MSMQ and explained why I coupled MSMQ to CCR. Now I’m going to explain how the sample works.


MSMQ Gateway


Below is the entry point for the CCR_MSMQ console application.

static void Main(string[] args)
{
Dispatcher dispatcher = new Dispatcher(0, “Test pool”);
DispatcherQueue queue = new DispatcherQueue(
“Test queue”, dispatcher);
MSMQGateway gateway = new MSMQGateway(queue);
Port<MessageQueueMediatorVisitor> port = new
Port<MessageQueueMediatorVisitor>();
Arbiter.Activate(queue,
Arbiter.Receive(true, port, GotMessage)
);
gateway.AddQueueIn(@”.\testdurabledispin”, port);
_port = gateway.AddQueueOut(@”.\testdurabledispout”);
gateway.StartProcessMessages();
Console.WriteLine(“Ready…”);
}

The most interesting part of this application is the MSMQGateway class. MSMQGateway relates the MSMQ Queues to CCR Ports. AddQueueIn ties a port to a Queue for incoming messages and AddQueueOut returns a Port for outgoing messages. As you may have observed, interacting with MSMQ is completely abstracted away by MSMQGateway, replaced by interaction with the CCR.


Ordinarily CCR can only be leveraged from within an application. With connectivity to MSMQ CCR can be leveraged across processes and across processes on other computers. So for example, CCR supports something called a Join. A Join will execute a delegate as long as all the Ports in the Join have posted messages. With MSMQ in the mix along with a CCR Join, an application could create a Joined response across three different computers and only execute a piece of code when all three responses have been received.


Next, you’ll see how Ports get tied to Queues, starting with incoming messages.

More by Author

Get the Free Newsletter!

Subscribe to Developer Insider for top news, trends & analysis

Must Read