菜鸟笔记
提升您的技术认知

可伸缩多线程任务队列

  在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

  主要有以下几个功能:

    1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

    2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

    3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

    4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

  大体框架主要由3个类构成

    1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

    2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

    3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

  类图如下:

  该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

  Job.h文件:

class CJob
{
public:
    CJob();
    virtual ~CJob();
    
    BOOL m_Completed;         //任务是否完成:TRUE 完成,FALSE 未完成
    static long lastUsedID;   //最后的ID
    
    //================================================================================================
    //函数名:                  setPriority
    //函数描述:                设置任务优先级
    //输入:                    [in] priority 优先级别
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setPriority(int priority);

    //================================================================================================
    //函数名:                  getPriority
    //函数描述:                返回任务优先级
    //输入:                    无
    //输出:                    无
    //返回:                    任务优先级
    //================================================================================================
    int getPriority();
    
    //================================================================================================
    //函数名:                  getID
    //函数描述:                返回任务ID
    //输入:                    无
    //输出:                    无
    //返回:                    任务ID
    //================================================================================================
    long getID();
    
    //================================================================================================
    //函数名:                  setAutoDelete
    //函数描述:                设置完成任务后是否删除任务
    //输入:                    [in] autoDeleteFlag
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setAutoDelete(BOOL autoDeleteFlag = TRUE);

    //================================================================================================
    //函数名:                  AutoDelete
    //函数描述:                返回删除任务标记
    //输入:                    无
    //输出:                    无
    //返回:                    任务标记
    //================================================================================================
    BOOL AutoDelete();

    //================================================================================================
    //函数名:                  execute
    //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
    //输入:                    无
    //输出:                    无
    //返回:                    任务ID
    //================================================================================================
    virtual void execute() = 0;    
private:
    long m_ID;               //任务ID
    BOOL m_autoDeleteFlag;   //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
    int m_priority;          //任务优先级,默认为5

};

  Job.cpp文件:

long CJob::lastUsedID = 0;

CJob::CJob()
{
    this->m_ID = InterlockedIncrement(&lastUsedID);
    this->m_autoDeleteFlag = TRUE;
    this->m_priority = 5;
    this->m_Completed= FALSE;
}

CJob::~CJob()
{
}

BOOL CJob::AutoDelete()
{
    return m_autoDeleteFlag;
}

void CJob::setAutoDelete(BOOL autoDeleteFlag)
{
    m_autoDeleteFlag = autoDeleteFlag;
}

long CJob::getID()
{
    return this->m_ID;
}

int CJob::getPriority()
{
    return this->m_priority;    
}

void CJob::setPriority(int priority)
{
    this->m_priority = priority;
}

  JobExecuter.h文件:

//一个对象对应一个线程,执行任务Job
class CJobExecuter
{
public:
    CJobExecuter(CMThreadedJobQ *pJobQ);
    virtual ~CJobExecuter();
    
    //================================================================================================
    //函数名:                  stop
    //函数描述:                停止执行任务
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void stop();
    
    //================================================================================================
    //函数名:                  execute
    //函数描述:                执行一个任务
    //输入:                    [in] pJob 任务指针
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void execute(CJob* pJob);
    
    static UINT ThreadFunction(LPVOID pParam); //线程函数
    
    CMThreadedJobQ* m_pJobQ;                   //指向线程任务队列指针
    CJob* m_pJob2Do;                           //指向正在执行任务的指针
    int m_flag;                                //线程执行标记
    CWinThread* m_pExecuterThread;             //线程标识符
};

  JobExecuter.cpp文件:

#define STOP_WORKING -1
#define KEEP_WORKING  0

CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
{
    this->m_pJobQ= pJobQ;
    this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
    this->m_pJob2Do = NULL;
    this->m_flag = KEEP_WORKING;
}

CJobExecuter::~CJobExecuter()
{
    if(this->m_pExecuterThread!= NULL )    
    {
        this->m_pExecuterThread->ExitInstance();
        delete m_pExecuterThread;    
    }
}

