Batched Execution Using the .NET Thread Pool

Most articles about areas of functionality such as the System.Threading namespace describe the individual types within the namespace but pay little attention to how those types can be combined into a real-world method or class. This article takes a different approach and fits the pieces together to form production-ready code that you can use as-is in an application. (For an introduction to the functionality that threads and the thread pool offer in .NET, see Paul Kimmel’s Asynchronous Programming with Thread Pools.)

Most applications written today require concurrent execution of various tasks. For simple tasks that can be run in isolation, the .NET thread pool provides a simple programming model: Write a method whose signature matches the WaitCallback delegate, create a delegate instance, and pass the instance to the ThreadPool’s static QueueUserWorkItem method:

__gc class Simple{
   //signature matches the WaitCallback delegate
   static void AsyncTask(Object* stateInfo){
      Console::WriteLine(S"Task completed on ThreadPool thread");
   }

public:
   static void UseThreadPool(){
      //queue the task to run on a thread pool thread
      ThreadPool::QueueUserWorkItem(new WaitCallback(0,
                                    &Simple::AsyncTask));
   }
};

For tasks that depend on the completion of other tasks, simply queuing the tasks in the order that they should be executed is not enough. The ThreadPool does not guarantee that tasks will be executed in the order that they are queued or that subsequent tasks will begin execution only after previously queued tasks have completed. To accomplish in-order execution using the ThreadPool, each task must be queued only after the tasks it depends on have completed. The simplest way to accomplish this is to have each task schedule its successor when it finishes, and the System.Collections.Queue type makes this easy:

__gc class Queued{
   Queue __gc* _taskQueue;

   void Task1(Object* stateInfo){
      Console::WriteLine(S"Task1");
      RunNextTask();
   }

   void Task2(Object* stateInfo){
      Console::WriteLine(S"Task2");
      RunNextTask();
   }

   void Task3(Object* stateInfo){
      Console::WriteLine(S"Task3");
      RunNextTask();
   }

   //schedule the next task if one is available
   void RunNextTask(){
      if (_taskQueue->Count != 0){
         ThreadPool::QueueUserWorkItem(__try_cast<WaitCallback __gc*>
                                       (_taskQueue->Dequeue()));
      }
   }

public:
   void UseThreadPool(){
      _taskQueue = new Queue();
      //queue the tasks in the order they should run
      _taskQueue->Enqueue(new WaitCallback(this, &Queued::Task1));
      _taskQueue->Enqueue(new WaitCallback(this, &Queued::Task2));
      _taskQueue->Enqueue(new WaitCallback(this, &Queued::Task3));

      //run task 1
      RunNextTask();
   }
};

When a task is dependent on only one previous task, and in turn has a single task dependent on it, using the ThreadPool and Queue types (as demonstrated in the previous example) works well. However, when multiple tasks can run simultaneously, but all of them must complete before the next group of tasks starts, a more advanced synchronization mechanism is required. The naïve approach is simply to maintain a Boolean flag for each task and poll the flags until all are set. The problem with polling is that it wastes processor resources on every check that finds the tasks still incomplete, and the lag between the moment the last task completes and the next poll also slows overall execution.

Specialized threading types, AutoResetEvent and ManualResetEvent, exist to help in situations like this. These event types exist solely to allow communication between threads. In the typical pattern of use, one thread waits on one or more events, which are then signaled by one or more other threads. Rather than polling for the events to become signaled, the waiting thread blocks in the WaitOne, WaitAny, or WaitAll method of the WaitHandle type until the events are signaled. As the method names suggest, WaitOne waits on a single event object, WaitAny waits for any event object in a collection to become signaled, and WaitAll waits for all event objects in a collection to become signaled.
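To make the pattern concrete, here is a minimal sketch (the SignalSketch class and its Worker task are illustrative names, not part of the batching code developed below) in which a single thread pool task signals a ManualResetEvent and the caller blocks in WaitOne until that happens:

__gc class SignalSketch{
   //the worker signals the event it receives as state when it finishes
   static void Worker(Object* stateInfo){
      Console::WriteLine(S"Worker running on a ThreadPool thread");
      (__try_cast<ManualResetEvent __gc*>(stateInfo))->Set();
   }

public:
   static void WaitForWorker(){
      ManualResetEvent __gc* done = new ManualResetEvent(false);
      ThreadPool::QueueUserWorkItem(new WaitCallback(0,
                                    &SignalSketch::Worker), done);
      //block here until the worker calls Set; no polling takes place
      done->WaitOne();
      Console::WriteLine(S"Worker has signaled completion");
   }
};

The same shape scales up to batches: collect one event per task and replace the WaitOne call with WaitHandle::WaitAll over the collection, as the full example later in the article shows.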

You can implement the batched thread execution model by associating each task in a batch with an event, queuing all the tasks in the batch, and then using the WaitHandle.WaitAll method to wait for all the tasks to complete. When the WaitAll method call returns, all the events have been signaled, and the next batch can be queued.

Take particular care to avoid deadlocks. Deadlocks can occur in two main ways:

  • Acquiring locks in an unordered and staggered manner
  • Inadvertently holding a lock because of an early exit from a method, often due to an exception

The first situation can best be illustrated by an example: If thread one acquires lock A at the same time that thread two acquires lock B, and then thread one attempts to acquire lock B while thread two attempts to acquire lock A, each thread will block waiting for the other. If neither thread uses a timeout in its attempted lock acquisition, the deadlock will persist until one of the threads is terminated, which typically won’t happen until the process terminates.
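The standard prevention technique for this case is to impose a single, fixed acquisition order on the locks. The following sketch (the OrderedLocks class and its two Object locks are hypothetical, used only for illustration) has every caller take lock A before lock B, so two threads can never each hold one lock while waiting for the other; for brevity it omits the __try-__finally protection discussed next:

__gc class OrderedLocks{
   Object __gc* _lockA;
   Object __gc* _lockB;

public:
   OrderedLocks(){
      _lockA = new Object();
      _lockB = new Object();
   }

   //every code path acquires _lockA before _lockB, so the circular
   //wait described above cannot arise
   void UpdateBoth(){
      Monitor::Enter(_lockA);
      Monitor::Enter(_lockB);
      Console::WriteLine(S"Both locks held in a fixed order");
      Monitor::Exit(_lockB);
      Monitor::Exit(_lockA);
   }
};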

For the second case, in which the lock is inadvertently left unreleased, the simplest prevention technique is wrapping all the code that acquires and uses the lock in a __try-__finally block, with the release of the lock in the __finally block.
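Applied to a Monitor-based lock, that technique looks like the following sketch (the SafeLocking class and its DoWork method are hypothetical placeholders); Monitor::Exit runs in the __finally block even if the work inside the __try block throws:

__gc class SafeLocking{
   Object __gc* _lock;

   //work performed while the lock is held; it may throw
   void DoWork(){
      Console::WriteLine(S"Working while holding the lock");
   }

public:
   SafeLocking(){
      _lock = new Object();
   }

   //the lock can never be left held by an early exit from the method
   void UpdateSafely(){
      Monitor::Enter(_lock);
      __try{
         DoWork();
      }
      __finally{
         Monitor::Exit(_lock);
      }
   }
};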

The following code sample demonstrates batched execution using the thread pool. In this case, tasks 1 and 2 can run simultaneously. Once the WaitAll method call returns, subsequent waves of tasks can be queued:

__gc class Batch{
   //tasks 1 and 2 to be batched together
   static void Task1(Object* stateInfo){
      __try{
         Console::WriteLine(S"Task1");
      }
      __finally{
         //signal completion even if the task body throws
         (__try_cast<ManualResetEvent __gc*>(stateInfo))->Set();
      }
   }

   static void Task2(Object* stateInfo){
      __try{
         Console::WriteLine(S"Task2");
      }
      __finally{
         (__try_cast<ManualResetEvent __gc*>(stateInfo))->Set();
      }
   }

public:
   //main entry point of class
   static void UseThreadPool(){
      //holds handles for each task
      ArrayList __gc* handles = new ArrayList();

      //queue tasks 1 and 2
      ManualResetEvent __gc* event = new ManualResetEvent(false);
      handles->Add(event);
      ThreadPool::QueueUserWorkItem(new WaitCallback(0,
                                    &Batch::Task1), event);

      event = new ManualResetEvent(false);
      handles->Add(event);
      ThreadPool::QueueUserWorkItem(new WaitCallback(0,
                                    &Batch::Task2), event);

      //wait for both handles
      WaitHandle::WaitAll(__try_cast<WaitHandle __gc*[]>
                          (handles->ToArray(__typeof(WaitHandle))));

      Console::WriteLine(S"Tasks 1 and 2 are finished");
   }
};
