Wednesday, April 1, 2015

Design ThreadPool


// Runnable is the base class for anything that can be executed on a thread.
// Derived classes override Run() with the work to perform and may poll
// ExitRequested() to find out whether the exit flag has been set.
class Runnable
{
public:
    // A freshly constructed Runnable has its exit flag cleared.
    Runnable() : m_bExitRequested( false ) {}

    // Defined inline: with only a declaration and no definition, destroying
    // any object of this hierarchy fails at link time.
    virtual ~Runnable() {}

    // The unit of work. The base implementation does nothing.
    virtual void Run() {}

protected:
    // Returns the current value of the exit flag.
    virtual bool ExitRequested() { return m_bExitRequested; }

    // Flag reported by ExitRequested(). Nothing in this class sets it to
    // true; derived classes can write it directly.
    bool m_bExitRequested;
};

// A thread. CreateThread() starts a new OS thread that executes Run() on this
// object; as soon as Run() returns, the thread entry point deletes the object.
// A Thread that was started successfully must therefore be heap-allocated and
// must not be touched or deleted by its creator afterwards.
class Thread : public Runnable
{
public:
    // No OS thread exists until CreateThread() is called.
    Thread() {}
    virtual ~Thread() {}

    // Starts the OS thread via _beginthread, passing this object as the
    // thread argument. On failure Error() is called; in that case no thread
    // was started, so nothing will delete this object on the caller's behalf.
    void CreateThread()
    {
        // The result is deliberately kept in a local. A short Run() can
        // finish, and the entry point can delete this object, before
        // _beginthread even returns; storing the value into a member
        // afterwards would write to freed memory. uintptr_t is the type
        // _beginthread itself returns, so no truncation happens here.
        const uintptr_t handle = _beginthread( ThreadStartFunction, 0, this );
        if ( handle == static_cast<uintptr_t>( -1 ) ) // _beginthread's failure value
        {
            Error( "Unable to create thread." );
        }
    }

private:
    // Thread entry point handed to _beginthread, which expects a routine
    // returning void. pThis is the Thread passed as the thread argument by
    // CreateThread(). Runs the work and then destroys the object.
    static void ThreadStartFunction( void* pThis )
    {
        Thread* pThread = static_cast<Thread*>( pThis );
        pThread->Run();
        delete pThread;
    }
};

// A thread that is added to a threadpool. Its Run() registers the thread with
// the pool, then keeps pulling requests from the pool's queue and executing
// them until it is told to exit, and finally unregisters itself.
// NOTE(review): Run() uses ThreadPool::PooledRequest, RemoveRequest(),
// AddThreadToPool() and RemoveThreadFromPool(); ThreadPool has to make those
// accessible to this class (e.g. through a friend declaration).
class PooledThread :
    public Thread
{
public:
    // pPool is the pool this worker serves. The pointer is stored as-is and
    // dereferenced by Run(); this class never deletes it.
    PooledThread( ThreadPool* pPool ) : m_pPool( pPool ) {}

    // Worker loop executed on the thread.
    virtual void Run()
    {
        m_pPool->AddThreadToPool( this );

        while ( !ExitRequested() )
        {
            const ThreadPool::PooledRequest request = m_pPool->RemoveRequest();

            // An Exit request ends the loop.
            if ( request.m_command == ThreadPool::PooledRequest::Exit )
            {
                break;
            }

            // NoOp requests are skipped; anything else is executed when it
            // carries a runnable.
            const bool bSkip = ( request.m_command == ThreadPool::PooledRequest::NoOp );
            if ( !bSkip && request.m_pRunnable != NULL )
            {
                request.m_pRunnable->Run();
            }
        }

        m_pPool->RemoveThreadFromPool( this );
    }

protected:
    // Pool this thread takes its requests from.
    ThreadPool* m_pPool;
    // Not used by any code in this class.
    CCritSec m_crit;
};


class ThreadPool
{
public:
ThreadPool( int nMaxThreads = 1 ){ ResizeThreadPool(nMaxThreads);}
~ThreadPool();
void ResizeThreadPool( int nMaxThreads )
{
m_crit.Lock();
if ( m_nMaxThreads < nMaxThreads )
{
while ( m_nMaxThreads < nMaxThreads )
{
PooledThread* pThread = new PooledThread( this );
pThread->CreateThread( true );
m_nMaxThreads++;
}
}
else if ( m_nMaxThreads > nMaxThreads )
{
while ( m_nMaxThreads > nMaxThreads )
{
m_requests.push_front( PooledRequest( PooledRequest::Exit, NULL ) );
m_semRequests.Release();
m_nMaxThreads--;
}
}
m_crit.Unlock();
}
void AddRequest( Runnable* pRequest )
{
m_crit.Lock();
m_requests.push_back( PooledRequest( PooledRequest::Run, pRequest ) );
m_semRequests.Release();
m_crit.Unlock();
}
protected:
class PooledRequest
{
public:
enum Command {Exit, NoOp, Run };
Command m_command;
Runnable* m_pRunnable;
PooledRequest( Command command, Runnable* pRunnable ):m_command( command ), m_pRunnable( pRunnable ){}
PooledRequest(): m_command( NoOp ), m_pRunnable( NULL ){}
};
PooledRequest RemoveRequest()
{
m_semRequests.Wait();
m_crit.Lock();
PooledRequest request = m_requests.front();
m_requests.pop_front();
m_crit.Unlock();
return request;
}
virtual void AddThreadToPool( PooledThread* pThread )
{
m_crit.Lock();
m_threads.push_back( pThread );
m_crit.Unlock();
}
virtual void RemoveThreadFromPool( PooledThread* pThread )
{
m_crit.Lock();
m_threads.remove( pThread );
m_crit.Unlock();
}
protected:
CCritSec m_crit;
int m_nMaxThreads;
CSemaphore m_semRequests;
std::list<PooledRequest> m_requests;
std::list<PooledThread*> m_threads;
};

class CCritSec
{
public:
CCritSec(){ InitializeCriticalSection( &m_section ); }
~CCritSec(){ DeleteCriticalSection( &m_section ); }
void Lock() {EnterCriticalSection( &m_section ); }
void Unlock(){LeaveCriticalSection( &m_section ); }
 private:
CRITICAL_SECTION m_section;
};

No comments:

Post a Comment