UINT CJobExecuter::ThreadFunction(LPVOID pParam)
{    
    CJobExecuter *pExecuter = (CJobExecuter *)pParam;
    pExecuter->m_flag = 1;
    ::Sleep(1);
    CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
    while(pExecuter->m_flag !=STOP_WORKING )
    {
        if(pExecuter->m_pJob2Do!=  NULL)
        {
            pExecuter->m_pJob2Do->execute();
            pExecuter->m_pJob2Do->m_Completed = TRUE;    
            if(pExecuter->m_pJob2Do->AutoDelete())
                delete pExecuter->m_pJob2Do;
            pExecuter->m_pJob2Do = NULL;
        }

        if(pExecuter->m_pJobQ == NULL) break;
        
        CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
        singleLock.Lock();
        if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
        {
            pExecuter->stop();    
            singleLock.Unlock();    
        }
        else
        {
            pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任务后,添加到CMThreadedJobQ的空闲队列中
            singleLock.Unlock();    
            pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();        
            pExecuter->m_pExecuterThread->SuspendThread();        
        }                
    }
    
    if(pExecuter->m_pJobQ != NULL)
    {
        pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
    }
    else
    {
        delete pExecuter;
    }

    return 0;
}

void CJobExecuter::execute(CJob* pJob)
{
    this->m_pJob2Do = pJob;
    ::Sleep(0);
    this->m_pExecuterThread->ResumeThread();
}

void CJobExecuter::stop()
{
    this->m_flag = STOP_WORKING;
    this->m_pExecuterThread->ResumeThread();
}

  MThreadedJobQ.h文件:

typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;

//线程池任务队列
class CMThreadedJobQ
{

public:
    typedef struct THNODE
    {
        CJobExecuter* pExecuter;
        THNODE * pNext ;
    } THNODE;
    
    CMThreadedJobQ();
    virtual ~CMThreadedJobQ();

    //================================================================================================
    //函数名:                  deleteJobExecuter
    //函数描述:                删除一个JobExecuter对象
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void deleteJobExecuter(CJobExecuter *pEx);
    
    //================================================================================================
    //函数名:                  setMaxNoOfExecuter
    //函数描述:                设置CJobExecuter的个数
    //输入:                    [in] value
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setMaxNoOfExecuter(int value);

    //================================================================================================
    //函数名:                  addJobExecuter
    //函数描述:                添加一个CJobExecuter
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addJobExecuter(CJobExecuter *pEx);
    
    //================================================================================================
    //函数名:                  getJobExecuter
    //函数描述:                返回一个CJobExecuter
    //输入:                    无
    //输出:                    无
    //返回:                    处理任务的指针
    //================================================================================================
    CJobExecuter* getJobExecuter();

    //================================================================================================
    //函数名:                  addFreeJobExecuter
    //函数描述:                添加一个CJobExecuter
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addFreeJobExecuter(CJobExecuter *pEx);

    //================================================================================================
    //函数名:                  addJob
    //函数描述:                添加一个任务
    //输入:                    [in] pJob
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addJob(CJob *pJob);
    
    //================================================================================================
    //函数名:                  getMaxNoOfExecuter
    //函数描述:                获取CJobExecuter个数的最大值
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getMaxNoOfExecuter();
    
    //================================================================================================
    //函数名:                  getNoOfExecuter
    //函数描述:                获取当前CJobExecuter的个数
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getNoOfExecuter();

    static UINT JobObserverThreadFunction(LPVOID);

    //================================================================================================
    //函数名:                  pause
    //函数描述:                挂起JobObserverThread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void pause();

    //================================================================================================
    //函数名:                  resume
    //函数描述:                唤醒JobObserverThread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void resume();    
        
    CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
    CCriticalSection m_cs;         //关键代码段,用于互斥
    CJobQList m_jobQList;          //任务队列
private :
    BOOL m_pause;                  //JobObserverThread线程运行标记
    int m_MaxNoOfExecuter;         //CJobExecuter最大个数
    int m_NoOfExecuter;            //当前CJobExecuter个数
    THNODE* m_pFreeEList;          //维护空闲处理任务线程的队列
    THNODE* m_pAllEList;           //维护所有处理任务线程的队列
};

  MThreadedJobQ.cpp文件:

CMThreadedJobQ::CMThreadedJobQ()
{
    m_MaxNoOfExecuter = 2;
    m_pause = FALSE;
    m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
    m_pFreeEList =NULL;
    m_NoOfExecuter =0;
    m_pAllEList = NULL;
}

CMThreadedJobQ::~CMThreadedJobQ()
{
    THNODE* pTempNode;
    while (m_pAllEList != NULL) 
    {    
        pTempNode = m_pAllEList->pNext;
        delete m_pAllEList->pExecuter;        
        delete m_pAllEList;        
        m_pAllEList = pTempNode;    
    }    

    while (m_pFreeEList != NULL) 
    {    pTempNode = m_pFreeEList->pNext;        
        delete m_pFreeEList;        
        m_pFreeEList = pTempNode;    
    }    

    m_pObserverThread->ExitInstance();    
    delete m_pObserverThread;
}


