The K.I.S.S. Approach to I/O Completion Ports

Introduction

For building high-performance I/O-intensive applications, nothing beats I/O completion ports. I’ve read many articles and code samples for I/O completion ports, but it seemed like they were either too specific, or way too heavy for general purpose use. I’m a big fan of the KISS method (Keep It Simple Stupid—in case you’ve been living in isolation too long), so I decided to develop a few lightweight classes of my own.

Design Goals

The design goals were simple. First, I wanted to do this whole thing using a minimal number of classes. One thing I always hate about using someone else’s code is that invariably they have a lot of their own classes for every little thing even remotely related to the core technology they’re trying to demonstrate. A threadpool class, an event source base class, and a callback class will do nicely for us here. Any class wanting to use the threadpool and receive events on it would inherit the base event source class, and rather than using the OVERLAPPED struct, you’d use the callback class. The threadpool would be completely transparent, but also completely reusable for any kind of IOCP operation imaginable.

Additionally, I needed a few more capabilities. I needed to be able to cleanly dispose of any of these objects during a callback. I needed to be able to re-use a callback class from within a callback. Most importantly, this whole thing won’t work at all if we have to serialize access to things using mutexes or critical sections during every single IOCP callback operation. The last thing we want is our worker threads potentially blocking on each other during every completed I/O event.

The Players

  • CAsyncEventSource: A base class to be inherited by all classes that want to perform any kind of asynchronous I/O on the threadpool. Its job is to keep track of how many pending I/O operations are queued up for an instance of this class, and to provide a way to wait for all pending operations to complete (for a clean disposal of that instance). It also has several static utility members to associate a handle with the threadpool, check whether the current thread is part of the pool, and so forth.
  • CAsyncCallback: A wrapper around the OVERLAPPED struct, for starters. This class provides an entry point for the thread pool to execute on, which in turn allows it to call back into your class. It also helps manage the pending I/O count in CAsyncEventSource, and provides some functions to post events to the threadpool.
  • CThreadPool2<>: A template class, shamelessly stolen and modified from ATL7’s CThreadPool template class. This one fixes bugs in ATL’s implementation and provides somewhat more robust functionality. (More on that below.)

The Issues

The first issue that came up was ATL7’s CThreadPool<> template class. The ThreadProc() function has a while() loop on GetQueuedCompletionStatus()’s return value. OOPS! If any queued I/O event fails, or the function call itself fails, the thread will exit! When the time came to destruct the class, it would wait indefinitely on a thread that was no longer running. Next, I needed a new method to query whether or not the currently active thread was one that belonged to the thread pool. So, I made these changes to the class, extracted it out of the headers, and called it CThreadPool2. Although the class includes atlbase.h, you don’t have to be writing an ATL application to use it. CThreadPool2 is the only class of the three that even remotely resembles ATL or depends on anything in atlbase.h.
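
For reference, the corrected dequeue loop looks roughly like the sketch below. This is only the spirit of the fix; hIOCP, the NULL-overlapped shutdown convention, and Dispatch() are illustrative names, not the actual CThreadPool2 members.


// Sketch of a dequeue loop that survives failed completions instead of
// exiting. Illustrative only; not the actual CThreadPool2 implementation.
DWORD ThreadProc()
{
    for (;;)
    {
        DWORD       dwBytes     = 0;
        ULONG_PTR   ulKey       = 0;
        OVERLAPPED *pOverlapped = NULL;

        BOOL bOK = GetQueuedCompletionStatus(hIOCP, &dwBytes, &ulKey,
                                             &pOverlapped, INFINITE);

        if (pOverlapped == NULL)
        {
            if (bOK) break;    // an explicit shutdown packet was posted
            continue;          // the call itself failed; keep the thread alive
        }

        // bOK == FALSE here means the I/O completed with an error, but a
        // packet was still dequeued, so the handler must still run.
        Dispatch(ulKey, dwBytes, pOverlapped, bOK);
    }
    return 0;
}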

When designing the CAsync classes, I had several issues to overcome. First, because CAsyncCallback replaces the OVERLAPPED struct, I now had the opportunity to create a robust callback mechanism. I needed to make sure that it was thread-safe, that a given instance of the class could be re-used from within a callback, and that it could be safely destroyed from within a callback. I also needed to make sure that it properly incremented/decremented the pending I/O operations for the CAsyncEventSource it was tied to, even if it was re-used or destroyed during a callback operation.

Next, the CAsyncEventSource has a WaitForPending() function to allow a parent class to wait for all overlapped operations to complete before destroying itself. However, this wait operation needed to be able to determine whether it was being called from within a callback. If it was being called from within a callback, the count of pending I/O operations would never reach zero (the operation won’t complete until this function exits, which never happens), and would cause an infinite wait. The answer, of course, is to detect whether the active thread is a thread belonging to the thread pool. If it is, we have to artificially decrease our refcount to accommodate the fact that the current overlapped operation will not be complete until this function returns. That, unfortunately, presents us with another problem. If this function returns and the parent class is destroyed immediately afterwards (during the callback), we could be in trouble. You see, the CAsyncCallback, once it finishes its callback function, has to decrement the pending I/O refcounts. If the CAsyncEventSource class is destroyed during the function call, CAsyncCallback’s pointer to it becomes invalid.
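
In rough terms, the wait boils down to something like this (m_lPending, m_hNonePending, and IsPoolThread() are illustrative names, not the actual members):


// Sketch of the WaitForPending() idea; names are illustrative only.
void CAsyncEventSource::WaitForPending()
{
    if (IsPoolThread())
    {
        // We are inside a callback, which itself counts as one pending
        // operation and cannot complete until we return, so drop it now.
        InterlockedDecrement(&m_lPending);
    }

    // Block until every remaining overlapped operation has completed and
    // decremented the count back down to zero.
    while (InterlockedCompareExchange(&m_lPending, 0, 0) > 0)
        WaitForSingleObject(m_hNonePending, INFINITE);
}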

TLS to the Rescue

So, we needed a way for CAsyncCallback to meet our destruction requirements (legal to destroy during a callback, legal to re-use during a callback, and legal for the CAsyncEventSource base and parent to be destroyed during a callback). The solution was simple. Thread local storage. CAsyncCallback will store a value in our TLS index before making the callback. If that value remains unchanged after the function returns, CAsyncCallback will assume the base CAsyncEventSource it is tied to still exists and will decrement the pending I/O refcount. CAsyncEventSource’s destructor will (if called from a thread in the threadpool) reset this TLS index’s value, thereby causing CAsyncCallback NOT to decrement the refcount on return.
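
The handshake, roughly, looks like this (g_dwTlsIndex and the member names below are assumptions for illustration):


// Sketch of the TLS handshake; g_dwTlsIndex, m_pSource, m_pfnCallback,
// and DecrementPending() are assumed names, not the real implementation.
void CAsyncCallback::OnCompletion()
{
    CAsyncEventSource *pSource = m_pSource;

    // Remember which event source this callback belongs to.
    TlsSetValue(g_dwTlsIndex, pSource);

    m_pfnCallback(pSource, m_Param1, m_Param2, m_Param3, m_Param4, this);

    // If the event source was destroyed during the callback, its destructor
    // (running on this same pool thread) cleared the TLS slot; in that case
    // we must not touch the now-dead object.
    if (TlsGetValue(g_dwTlsIndex) == pSource)
        pSource->DecrementPending();
}

// ...and in ~CAsyncEventSource(), when running on a pool thread:
//     if (TlsGetValue(g_dwTlsIndex) == this)
//         TlsSetValue(g_dwTlsIndex, NULL);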

Keeping Track of Everything

CAsyncCallback’s constructor requires a pointer to a CAsyncEventSource and an ASYNCRESULT callback function. These remain constant for the lifetime of the object. One of the main problems in doing asynchronous I/O is that it is possible to lose track of all the pending overlapped operations in the queue, so it is important to keep track of them all. Thus, before you may use an instance of CAsyncCallback, you must first call the Acquire() function on it. Acquire() does three things. First, it makes sure the instance hasn’t already been acquired by another thread; second, it locks itself so that no other thread can use it to make overlapped function calls; and third, it tells its underlying CAsyncEventSource that a new overlapped operation is about to begin.
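
Conceptually, Acquire() amounts to something like this (m_lAcquired and IncrementPending() are assumed names, not the actual implementation):


// Sketch only; m_lAcquired and IncrementPending() are illustrative names.
BOOL CAsyncCallback::Acquire()
{
    // Steps one and two: claim the callback atomically, failing if another
    // thread (or a still-pending request) already owns it.
    if (InterlockedCompareExchange(&m_lAcquired, 1, 0) != 0)
        return FALSE;

    // Step three: tell the event source a new overlapped operation is about
    // to begin, so WaitForPending() knows to wait for it.
    m_pSource->IncrementPending();
    return TRUE;
}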

After successfully Acquire()ing the object, you then issue your overlapped I/O request. If that request fails for any reason other than ERROR_IO_PENDING, you must release the class by calling the UnAcquire() method. If the overlapped operation is pending or completes successfully, you do not have to worry about calling UnAcquire(); CAsyncCallback will release the object automatically during the callback operation.

Subscribing to the ThreadPool

CAsyncEventSource has several static utility members, Subscribe() being one of them. Once you have a HANDLE to something and want to associate it with the threadpool’s completion port, you simply pass the HANDLE to CAsyncEventSource::Subscribe(). There is no way to unassociate a handle from the IOCP once it has been associated; your only option is to close the handle.
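
For example, assuming Subscribe() takes the raw HANDLE (the exact signature may differ):


// Open a file for overlapped I/O and associate it with the pool's
// completion port. The exact Subscribe() signature is an assumption.
HANDLE hFile = CreateFile(_T("data.bin"), GENERIC_READ, FILE_SHARE_READ,
                          NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
if (hFile != INVALID_HANDLE_VALUE)
    CAsyncEventSource::Subscribe(hFile);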

Receiving Events

To receive an event, you first must declare a static entrypoint callback in your class. This entrypoint is where the completed overlapped operation will end up. It takes the following form:


static void CALLBACK myfunc(CAsyncEventSource *pThis,
                            LPVOID Param1, LPVOID Param2,
                            LPVOID Param3, LPVOID Param4,
                            CAsyncCallback *pc)

Because your class inherits from CAsyncEventSource, you can cast pThis to a pointer to your class and make a non-static function call into it from here. You can also take the opportunity to cast the various parameters to their type-specific equivalents while you’re at it.

Asynchronous I/O in Action

What follows is the beginning of a simple class declaration that uses overlapped I/O:


class CMyClass : public CAsyncEventSource
{
public:
    CMyClass(void) : m_cbRead(this, _ReadEvent),
                     m_hFile(INVALID_HANDLE_VALUE) {}

protected:
    CAsyncCallback m_cbRead;
    HANDLE         m_hFile;

    // Override this to handle a completed (or failed) read.
    virtual void OnRead(LPVOID pvBuffer, DWORD dwBytesRead,
                        BOOL bSuccess, CAsyncCallback *pc) = 0;

private:
    // Static entry point the thread pool invokes when the read completes.
    static void CALLBACK _ReadEvent(CAsyncEventSource *pThis,
                                    LPVOID pvParam1, LPVOID pvParam2,
                                    LPVOID pvParam3, LPVOID pvParam4,
                                    CAsyncCallback *pc)
    {
        CMyClass *p = static_cast<CMyClass *>(pThis);
        DWORD dwBytes;
        BOOL  bSuccess;

        // Retrieve the result of the completed overlapped operation.
        bSuccess = GetOverlappedResult(p->m_hFile, pc, &dwBytes, FALSE);
        p->OnRead(pvParam1, dwBytes, bSuccess, pc);
    }
    <snip>

Somewhere in your class, you’d have to open m_hFile and Subscribe() it to the threadpool. Afterwards, a function that makes an overlapped I/O read request would look like this:


BOOL MyAsyncRead(LPVOID pvBuffer, DWORD dwBuffSize)
{
    BOOL bRet = FALSE;

    // Attempt to acquire the callback
    if (m_cbRead.Acquire())
    {
        m_cbRead.m_Param1 = pvBuffer;

        // Make the overlapped read request
        bRet = ReadFile(m_hFile, pvBuffer, dwBuffSize, NULL, &m_cbRead);
        if (bRet == FALSE)
        {
            // If we received an error other than pending, manually
            // UnAcquire() the callback
            if (GetLastError() == ERROR_IO_PENDING) bRet = TRUE;
            else m_cbRead.UnAcquire();
        }
    }
    return bRet;
}

The above is just an example. pvBuffer would have to remain valid until the callback completes. The class can just as easily be modified to hold its own internal buffers.

The CSocketNT Class

Where would a framework be without an implementation? The source code from this article comes with CSocketNT, a class I wrote to take advantage of the overlapped I/O framework above. Using it is pretty straightforward, and it’s designed to be used as a base class.

CSocketNT is designed to be a thin wrapper around only the core overlapped I/O functions, namely ConnectEx, AcceptEx, WSARecv, WSARecvFrom, WSASend, and WSASendTo. In the case of ConnectEx, because it only appears in XP and later OSs, I implemented a cheap forgery that posts an overlapped event to the thread pool, where a worker thread then services a blocking connect. The caveat, of course, is that for the duration of the blocking connect, that worker thread cannot process any other completed I/O events. There was no way I was about to clutter the implementation by supporting non-blocking WSAEventSelect and a corresponding set of waiting threads to handle event notifications.
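
In spirit, the stand-in is no more than a blocking connect() run from a posted callback, something along these lines (the names below are assumptions, not the actual CSocketNT members):


// Spirit of the ConnectEx stand-in only; _FakeConnect, m_Socket, and
// OnConnect() are assumed names. Imagine this declared as a private static
// member of CSocketNT, mirroring _ReadEvent above.
static void CALLBACK _FakeConnect(CAsyncEventSource *pThis,
                                  LPVOID pvAddr, LPVOID pvAddrLen,
                                  LPVOID, LPVOID, CAsyncCallback *pc)
{
    CSocketNT *p = static_cast<CSocketNT *>(pThis);

    // A plain blocking connect(), executed on a pool thread. Until it
    // returns, this worker thread cannot service any other completions.
    int nRet = connect(p->m_Socket, (const sockaddr *)pvAddr,
                       (int)(INT_PTR)pvAddrLen);

    p->OnConnect(nRet != SOCKET_ERROR, pc);
}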

Updated v1.02

  • 1.02 Small changes to CSocketNT making some of the Send(To)/Receive(From) parameters optional.
  • 1.01 Fixed classes so they compile cleanly at warning level 4.
  • 1.01 Fixed a bug in the CAsyncCallback mechanism that was sending the member parameters to the callback function rather than the local copy of the parameters. (If I’d compiled at warning level 4 to begin with, it wouldn’t have happened!)
  • 1.01 Fixed a problem in CAsyncEventSource when multiple event sources are present. If an event source with no events outstanding was destroyed on a thread in the pool while other event sources had pending operations, it would artificially reduce its refcount and assert an error. Had to make sure not only that it was being destroyed by a thread in the pool, but that it was THIS particular event source that had a pending overlapped operation on it.
  • 1.01 Added support for initializing COM in the threads of the pool if you #define THREADPOOL_USES_COM.
