Simple Parallel Development with the Asynchronous Agents Library

The Asynchronous Agents Library is the final block of parallel-programming functionality that will ship in Visual C++ 2010. Previous articles in this series covered the Parallel Patterns Library (PPL) and the state management functionality that supports it. The PPL supports parallel task execution by automatically distributing discrete processing tasks across separate threads, based on the processing resources available at runtime, and achieves synchronization through traditional mechanisms such as locks and events. In contrast, the Asynchronous Agents Library (also referred to by the shorthand name “Agents Library”) takes a more formal approach to parallelism by dividing processing into discrete “agents”, each of which has a single, well-defined task and operates on a dedicated memory buffer. Once an agent has finished processing a piece of incoming data, it can pass the result to an outgoing memory buffer, where a different agent may process it in a different way.

Agents communicate with each other entirely through these memory buffers, using functions provided by the Agents Library to write to and read from them. Because agents share no other state, and the buffers take care of their own synchronization, the agent code itself needs no locks around shared memory, and the runtime is free to schedule the agents' work on any available operating system thread.
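To make this message-passing model concrete without relying on the Windows-only Agents Library headers, here is a minimal sketch of an unbounded buffer in standard C++11. It assumes a `std::mutex` and `std::condition_variable` are a reasonable stand-in for whatever synchronization the library uses internally. `send` appends a value and wakes one waiting reader; `receive` blocks until a value is available. The names `MessageBuffer` and `sum_through_buffer` are hypothetical, and the class is an analogy for `unbounded_buffer`, not its actual implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for an unbounded message buffer; this is NOT the
// Agents Library's unbounded_buffer. All locking lives inside the buffer,
// so code that only calls send() and receive() needs no locks of its own.
template <typename T>
class MessageBuffer {
public:
    // Append a message and wake one reader that may be waiting for data.
    void send(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        ready_.notify_one();
    }

    // Block until a message is available, then remove and return the
    // oldest one (first in, first out).
    T receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        T value = queue_.front();
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> queue_;
};

// Usage sketch: a producer thread sends 1..n followed by a -1 end-of-data
// marker, and the calling thread receives until it sees the marker.
int sum_through_buffer(int n) {
    MessageBuffer<int> buffer;
    std::thread producer([&buffer, n] {
        for (int i = 1; i <= n; ++i) buffer.send(i);
        buffer.send(-1);
    });
    int total = 0;
    for (int v = buffer.receive(); v != -1; v = buffer.receive()) total += v;
    producer.join();
    return total;
}
```

For example, `sum_through_buffer(3)` returns 6. Neither the producer lambda nor the consumer loop takes a lock itself; the buffer is the only shared object, and it synchronizes its own state.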

The first step in developing an agent-based solution is identifying the discrete agents. Imagine a problem domain where data needs to be collected from some input device, such as an open port or a file on disk. A computationally expensive transform must then be applied to the data, and finally the transformed data must be processed into a form ready for display to the end user. Three agents can easily be identified: data collection, data processing, and data collation for final display. Two memory buffers will also be required: one to hold the collected data, and one to hold the processed data. Figure 1 illustrates this solution.

Writing an agent with the Agents Library is straightforward: derive a class from agent, pass in the required processing buffers through a constructor or setter method, and override agent's run method to implement the agent-specific processing logic. The data collection agent only needs access to an output buffer, so it can be written as:

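#include <agents.h>   // Asynchronous Agents Library (Visual C++ 2010)
#include <cstdio>     // printf, used in main below

using namespace Concurrency;

// Produces the values 1..10, followed by -1 as an end-of-data marker.
class DataCollector: public agent
{
public:
 explicit DataCollector(
    ITarget<int>& target) : _target(target)
 {}
protected:
 void run()
 {
  for (int ix = 1; ix <= 10; ++ix){
   send(_target, ix);
  }
  // -1 tells the downstream agent that no more data will arrive.
  send(_target, -1);
  // Move this agent to the agent_done state.
  done();
 }
private:
 ITarget<int>& _target;
};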

In this case, the data is simply generated by a for loop, but the actual collection mechanism is irrelevant to the overall design. A few calls in the run method are worth highlighting. Data is sent to the target buffer with the Agents Library's send function. After the last item, a value of -1 is sent to indicate that collection is complete; the next agent uses this value as its signal that no more data will arrive. Finally, the call to done marks the end of the agent's own processing.

The processor agent is slightly more complex: it needs a source buffer containing the collected data and a target buffer for the results of its processing. Like the collection agent, it uses the magic value -1 to tell the next agent that there is no more data to process; because squaring the collector's positive values never produces a negative number, the marker cannot be confused with a real result. The processing itself is simple: the agent receives each value from the source buffer, squares it, and sends the result to the target buffer.

// Squares each incoming value and forwards the -1 end-of-data marker.
class DataProcessor: public agent
{
public:
 explicit DataProcessor(ISource<int>& source, ITarget<int>& target)
  : _source(source), _target(target)
 {}
protected:
 void run()
 {
  int dataToProcess;
  // receive blocks until a value is available in the source buffer.
  while (-1 != (dataToProcess = receive(_source))){
   send(_target, dataToProcess * dataToProcess);
  }
  // Pass the end-of-data marker on to the next agent.
  send(_target, -1);
  done();
 }
private:
 ISource<int>& _source;
 ITarget<int>& _target;
};

The final agent sums the values produced by the processor agent and stores the total where the code that created the agents can read it. The collator agent has only a source buffer, and it writes its output to a member variable rather than to a buffer:

// Sums incoming values until the -1 end-of-data marker arrives.
class DataCollator: public agent
{
public:
 explicit DataCollator(ISource<int>& source)
  : _source(source)
 {}
 // Final result; read it only after the agent has finished.
 int sum;
protected:
 void run()
 {
  int dataToProcess;
  sum = 0;
  while (-1 != (dataToProcess = receive(_source))){
   sum += dataToProcess;
  }
  done();
 }
private:
 ISource<int>& _source;
};

Using the agents is simple. They need two buffers, and the Agents Library provides a number of buffer types that can be used for this purpose; this example uses the unbounded_buffer class template to hold the interim results of the agents' work. After that, all you need to do is declare instances of the agents, start them, and wait for the collator agent to finish by calling agent::wait.

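int main()
{
 unbounded_buffer<int> collectionBuffer;
 unbounded_buffer<int> processingBuffer;
 DataCollector collector(collectionBuffer);
 DataProcessor processor(collectionBuffer, processingBuffer);
 DataCollator collator(processingBuffer);
 // Downstream agents block in receive until data reaches their source buffer.
 processor.start();
 collator.start();
 collector.start();
 // The collator finishes only after the -1 marker has passed through the pipeline.
 agent::wait(&collator);
 printf("Final result %i\n", collator.sum);
 // Let the upstream agents reach agent_done before they are destroyed.
 agent::wait(&processor);
 agent::wait(&collector);
 return 0;
}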
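Because the collector produces the values 1 through 10 and the processor squares each one, the printed final result is the sum of the first ten squares. The closed-form formula for that sum, n(n+1)(2n+1)/6, gives a quick check of the expected output:

```latex
\sum_{i=1}^{10} i^2 = \frac{10 \cdot 11 \cdot 21}{6} = \frac{2310}{6} = 385
```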

When you call start on an agent, the runtime arranges for the agent's run method (which does not need to be public) to be called. The run methods of the processor and collator each loop until they receive the -1 end-of-stream marker. The code is written as though it blocks the thread executing it: both the data processor and the data collator block at a receive call until there is data in their source buffer to process. Because these waits happen inside Agents Library calls, the runtime knows which agents are blocked waiting for data and can give processing time to agents that have work to do.
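The division of labour between start, run, and wait can be illustrated with a small standard C++ analogue. The hypothetical `SimpleAgent` class below runs the derived class's protected `run` method on a `std::thread` when `start` is called, and `wait` blocks until `run` returns. This is only a sketch of the lifecycle under that one-thread-per-agent assumption: the real `agent` class schedules `run` through the Concurrency Runtime and tracks states such as `agent_done`, neither of which is modelled here.

```cpp
#include <cassert>
#include <thread>

// Hypothetical minimal agent base class in standard C++; this is NOT the
// Agents Library's agent class. Each agent gets a dedicated std::thread.
class SimpleAgent {
public:
    virtual ~SimpleAgent() {
        // Destroying an agent whose run() may still be executing would be a
        // bug (std::thread's destructor would call std::terminate), so
        // callers must call wait() first.
        assert(!worker_.joinable());
    }

    // Arrange for run() to execute on another thread. Returns false if the
    // agent was already started.
    bool start() {
        if (started_) return false;
        started_ = true;
        worker_ = std::thread([this] { run(); });
        return true;
    }

    // Block the caller until run() has returned.
    void wait() {
        if (worker_.joinable()) worker_.join();
    }

protected:
    // Derived agents implement their processing logic here. Because it is
    // protected, outside code can only reach it through start().
    virtual void run() = 0;

private:
    bool started_ = false;
    std::thread worker_;
};

// Hypothetical example agent: adds up 1..limit and exposes the total
// through a public member, which the caller reads only after wait().
class CountingAgent : public SimpleAgent {
public:
    explicit CountingAgent(int limit) : limit_(limit) {}
    int result = 0;

protected:
    void run() override {
        for (int i = 1; i <= limit_; ++i) result += i;
    }

private:
    int limit_;
};
```

With `CountingAgent a(10);`, calling `a.start()` and then `a.wait()` leaves `a.result` equal to 55. Reading the result only after `wait` mirrors the article's example, which reads `collator.sum` only after `agent::wait` returns.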

The Agents Library provides a full range of functions and message blocks to support more complex agent scenarios than the one demonstrated above; all these features are covered in the MSDN documentation. However, regardless of an agent's complexity, the key design goal of any agent-based solution is the same: identify the discrete processing agents at a fine enough granularity that the runtime can achieve parallelism by having one agent work on the data produced by another while that ‘upstream’ agent is still collecting or processing data of its own. With this design in place, data can ‘flow’ across a series of memory buffers in parallel, largely avoiding problems such as deadlocks and memory contention that often plague traditional multithreaded development.
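As a rough, portable illustration of that flow, the sketch below reproduces the three-stage collector, processor, and collator pipeline in standard C++11. It assumes each agent can be replaced by a `std::thread` and each `unbounded_buffer<int>` by a small blocking queue. The names `IntBuffer`, `collect`, `process`, `collate`, and `run_pipeline` are hypothetical; the real Agents Library schedules agents through its own runtime rather than dedicating a thread to each one.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical blocking FIFO standing in for unbounded_buffer<int>.
class IntBuffer {
public:
    void send(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        ready_.notify_one();
    }

    // Blocks the calling thread until a value is available.
    int receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> queue_;
};

// Counterpart of DataCollector: sends 1..count, then the -1 marker.
void collect(IntBuffer& out, int count) {
    for (int ix = 1; ix <= count; ++ix) out.send(ix);
    out.send(-1);
}

// Counterpart of DataProcessor: squares each value and forwards the marker.
void process(IntBuffer& in, IntBuffer& out) {
    int value;
    while ((value = in.receive()) != -1) out.send(value * value);
    out.send(-1);
}

// Counterpart of DataCollator: sums values until the marker arrives.
void collate(IntBuffer& in, int& sum) {
    sum = 0;
    int value;
    while ((value = in.receive()) != -1) sum += value;
}

// Wires the stages together in the same start order as the article's
// example; the processor and collator simply block in receive() until
// their upstream stage produces data.
int run_pipeline(int count) {
    IntBuffer collectionBuffer;
    IntBuffer processingBuffer;
    int sum = 0;
    std::thread processor(process, std::ref(collectionBuffer),
                          std::ref(processingBuffer));
    std::thread collator(collate, std::ref(processingBuffer), std::ref(sum));
    std::thread collector(collect, std::ref(collectionBuffer), count);
    // Joining every thread also guarantees the buffers outlive all stages.
    collector.join();
    processor.join();
    collator.join();
    return sum;
}
```

`run_pipeline(10)` returns 385, the same result as the agent-based example. While the collector is still sending values, the processor can already be squaring earlier ones and the collator summing their squares, which is the overlap between upstream and downstream stages that the design goal describes.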

About the Author

Nick Wienholt is an independent Windows and .NET consultant based in Sydney. He is the author of Maximizing .NET Performance and co-author of A Programmer's Introduction to C# 2.0 from Apress, and specializes in system-level software architecture and development, with a particular focus on performance, security, interoperability, and debugging.

Nick is a keen and active participant in the .NET community. He is the co-founder of the Sydney Deep .NET User group, and writes technical articles for Australian Developer Journal, ZDNet, Pinnacle Publishing, CodeGuru, MSDN Magazine (Australia and New Zealand Edition), and the Microsoft Developer Network. In recognition of his work in the .NET area, Nick received the Microsoft Most Valuable Professional (MVP) award from 2002 through 2007.