void CMThreadedJobQ::pause()
{
    this->m_pause = TRUE;
}

void CMThreadedJobQ::resume()
{
    this->m_pause = FALSE;
    this->m_pObserverThread->ResumeThread();
}

UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
{
    CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
    CJobExecuter *pJExecuter;

    while(TRUE)
    {
        Sleep(100);
        if(pMTJQ->m_pause != TRUE)
        {
            while(!pMTJQ->m_jobQList.IsEmpty() )
            {
                pJExecuter = pMTJQ->getJobExecuter();
                if( pJExecuter!=NULL)
                {                
                    pMTJQ->m_cs.Lock();
                    pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
                    pMTJQ->m_jobQList.RemoveHead();
                    AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
                    pMTJQ->m_cs.Unlock();
                }
                else
                {
                    break;
                }
                if(pMTJQ->m_pause == TRUE)
                    break;
            }
        }
        pMTJQ->m_pObserverThread->SuspendThread();
    }
    return 0;
}

int CMThreadedJobQ::getNoOfExecuter()
{
    return this->m_NoOfExecuter;
}

int CMThreadedJobQ::getMaxNoOfExecuter()
{
    return this->m_MaxNoOfExecuter;
}

void CMThreadedJobQ::addJob(CJob *pJob)
{
    CJob * pTempJob;
    CSingleLock sLock(&this->m_cs);
    sLock.Lock();    
    POSITION pos,lastPos;
    pos = this->m_jobQList.GetHeadPosition();    
    lastPos = pos;
    if(pos != NULL)
        pTempJob =this->m_jobQList.GetHead();
    while(pos != NULL )
    {        
        if( pJob->getPriority() > pTempJob->getPriority())
            break;
        lastPos = pos;
        pTempJob =     this->m_jobQList.GetNext(pos);        
    }    
    if(pos == NULL)    
        this->m_jobQList.AddTail(pJob);
    else
        this->m_jobQList.InsertBefore(lastPos,pJob);
    this->m_pObserverThread->ResumeThread();
    sLock.Unlock();
}

void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter = pEx;
    node->pNext = this->m_pFreeEList;
    this->m_pFreeEList = node;
    m_cs.Unlock();
}

CJobExecuter* CMThreadedJobQ::getJobExecuter()
{
    THNODE *pTemp;
    CJobExecuter *pEx=NULL;
    m_cs.Lock();

    if(this->m_pFreeEList != NULL)  //有空闲CJobExecuter,就返回
    {
        pTemp = this->m_pFreeEList;
        this->m_pFreeEList = this->m_pFreeEList->pNext;
        pEx = pTemp->pExecuter;
        delete pTemp ;
        m_cs.Unlock();
        return pEx;
    }

    if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
    {
        pEx =  new CJobExecuter(this);
        this->addJobExecuter(pEx);
        this->m_NoOfExecuter++;
        m_cs.Unlock();
        return pEx;
    }
    m_cs.Unlock();
    return NULL;
}

void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter= pEx;
    node->pNext = this->m_pAllEList;
    this->m_pAllEList = node;
    m_cs.Unlock();
}

void CMThreadedJobQ::setMaxNoOfExecuter(int value)
{
    this->m_cs.Lock();
    if(value >1 && value <11)
        this->m_MaxNoOfExecuter = value;
    m_pObserverThread->ResumeThread();
    this->m_cs.Unlock();
}

void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
{
    THNODE* pNode,*pNodeP;
    CSingleLock singleLock(&m_cs);    
    singleLock.Lock();    
    if(this->m_pAllEList != NULL)
    {
        pNode = this->m_pAllEList;
        if(pNode->pExecuter == pEx )    
        {
          this->m_pAllEList = pNode->pNext;
          delete pNode;          
        }
        else
        {
            pNodeP =pNode;
            pNode  = pNode->pNext ;            
            while(pNode != NULL )
            {
                if(pNode->pExecuter== pEx ) break;
                pNodeP = pNode;
                pNode  = pNode->pNext ;            
            }
            if(pNode!= NULL)
            {
                pNodeP->pNext = pNode->pNext;
                delete pNode;
            }
        }
    }
    this->m_NoOfExecuter--;
    singleLock.Unlock();
    pEx->stop();
    Sleep(1);
    delete pEx;
}

  以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。