Tuesday, September 2, 2014

ThreadPool implementation using IO Completion Ports (Visual C++)

Looking for a fast, efficient, custom thread pool in Visual C++? Consider building one on I/O completion ports (IOCP).

Overview

There are two ways to manage a pool of threads.

The first is to rely on Microsoft’s ready-made thread pool through Win32 APIs such as ‘QueueUserWorkItem’. Though simple, it can lag on performance: frequent thread context switches can occur while worker threads are selected from the pool to process requests, and on a multi-core CPU not all cores may be put to use. Inter-thread communication in such designs also often relies on ‘window messages’, which can be slow as well.

The second is to build your own thread pool on the ‘I/O Completion ports (IOCP)’ APIs. This is known to be a very fast and efficient basis for custom thread pooling, and it can outperform the simpler approach described above. At its heart, an IOCP implementation revolves around the following three Win32 APIs.

CreateIoCompletionPort

This API creates the I/O completion port, the queue to which work requests are posted and from which they are retrieved. In a thread pool, clients submit work requests to this queue and worker threads retrieve and process them.

PostQueuedCompletionStatus

Any client of the thread pool uses this API to post a work request to the queue.

GetQueuedCompletionStatus

Worker threads call this API and block on it until a work request can be retrieved from the queue. A blocked thread consumes no CPU time while it waits, so no polling is involved.

Advantages

The following are the advantages of using ‘IO-Completion ports’ for thread pool implementations.

1. Maximum CPU core usage

Unless you specify otherwise, an IOCP allows as many threads to run concurrently as there are CPU cores.

2. No unwanted context switching

Waiting threads are released in LIFO (last in, first out) order, so the thread that started waiting most recently gets priority for the next work item. Because that thread is the one most likely to still be warm in memory, fewer context switches are needed to bring in another thread.

3. No CPU clogged up by polling or waiting

IOCP is more efficient than the conventional ways of waiting on events, polling, or window messages.

4. A configured number of pre-created worker threads available in the thread pool

While setting up the thread pool, we can create a configured number of threads so that they are ready to service requests as soon as we start the thread pool.

That being said, this is a highly recommended way of doing multithreading on a server, where a pool of threads may have to process a very high load of requests. IOCP is also not limited to custom thread pools: it can be used with any object that supports overlapped (asynchronous) I/O, to get maximum performance with the least amount of server resources. Typical examples are WSA sockets and file handles.

You can get a good understanding of IOCP from the following articles on MSDN (Threadpool, IOCP in msdn).

Demo

OK, that’s the theory. Let’s go ahead and create a custom thread pool using IOCP. For this exercise, we will use Visual C++ with MFC (Microsoft Foundation Classes). The steps below give a high-level view of the implementation.

a. Create the IO-Completion port for the thread pool (using the API ‘CreateIoCompletionPort’)

b. Create a pre-defined number of worker threads that are to be readily available in the pool

c. For each ‘work request’ from a client:

Queue the work request to the IOCP using the API ‘PostQueuedCompletionStatus’

d. Each worker thread waits for queued jobs using the API ‘GetQueuedCompletionStatus’

If a job is available (e.g. step c was invoked by a client), one worker thread wakes up to process the request

e. Once we are done with the thread pool, stop it:

Make all worker threads exit (using ‘PostQueuedCompletionStatus’ with a different parameter)

A work request consists of two parts.

Work Request:

Data to be processed

This is an LPVOID (a long pointer to void) that can point to any structure or object. Using LPVOID lets the pool accept any object as the Data, which keeps it generic.

Function to act upon the Data

The client can attach a pointer to either a ‘C++ class member function’ or a ‘Global C function’, which the worker thread invokes to process the supplied data.

Demo – UI walkthrough

The UI of the test application is shown below.

[Image: UI of the test application]

Start by entering the desired ‘Number of Worker Threads’ in the first text box and clicking the ‘Start Thread Pool Manager’ button. This initializes the thread pool and creates the worker threads at startup.

Now you can push as many jobs as you wish: enter the number of jobs in ‘Jobs To Push’ and click either ‘Push Jobs to Job Queue Using Member Function’ or ‘Push Jobs to Job Queue using Global Function’. The former uses a ‘C++ class member function’ to process the data; the latter uses a ‘Global C Function’.

To keep things simple, these functions just write some text (passed to them as their Data) to a file.

Once you have experimented enough, don’t forget to click the ‘Stop Thread Pool Manager’ button to shut down the thread pool gracefully. It removes any pending jobs from the queue and waits for any in-progress processing to finish.

Demo – Code Walkthrough

Below are the type definitions for the ‘Global C’ and ‘C++ class member’ function prototypes. Please note that if you need to attach a ‘C++ class member function’, the class containing it must be derived from MFC’s root base class, ‘CObject’.

// Function prototype of the worker function to be supported at the client side
// [For non-member functions, i.e. static or global functions]
typedef void ( WINAPI   *WORKER_CALLBACK_PROTOTYPE )( LPVOID pDataIn_i );


// Function prototype of the worker function to be supported at the client side
// [For Class Member Functions that should be invoked using an object]

typedef void ( CObject::*MEMBER_FUNCTION_WORKER_CALLBACK_PROTOTYPE )( LPVOID pDataIn_i );
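Why the ‘CObject’ requirement? A pointer to a member function of a derived class can be stored as a pointer to a member of its base class with a ‘static_cast’, and then invoked through a base-class pointer. The sketch below shows that mechanism in plain C++. ‘BaseObject’ stands in for ‘CObject’ and ‘DataPtr’ for ‘LPVOID’ so that it compiles without MFC; the job item layout and all names are our assumptions, not the actual ‘THREAD_POOL_JOB_ITEM’.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for MFC's CObject (assumption, so the sketch needs no MFC)
class BaseObject
{
public:
    virtual ~BaseObject() {}
};

typedef void* DataPtr; // stand-in for LPVOID

typedef void ( *GlobalCallback )( DataPtr pDataIn_i );
typedef void ( BaseObject::*MemberCallback )( DataPtr pDataIn_i );

// Hypothetical job item: either pfnGlobal or (pObject, pfnMember) is set
struct JobItemSketch
{
    GlobalCallback pfnGlobal;
    BaseObject*    pObject;
    MemberCallback pfnMember;
    DataPtr        pJobInput;
};

JobItemSketch MakeGlobalJob( GlobalCallback pfnGlobal, DataPtr pJobInput )
{
    JobItemSketch Job = { pfnGlobal, NULL, NULL, pJobInput };
    return Job;
}

JobItemSketch MakeMemberJob( BaseObject* pObject, MemberCallback pfnMember, DataPtr pJobInput )
{
    JobItemSketch Job = { NULL, pObject, pfnMember, pJobInput };
    return Job;
}

// What a worker thread would do with a job item it has retrieved
void Dispatch( const JobItemSketch& Job )
{
    if( NULL != Job.pObject && NULL != Job.pfnMember )
    {
        ( Job.pObject->*Job.pfnMember )( Job.pJobInput ); // member function call
    }
    else if( NULL != Job.pfnGlobal )
    {
        Job.pfnGlobal( Job.pJobInput );                   // global function call
    }
}

// A client class; it must derive from the common base for the cast to be legal
class Accumulator : public BaseObject
{
public:
    Accumulator() : m_nTotal( 0 ) {}
    void Add( DataPtr pDataIn_i ) { m_nTotal += *static_cast<int*>( pDataIn_i ); }
    int m_nTotal;
};

// A client global function
void Doubler( DataPtr pDataIn_i )
{
    *static_cast<int*>( pDataIn_i ) *= 2;
}
```

A member job would be built with ‘MakeMemberJob( &Acc, static_cast<MemberCallback>( &Accumulator::Add ), &nValue )’. The cast only compiles because ‘Accumulator’ derives from ‘BaseObject’, which mirrors the ‘CObject’ rule above.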

The listing below summarizes the codebase. Please note that in the attached project source this code is scattered across various places, such as button-click handlers and MFC dialog classes. For simplicity, we’ve consolidated it into a single place here.

The header file contains the thread pool manager object. The implementation file contains the ‘Global C Function’ and the ‘C++ member function’ to be invoked by the worker threads in the thread pool. ‘InitializeThreadPool’ must be called first to initialize the thread pool. Jobs are then pushed to the thread pool using the ‘EnQueue’ method. Supporting macros attach the functions to the structure members. ‘pJobInput’ is the member that holds the ‘Data’ to be processed; in our case it’s simply the for-loop index. Finally, ‘StopThreadPool’ is called to shut down the thread pool gracefully.

Header File (.h)

CThreadPoolEx m_ThreadPoolManager;

void MemberFunctionWorkerProc( LPVOID pDataIn_i );

Implementation File (.cpp)

// Worker function [a global function] that will be called back by the thread pool manager
void GlobalWorkerProc( LPVOID pDataIn_i )
{…}

// Worker function [a class member function] that will be called back by the thread pool manager
void CThreadPoolManagerDlg::MemberFunctionWorkerProc( LPVOID pDataIn_i )
{…}

// Initialize the thread pool first
if( !m_ThreadPoolManager.InitializeThreadPool( m_nWorkerThreadCount ))
{…}

void CThreadPoolManagerDlg::OnButtonStartJobs()
{

    // Create the job item and set the worker callback function,
    // which is the same for all the jobs
    THREAD_POOL_JOB_ITEM JobItem;

    CREATE_MEMBER_FUNCTION_CALLBACK( CThreadPoolManagerDlg::MemberFunctionWorkerProc, JobItem );

    int nIndex;
    for( nIndex = 1; nIndex <= m_nJobsToPush; ++nIndex )
    {
        // Add the job input to the job item (widen the index first so the
        // cast to a pointer is also clean on 64-bit builds)
        JobItem.pJobInput = reinterpret_cast<LPVOID>( static_cast<INT_PTR>( nIndex ));
        // Add the job item to the job queue
        m_ThreadPoolManager.EnQueue( JobItem );
    }
}

void CThreadPoolManagerDlg::OnButtonStartJobsNonmember()
{
    // Create the job item and set the worker callback function,
    // which is the same for all the jobs
    THREAD_POOL_JOB_ITEM JobItem;

    CREATE_FUNCTION_CALLBACK( GlobalWorkerProc, JobItem );

    int nIndex;
    for( nIndex = 1; nIndex <= m_nJobsToPush; ++nIndex )
    {
        // Add the job input to the job item (widen the index first so the
        // cast to a pointer is also clean on 64-bit builds)
        JobItem.pJobInput = reinterpret_cast<LPVOID>( static_cast<INT_PTR>( nIndex ));
        // Add the job item to the job queue
        m_ThreadPoolManager.EnQueue( JobItem );
    }

    return;
}

void CThreadPoolManagerDlg::OnButtonStopThreadPoolMgr()
{
    // Stop the thread pool gracefully
    m_ThreadPoolManager.StopThreadPool();
}

Demo – Code Download

Download the complete source code from here.

Note: We’ve provided two versions of the thread pool class, ‘CThreadPool’ and ‘CThreadPoolEx’. The former creates all the worker threads at once when the thread pool starts, so a pre-defined number of threads is always available from startup. The latter creates a single worker thread at startup. Later, when a request is queued and no free worker thread is available to process it, additional worker threads are created on the fly.