Multi-Threaded Message Processor

Multi-threading is a powerful concept and the preferred mechanism for building highly concurrent, scalable applications. From desktop software to high-end server applications, most programs use multi-threading behind the scenes, and most modern operating systems support it natively. This article presents an object-oriented framework that is very useful for building multi-threaded, disconnected applications.

This framework provides a programming model for developing multi-threaded applications that carry out a specific task using a thread pool. In this framework, a pool of threads retrieves messages from a message queue and processes them. The design encapsulates the business logic for message processing in the thread-pool threads, and the Factory pattern is used to create those threads.

In a previous article, "OO wrapper to Windows threads," you created a wrapper class for the Windows threading API. If you haven't already read that article, please do so before proceeding. You will extend this class for use in the message processor framework.

Let me first introduce the design of this framework. It consists of the following classes: MQM (short for Message Queue Manager), Thread (which encapsulates the Windows threading API), MQThread (a thread class that can read from the message queue), MQThreadFactory (a factory class used to create MQThread-derived objects), and ThreadPoolMgr (which creates, initializes, and uninitializes a pool of threads).

The user of the framework has to:

  • Create a subclass of MQThread
  • Implement the ProcessMessage function of the MQThread subclass
  • Create a subclass of MQThreadFactory
  • Implement CreateNewMQThread
  • Create a ThreadPoolMgr object, registering the MQThreadFactory object with it (in the constructor)
  • Call the ThreadPoolMgr method CreateThreadPool
  • To shut down, call the Shutdown method of ThreadPoolMgr

The Thread class encapsulates the Windows threading functionality. It allows external entities to create and manage a thread as though it were an object. You can use this class to create a thread, suspend it, resume it, wait for it to finish its task, and so forth. The thread's task is embedded in its Run function. When a thread starts, the virtual functions ThreadEntry, Run, and ThreadExit are called, in that order, to facilitate initialization, the main task, and un-initialization in the implementing class. This class is described in detail in the previously mentioned article.

The MQM class is short for "Message Queue Manager." It encapsulates message queuing functionality in the MQPut, MQGet, and GetMessageCount functions; its job is to store and retrieve messages. In this framework, an STL list is used to implement the queue. However, the class can be extended to wrap third-party message queues such as MSMQ, MQSeries, and the like. Even a Windows message queue can be wrapped with the MQM class to provide the same functionality.

class THREADPOOL_API MQM
{
   public:
      MQM(int nMsgQueueDepth = -1);
      virtual ~MQM(){}
      BOOL MQPut(void* pvData);
      BOOL MQGet(void*& pvData);

      long GetMessageCount();
      virtual int notify (void){return 1;}
   protected:
      LIST msgQ;
      int msgQDepth;
      HANDLE hLockFront;
      HANDLE hLockBehind;
      HANDLE hSem;
};

The MQM class definition is shown above. As you can see, LIST is used to implement the queue: MQPut appends data to one end of the LIST and MQGet retrieves data from the other end. The locks (mutexes) are represented by the hLockFront and hLockBehind handles. Another handle, hSem, refers to a semaphore that counts the number of messages in the queue. Threads wait on this semaphore while retrieving messages, and the semaphore count is decremented every time a message is retrieved. Conversely, each time a message is added to the queue using MQPut, the count is incremented using ReleaseSemaphore. The code for both MQGet and MQPut is shown below:

//Put messages into the queue.
BOOL MQM::MQPut(void* pvData)
{
   MUTEXLOCK lock(hLockBehind);
   if((msgQDepth!= -1) && (msgQDepth < GetMessageCount()))
   {
      return FALSE;
   }
   if(pvData != NULL)
   {
      if(ReleaseSemaphore(hSem,1,NULL))
      {
         msgQ.push_back(pvData);
      }
   }
   notify();
   return TRUE;
}

//Get messages from the queue.
BOOL MQM::MQGet(void*& pvData)
{
   MUTEXLOCK lock(hLockFront);
   pvData = NULL;

   //Wait at most two seconds; the semaphore count is decremented
   //only when the wait actually succeeds.
   if(WAIT_OBJECT_0 == WaitForSingleObject(hSem,2000)
      && 0 < GetMessageCount())
   {
      pvData = (void*)msgQ.front();
      msgQ.pop_front();
   }
   return TRUE;
}
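The semaphore-plus-mutex scheme above maps naturally onto standard C++. The sketch below is a hypothetical portable analogue (the names BoundedQueue, put, and get are illustrative, not part of the framework): a condition variable plays the role of the Win32 counting semaphore, and a single std::mutex guards the underlying container.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical portable analogue of MQM: a condition variable stands in
// for the Win32 counting semaphore, and std::mutex for the mutex handles.
class BoundedQueue
{
public:
    explicit BoundedQueue(int depth = -1) : depth_(depth) {}

    // Analogue of MQPut: refuse the message once the backlog limit is hit.
    bool put(void* msg)
    {
        std::lock_guard<std::mutex> lock(m_);
        if (depth_ != -1 && (int)q_.size() >= depth_)
            return false;
        q_.push_back(msg);
        cv_.notify_one();   // plays the role of ReleaseSemaphore
        return true;
    }

    // Analogue of MQGet: wait up to two seconds for a message, mirroring
    // WaitForSingleObject(hSem, 2000) in the original.
    bool get(void*& msg)
    {
        std::unique_lock<std::mutex> lock(m_);
        if (!cv_.wait_for(lock, std::chrono::seconds(2),
                          [this] { return !q_.empty(); }))
        {
            msg = nullptr;  // timed out; queue still empty
            return false;
        }
        msg = q_.front();
        q_.pop_front();
        return true;
    }

    long count()
    {
        std::lock_guard<std::mutex> lock(m_);
        return (long)q_.size();
    }

private:
    std::deque<void*>       q_;
    std::mutex              m_;
    std::condition_variable cv_;
    int                     depth_;
};
```

The wait_for predicate combines the "wait on the semaphore" and "check the message count" steps of MQGet into one atomic operation, which avoids the count/semaphore mismatch a timed-out wait could otherwise cause.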

The wait on the semaphore is restricted to two seconds because we do not want an infinite wait, and the semaphore is decremented only if the wait is successful. Both methods create a MUTEXLOCK object. This class is defined in the "ThreadPool.h" header file, like all the other classes, and serves the sole purpose of acquiring a mutex in its constructor and releasing it in its destructor. This is more convenient than calling ReleaseMutex by hand: a method may have multiple return paths, and forgetting to release the mutex on one of them leads to deadlock. This RAII idiom is a safer, surer way to make sure ReleaseMutex is called no matter where you return, even if an exception is thrown. You can see the code below.

class MUTEXLOCK
{
   HANDLE _h;

   public:

   MUTEXLOCK(HANDLE h)
   {
      _h = h;
      WaitForSingleObject(_h,INFINITE);
   }
   ~MUTEXLOCK()
   {
      ReleaseMutex(_h);
   }
};
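The same RAII idiom exists in standard C++ as std::lock_guard. This hypothetical snippet (the names risky_operation and mutex_is_free are illustrative) demonstrates the property the text describes: the lock is released on every exit path, even when an exception unwinds the stack.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

std::mutex g_mutex;   // stands in for the Win32 mutex handle

// The guard's destructor releases the mutex on every exit path, exactly
// as MUTEXLOCK releases its mutex, even when an exception is thrown.
void risky_operation(bool fail)
{
    std::lock_guard<std::mutex> lock(g_mutex);
    if (fail)
        throw std::runtime_error("boom");  // lock is still released
    // ... normal work, possibly with several return paths ...
}

// Succeeds only if risky_operation really released the mutex.
bool mutex_is_free()
{
    if (g_mutex.try_lock())
    {
        g_mutex.unlock();
        return true;
    }
    return false;
}
```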

Now, turn to the MQThread class definition. This is a Thread subclass tailored to work with the MQM object. It has the following functionality.

class THREADPOOL_API MQThread : public Thread
{
   public:
      MQThread(){}
      virtual ~MQThread(){}
      void SetQueueManager(MQM *pQM);
      virtual void Run();
      virtual BOOL ProcessMessage(void* pvMessage)=0;

      BOOL PutMessage(void* pvData);
      BOOL GetMessage(void*& pvData);
   protected:
      MQM *pMQM;
};

It has a protected pointer to an MQM instance, implements the Run method, and declares an additional pure virtual function, ProcessMessage. There are also two methods, PutMessage and GetMessage, that call MQPut and MQGet internally. Look at the Run method implementation:

void MQThread::Run()
{
   void* msg = NULL;

   printf("Entering Run....Thread ID:%d\n",threadId);

   while(GetStopSignal())
   {
      if(GetMessage(msg) && (msg != NULL))
      {
         if(!ProcessMessage(msg))
            PutMessage(msg);
      }
   }

   printf("Exiting Run...Thread ID:%d\n",threadId);
}


As you can see, the Run method continuously retrieves messages from the message queue and hands them to ProcessMessage for processing. If a message is not processed successfully, it is put back into the queue. Run also calls GetStopSignal to read the stop/continue flag, which an external entity can raise to stop the thread's execution; this gives the thread a chance to clean up before exiting. Every subclass of MQThread implements the ProcessMessage function.
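The drain-until-stopped loop above can be sketched in portable C++ with std::atomic and std::thread. This is an illustrative stand-in (the Worker name and members are hypothetical, and the flag polarity is inverted for clarity relative to GetStopSignal), not framework code:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch of the Run loop: the worker drains work until an
// external entity raises the stop flag, mirroring MQThread::Run.
struct Worker
{
    std::atomic<bool> stop{false};
    std::atomic<int>  processed{0};

    void run(std::vector<int>& work, std::mutex& m)
    {
        while (!stop.load())          // poll the stop/continue flag
        {
            bool have = false;
            {
                std::lock_guard<std::mutex> lock(m);
                if (!work.empty())
                {
                    work.pop_back();  // "GetMessage" stand-in
                    have = true;
                }
            }
            if (have)
                processed.fetch_add(1);   // "ProcessMessage" stand-in
            else
                std::this_thread::yield();
        }
    }
};
```

An external entity stops the worker by setting the flag and then joining the thread, just as Shutdown calls Stop followed by Wait in the framework.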

Last, but not least, look at the ThreadPoolMgr class, which is used to create and shut down the thread pool. Here is the class definition:

class THREADPOOL_API ThreadPoolMgr : public MQM
{
   public:
      ThreadPoolMgr(int noOfThreads, MQThreadFactory *ptf,
                    int noOfMessages = -1);
      virtual ~ThreadPoolMgr();

      BOOL CreateThreadPool();
      void Shutdown();

   protected:
      int nThreads;
      int nCurThreads;
      BOOL bThreadPoolCreated;
      LIST listThreads;
      MQThreadFactory *pthreadFactory;
};

Its constructor takes the number of threads, the thread factory object, and the message queue depth. The depth limits the backlog in the message queue. For example, if the threads process 10 messages per second but 100 messages per second are pumped into the queue, the queue will soon grow to an unmanageable size. To prevent this, you limit the queue to, say, 50 messages at any given time; additional MQPut requests are then turned down.
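The arithmetic behind the backlog example can be made concrete. The following toy function (illustrative, not framework code) models the scenario in the text: with 100 messages per second arriving and only 10 per second drained, the backlog grows by 90 per second until the depth cap makes further puts fail.

```cpp
#include <cassert>

// Back-of-the-envelope model of the backlog example in the text.
// A depth of -1 means unbounded, matching the MQM default.
int backlog_after(int seconds, int arrival_rate, int service_rate, int depth)
{
    int backlog = 0;
    for (int s = 0; s < seconds; ++s)
    {
        backlog += arrival_rate - service_rate;
        if (depth != -1 && backlog > depth)
            backlog = depth;   // excess puts are turned down
    }
    return backlog;
}
```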

Now, take a look at the implementation of the CreateThreadPool function:

BOOL ThreadPoolMgr::CreateThreadPool()
{
   if(bThreadPoolCreated)
      return TRUE;
   for(int i=0; i<nThreads;i++)
   {
      MQThread* pThread = pthreadFactory->CreateNewMQThread();

      //Only register and start the thread if the factory
      //actually returned an object.
      if(pThread != NULL)
      {
         listThreads.insert(listThreads.end(),(void*)pThread);
         pThread->SetQueueManager((MQM*)this);
         pThread->CreateNewThread();
      }
   }
   bThreadPoolCreated = TRUE;
   return TRUE;
}

Here, you create as many threads as requested using the MQThreadFactory object, adding each thread object to a list for future use. You then assign the MQM subclass, which is the ThreadPoolMgr object itself (this), to each thread and start execution by calling CreateNewThread on the MQThread object.

The Shutdown method has a very simple job: it sets a flag in each thread by calling its Stop method. Each thread reads this flag, exits the continuous loop it is executing, and returns from the Run function, after which ThreadExit is called and the thread stops execution. The code is given below:

void ThreadPoolMgr::Shutdown()
{
   for(ITER iter = listThreads.begin();
      iter != listThreads.end(); iter++)
   {
      Thread *pt = (Thread*)(*iter);
      pt->Stop();
      pt->Wait();
      delete pt;
   }
   listThreads.erase(listThreads.begin(),listThreads.end());
   bThreadPoolCreated = FALSE;
}

You can understand this framework better by putting everything together. The framework manages a pool of threads that processes a specific type of message (depending on the MQThread subclass). The messages are stored in a message queue.

Often, a pool of threads is used to build applications. Having threads initialized and waiting to serve requests is better than creating them on the fly: on-the-fly creation makes the threads' lifecycle harder to manage, on top of the cost of the system calls needed to create them.

Let's take a look at how to use this framework. Create a new project called ThreadPoolImpl and keep the usage of the framework very simple. First, create the required subclasses:

class MyMQThread : public MQThread
{
   virtual BOOL ProcessMessage(void* pvMessage)
   {
      printf("The message is:%s\nThread ID is:%d\n",
             (char*)pvMessage,threadId);
      return TRUE;
   }
};
class MyMQThreadFactory : public MQThreadFactory
{
   virtual MQThread* CreateNewMQThread()
   {
      return new MyMQThread();
   }
};

We extend MQThread and implement the pure virtual function ProcessMessage in the MyMQThread class. Because we intend to keep it very simple, we just print the message and the thread ID in this method. Then, we create a factory class, MyMQThreadFactory, by extending MQThreadFactory; it creates an object of the above-mentioned thread class and returns it. For readers unfamiliar with the Abstract Factory pattern, I suggest the book Design Patterns by Erich Gamma et al. (the "Gang of Four").
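The value of the factory is that the pool-side code never names the concrete thread type. A minimal portable sketch of that idea (with illustrative names such as MsgHandler and run_one, not the framework's classes):

```cpp
#include <cassert>
#include <memory>
#include <string>

// Abstract interfaces, analogous to MQThread and MQThreadFactory.
struct MsgHandler
{
    virtual ~MsgHandler() = default;
    virtual std::string process(const std::string& msg) = 0;
};

struct MsgHandlerFactory
{
    virtual ~MsgHandlerFactory() = default;
    virtual std::unique_ptr<MsgHandler> create() = 0;
};

// The user-supplied pair, analogous to MyMQThread and MyMQThreadFactory.
struct EchoHandler : MsgHandler
{
    std::string process(const std::string& msg) override
    {
        return "echo:" + msg;
    }
};

struct EchoHandlerFactory : MsgHandlerFactory
{
    std::unique_ptr<MsgHandler> create() override
    {
        return std::make_unique<EchoHandler>();
    }
};

// "Pool" side: written entirely against the abstract types, as
// CreateThreadPool is written against MQThread and MQThreadFactory.
std::string run_one(MsgHandlerFactory& factory, const std::string& msg)
{
    return factory.create()->process(msg);
}
```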

After this groundwork, let's implement the main function as shown below.

MyMQThreadFactory *thFac = new MyMQThreadFactory();
ThreadPoolMgr *mgr = new ThreadPoolMgr(5, thFac, 50);
mgr->CreateThreadPool();

The preceding piece of code creates the thread factory object first. It then creates the thread pool manager with five threads in the pool, the factory object used to create them, and a message queue depth of 50.

printf("sleeping...\n");
Sleep(5000);

Then, the main thread sleeps for five seconds. This demonstrates the behavior of the thread-pool threads when no messages are available to process: because the semaphore count is 0 (not signaled), the threads keep waiting.

for(int i=0; i < 50; i++)
{
   char *s = new char[10];
   itoa(i,s,10);
   printf("putting message : %s\n",s);
   mgr->MQPut(s);
}

Now, we pump 50 messages into the message queue. During this action, the thread pool becomes active and the threads start printing messages like the following:

The message is:1
Thread ID is:5126
...
The message is:44
Thread ID is:2108

The next piece of code will just wait for five seconds before shutting down the thread pool.

Sleep(5000);
printf("Shutting down...\n");
mgr->Shutdown();

Real-world applications use better mechanisms than this one to determine when to shut down the thread pool. Messages such as the following on the screen indicate the thread pool shutting down:

Shutting down...
Exiting Run...Thread ID:2584
Exiting Run...Thread ID:2260
Exiting Run...Thread ID:2108
Exiting Run...Thread ID:3980
Exiting Run...Thread ID:3892

That concludes the exercise of using the multi-threaded message processor framework in an application.

Finally, a few points about creating frameworks. Even when just writing a piece of code that works would be sufficient for the situation, it is often a good idea to create a framework instead, because frameworks are reusable and flexible. A good framework is specific to a job but provides extensibility at critical points. It is a best practice for an organization to keep a framework repository and to reuse existing frameworks when architecting a new solution.

Along with this article, I have provided complete source code in the form of the VC++ 6.0 projects ThreadPool and ThreadPoolImpl. The first project compiles to a DLL that the second project uses, so make sure to provide the appropriate header file and library paths when compiling, and make ThreadPool.dll accessible (on the PATH) before executing ThreadPoolImpl.exe.



About the Author

Sathya Narayana Panduranga

I am currently working with Flextronics Software Systems at Bangalore, India as a Technical Architect.
