Multi-Threaded Message Processor

Multi-threading is a powerful concept and a widely used mechanism for building highly concurrent, scalable applications. Applications ranging from desktop programs to high-end servers rely on it, and most modern operating systems support it. This article presents an object-oriented framework for building multi-threaded, disconnected applications.

This framework provides a programming model for developing multi-threaded applications that carry out a specific task using a thread pool. A pool of threads retrieves messages from a message queue and processes them. The design lets you encapsulate the business logic for message processing in the thread-pool threads, and it uses the Factory pattern to create those threads.

In a previous article on “OO wrapper to windows threads,” you created a wrapper class for the Windows threading API. If you haven’t already read that article, please read it before proceeding. You will extend this class for use in the message processor framework.

Let me introduce you to the design of this framework first. The framework has the following classes: MQM (short for Message Queue Manager), Thread (which encapsulates the Windows threading API), MQThread (a thread class that can read from the message queue), MQThreadFactory (a factory class used to create objects of MQThread-derived classes), and ThreadPoolMgr (the thread pool manager, which creates, initializes, and uninitializes a pool of threads).

The user of the framework has to:

  • Create a subclass of MQThread
  • Implement the ProcessMessage function of the MQThread class
  • Create a subclass of MQThreadFactory
  • Implement CreateNewMQThread
  • Create an object of ThreadPoolMgr
  • Register the MQThreadFactory object with the ThreadPoolMgr object (in the constructor)
  • Call the ThreadPoolMgr method CreateThreadPool
  • To shut down, call the Shutdown method of ThreadPoolMgr
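
The first four steps might look roughly like the sketch below. It is an illustration only: the MQThread and MQThreadFactory classes here are simplified stand-ins (no real threads, bool instead of BOOL), the CreateNewMQThread signature is an assumption, and LogThread, LogThreadFactory, and RunUserStepsDemo are hypothetical names. The ThreadPoolMgr steps are left out because its exact interface is not shown in this article.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-ins for the framework's base classes (assumptions).
// The real MQThread derives from Thread and also owns the Run loop.
class MQThread {
public:
    virtual ~MQThread() {}
    virtual bool ProcessMessage(void* pvMessage) = 0;
};

class MQThreadFactory {
public:
    virtual ~MQThreadFactory() {}
    // Assumed signature: returns a heap-allocated MQThread subclass.
    virtual MQThread* CreateNewMQThread() = 0;
};

// Steps 1-2: subclass MQThread and put the business logic in ProcessMessage.
class LogThread : public MQThread {
public:
    explicit LogThread(std::vector<std::string>* sink) : sink_(sink) {}
    bool ProcessMessage(void* pvMessage) override {
        if (pvMessage == nullptr) return false;
        // This sketch treats every message as a std::string.
        sink_->push_back(*static_cast<std::string*>(pvMessage));
        return true;
    }
private:
    std::vector<std::string>* sink_;
};

// Steps 3-4: subclass MQThreadFactory and implement CreateNewMQThread.
class LogThreadFactory : public MQThreadFactory {
public:
    explicit LogThreadFactory(std::vector<std::string>* sink) : sink_(sink) {}
    MQThread* CreateNewMQThread() override { return new LogThread(sink_); }
private:
    std::vector<std::string>* sink_;
};

// Hypothetical driver: plays the role of the pool by asking the factory for
// one worker and handing it a single message on the calling thread.
std::vector<std::string> RunUserStepsDemo() {
    std::vector<std::string> sink;
    LogThreadFactory factory(&sink);
    std::unique_ptr<MQThread> worker(factory.CreateNewMQThread());
    std::string msg = "hello";
    worker->ProcessMessage(&msg);
    return sink;
}
```

In the real framework, the factory object would be handed to ThreadPoolMgr, which would call CreateNewMQThread once per pool thread instead of the driver function used here.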

The Thread class encapsulates the Windows threading functionality. It allows external entities to create and manage a thread as though it were an object. You can use this class to create a thread, suspend it, resume it, wait for it to finish its task, and so forth. The thread’s task is embedded in its Run function. When a thread is invoked, the ThreadEntry, Run, and ThreadExit (virtual) functions are called, in that order, to handle initialization, the main task, and un-initialization in the implementing class. This class is described in detail in the previously mentioned article.

MQM is short for “Message Queue Manager.” The class encapsulates message queuing functionality in the MQPut, MQGet, and GetMessageCount functions; its job is to store and retrieve messages. In this framework, the STL list is used to implement the queue. However, this class can be extended to encapsulate third-party message queues such as MSMQ, MQSeries, and the like. Even a Windows message queue can be wrapped with the MQM class to provide the same functionality.

class THREADPOOL_API MQM
{
   public:
      MQM(int nMsgQueueDepth = -1);
      virtual ~MQM(){}
      BOOL MQPut(void* pvData);
      BOOL MQGet(void*& pvData);

      long GetMessageCount();
      virtual int notify (void){return 1;}
   protected:
      LIST msgQ;
      int msgQDepth;
      HANDLE hLockFront;
      HANDLE hLockBehind;
      HANDLE hSem;
};

The MQM class definition is shown above. As you can see, a LIST is used to implement the queue. MQPut appends data to one end of the LIST and MQGet retrieves data from the other end. The locks (mutexes) are represented by the hLockFront and hLockBehind handles. Another handle, hSem, refers to the semaphore that counts the number of messages in the queue. Threads wait on this semaphore when retrieving messages, and the semaphore count is decremented every time a message is retrieved. Conversely, each time a message is added to the queue with MQPut, the semaphore count is incremented with ReleaseSemaphore. The code for both MQGet and MQPut is shown below:

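//Put messages into the queue.
BOOL MQM::MQPut(void* pvData)
{
   MUTEXLOCK lock(hLockBehind);
   if(pvData == NULL)
   {
      return FALSE;    // nothing to queue
   }
   // Reject the message when a depth limit is set and the queue is full.
   if((msgQDepth != -1) && (GetMessageCount() >= msgQDepth))
   {
      return FALSE;
   }
   // Queue the message before signaling the semaphore, so that a consumer
   // that wakes up always finds it.
   msgQ.push_back(pvData);
   if(!ReleaseSemaphore(hSem,1,NULL))
   {
      msgQ.pop_back();    // could not signal; undo the push
      return FALSE;
   }
   notify();
   return TRUE;
}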

//Get messages from the queue.
BOOL MQM::MQGet(void*& pvData)
{
   MUTEXLOCK lock(hLockFront);
   pvData = NULL;

   // Wait up to two seconds for a message; on timeout pvData stays NULL.
   if(WaitForSingleObject(hSem,2000) == WAIT_OBJECT_0)
   {
      if(0 < GetMessageCount())
      {
         pvData = (void*)msgQ.front();
         msgQ.pop_front();
      }
   }
   return TRUE;
}
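
For readers without the Windows API at hand, the same put/get protocol can be sketched in standard C++. This is a rough analogue, not the article's MQM class: PortableMQ, Put, Get, and Count are hypothetical names, a condition variable stands in for the counting semaphore, and a single mutex guards both ends of the list (a deliberate simplification, because std::list gives no guarantees when it is modified concurrently without synchronization). Unlike MQGet, Get returns false when the wait times out.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>

// Portable analogue of MQM (assumption: standard C++ instead of Win32 handles).
class PortableMQ {
public:
    // depth == -1 means "no limit", mirroring the MQM constructor default.
    explicit PortableMQ(int depth = -1) : depth_(depth) {}

    // Returns false for a null message or a full queue.
    bool Put(void* data) {
        if (data == nullptr) return false;
        {
            std::lock_guard<std::mutex> lock(m_);
            if (depth_ != -1 && static_cast<int>(q_.size()) >= depth_) return false;
            q_.push_back(data);
        }
        cv_.notify_one();  // wake one waiting consumer
        return true;
    }

    // Waits up to `timeout` for a message; returns false on timeout.
    bool Get(void*& data,
             std::chrono::milliseconds timeout = std::chrono::milliseconds(2000)) {
        data = nullptr;
        std::unique_lock<std::mutex> lock(m_);
        if (!cv_.wait_for(lock, timeout, [this] { return !q_.empty(); })) {
            return false;
        }
        data = q_.front();
        q_.pop_front();
        return true;
    }

    long Count() {
        std::lock_guard<std::mutex> lock(m_);
        return static_cast<long>(q_.size());
    }

private:
    std::list<void*> q_;
    int depth_;
    std::mutex m_;
    std::condition_variable cv_;
};
```

With a depth of 2, a third Put is rejected, messages come back in first-in, first-out order, and a Get on an empty queue gives up after the timeout.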

The wait on the semaphore is limited to two seconds so that a thread never blocks indefinitely. The semaphore count is decremented only if the wait succeeds. Both methods create a MUTEXLOCK object. This class, like all the others, is defined in the “ThreadPool.h” header file. Its sole purpose is to acquire a mutex in its constructor and release it in its destructor. This is more convenient than calling ReleaseMutex explicitly: a method may have multiple return paths and, if you forget to release the mutex on one of them, a deadlock results. With MUTEXLOCK, ReleaseMutex is called no matter where you return, even if an exception is thrown. The code is shown below.

class MUTEXLOCK
{
   HANDLE _h;

   public:

   MUTEXLOCK(HANDLE h)
   {
      _h = h;
      WaitForSingleObject(_h,INFINITE);
   }
   ~MUTEXLOCK()
   {
      ReleaseMutex(_h);
   }
};
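
The claim that the lock is released on every exit path can be checked with a small portable sketch. It assumes std::mutex in place of a Win32 mutex handle; ScopedLock, Guarded, and MutexFreeAfter are hypothetical names introduced only for this illustration. The standard library's std::lock_guard is built on the same idea.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Portable analogue of MUTEXLOCK (assumption: std::mutex, not a HANDLE).
class ScopedLock {
public:
    explicit ScopedLock(std::mutex& m) : m_(m) { m_.lock(); }
    ~ScopedLock() { m_.unlock(); }
    // Copying would release the mutex twice, so it is disabled.
    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;
private:
    std::mutex& m_;
};

// A function with three exit paths: an exception, an early return, and a
// normal return. None of them unlocks the mutex explicitly.
int Guarded(std::mutex& m, int value) {
    ScopedLock lock(m);
    if (value < 0) throw std::invalid_argument("negative value");
    if (value == 0) return 0;
    return value * 2;
}

// Returns true if the mutex can be acquired again after Guarded has exited.
bool MutexFreeAfter(int value) {
    std::mutex m;
    try {
        Guarded(m, value);
    } catch (const std::invalid_argument&) {
        // The destructor of ScopedLock already ran during stack unwinding.
    }
    bool isFree = m.try_lock();
    if (isFree) m.unlock();
    return isFree;
}
```

Calling MutexFreeAfter with a negative, zero, or positive value exercises the exception, early-return, and normal-return paths respectively.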

Now, move on to the MQThread class. This is a Thread subclass that is tailored to work with your MQM object. Its definition follows.

class THREADPOOL_API MQThread : public Thread
{
   public:
      MQThread(){}
      virtual ~MQThread(){}
      void SetQueueManager(MQM *pQM);
      virtual void Run();
      virtual BOOL ProcessMessage(void* pvMessage)=0;

      BOOL PutMessage(void* pvData);
      BOOL GetMessage(void*& pvData);
   protected:
      MQM *pMQM;
};

It holds a protected pointer to an MQM object, implements the Run method, and declares an additional pure virtual function, ProcessMessage. There are also two methods, PutMessage and GetMessage, that call MQPut and MQGet internally. Look at the Run method implementation.

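void MQThread::Run()
{
   void* msg = NULL;

   printf("Entering Run....Thread ID:%d\n",threadId);

   // Loop condition kept as in the original listing. It assumes that
   // GetStopSignal() returns TRUE while the thread should keep running;
   // negate the test if it returns TRUE once a stop has been requested.
   while(GetStopSignal())
   {
      // Fetch the next message; msg stays NULL when none arrived in time.
      if(GetMessage(msg) && (msg != NULL))
      {
         // Put the message back on the queue if processing fails.
         if(!ProcessMessage(msg))
         {
            PutMessage(msg);
         }
      }
   }

   printf("Exiting Run...Thread ID:%d\n",threadId);
}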
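
The get, process, and requeue cycle of Run can be traced with a small single-threaded sketch. It is not the framework's code: RunLoopSketch and RequeueDemo are hypothetical names, a std::deque of ints replaces the message queue, an explicit stop flag replaces GetStopSignal, and the loop exits on an empty queue where the real loop would wait in GetMessage and try again.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>

// Stand-in for the Run loop: take a message, process it, and put it back
// on the queue if processing fails. Returns the number of processing attempts.
int RunLoopSketch(std::deque<int>& queue,
                  const std::atomic<bool>& stop,
                  const std::function<bool(int)>& processMessage) {
    int attempts = 0;
    while (!stop.load()) {
        if (queue.empty()) break;  // the real loop would block and retry here
        int msg = queue.front();
        queue.pop_front();
        ++attempts;
        if (!processMessage(msg)) {
            queue.push_back(msg);  // failed: requeue for a later attempt
        }
    }
    return attempts;
}

// Demo: message 2 fails on its first attempt and succeeds on the retry,
// so three messages take four processing attempts.
int RequeueDemo() {
    std::deque<int> queue = {1, 2, 3};
    std::atomic<bool> stop(false);
    bool failedOnce = false;
    return RunLoopSketch(queue, stop, [&failedOnce](int msg) -> bool {
        if (msg == 2 && !failedOnce) {
            failedOnce = true;
            return false;
        }
        return true;
    });
}
```

One design point the trace makes visible: a message that always fails is requeued forever, so a ProcessMessage implementation may want to cap retries or route such messages elsewhere.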
