一个Windows下线程池的实现(C++)

2018-02-20 19:38:21来源:cnblogs.com作者:TanGuoying人点击

分享

前言

  本文配套代码:https://github.com/TTGuoying/ThreadPool

  先看看几个概念:

  1.  线程:进程中负责执行的执行单元。一个进程中至少有一个线程。
  2.  多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行。
  3.  线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程。当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中。这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

  我们为什么要使用线程池呢?

  简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量消耗,而线程本来就是可重用的资源,不需要每次使用时都进行初始化,因此可以采用有限的线程个数处理无限的任务。

代码实现

  本文的线程池是在Windows上实现的。主要思路是维护一个空闲线程队列、一个忙碌线程队列和一个任务队列,一开始建立一定数量的空闲线程放进空闲线程队列,当有任务进入任务队列时,从空闲线程队列中去一个线程执行任务,线程变为忙碌线程移入忙碌线程队列,任务执行完成后,线程到任务队列中取任务继续执行,如果任务队列中没有任务线程休眠后从忙碌线程队列回到空闲线程队列。下面是线程池的工作原理图:

  本线程池类实现了自动调节池中线程数。

  废话少说,直接上代码:

  1 /*  2 ==========================================================================  3 * 类ThreadPool是本代码的核心类,类中自动维护线程池的创建和任务队列的派送  4   5 * 其中的TaskFun是任务函数  6 * 其中的TaskCallbackFun是回调函数  7   8 *用法:定义一个ThreadPool变量,TaskFun函数和TaskCallbackFun回调函数,然后调用ThreadPool的QueueTaskItem()函数即可  9  10 Author: TTGuoying 11  12 Date: 2018/02/19 23:15 13  14 ========================================================================== 15 */ 16 #pragma once 17 #include <Windows.h> 18 #include <list> 19 #include <queue> 20 #include <memory> 21  22 using std::list; 23 using std::queue; 24 using std::shared_ptr; 25  26 #define THRESHOLE_OF_WAIT_TASK  20 27  28 typedef int(*TaskFun)(PVOID param);                // 任务函数 29 typedef void(*TaskCallbackFun)(int result);        // 回调函数 30  31 class ThreadPool 32 { 33 private: 34     // 线程类(内部类) 35     class Thread 36     { 37     public: 38         Thread(ThreadPool *threadPool); 39         ~Thread(); 40  41         BOOL isBusy();                                                    // 是否有任务在执行 42         void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback);    // 执行任务 43  44     private: 45         ThreadPool *threadPool;                                            // 所属线程池 46         BOOL    busy;                                                    // 是否有任务在执行 47         BOOL    exit;                                                    // 是否退出 48         HANDLE  thread;                                                    // 线程句柄 49         TaskFun    task;                                                    // 要执行的任务 50         PVOID    param;                                                    // 任务参数 51         TaskCallbackFun taskCb;                                            // 回调的任务 52         static unsigned int __stdcall ThreadProc(PVOID pM);                // 线程函数 53     }; 54  55     // IOCP的通知种类 56     enum WAIT_OPERATION_TYPE 57     { 58         GET_TASK, 59         EXIT 60     }; 61  62     // 待执行的任务类 63     class WaitTask 64     { 65     public: 66         WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong) 67         { 68             this->task = task; 69             this->param = param; 70             this->taskCb = taskCb; 71             this->bLong = bLong; 72         } 73         ~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; } 74  75         TaskFun    task;                    // 要执行的任务 76         PVOID param;                    // 任务参数 77         TaskCallbackFun taskCb;            // 回调的任务 78         BOOL bLong;                        // 是否时长任务 79     }; 80  81     // 从任务列表取任务的线程函数 82     static unsigned int __stdcall GetTaskThreadProc(PVOID pM) 83     { 84         ThreadPool *threadPool = (ThreadPool *)pM; 85         BOOL bRet = FALSE; 86         DWORD dwBytes = 0; 87         WAIT_OPERATION_TYPE opType; 88         OVERLAPPED *ol; 89         while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, 0)) 90         { 91             BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE); 92             // 收到退出标志 93             if (EXIT == (DWORD)opType) 94             { 95                 break; 96             } 97             else if (GET_TASK == (DWORD)opType) 98             { 99                 threadPool->GetTaskExcute();100             }101         }102         return 0;103     }104 105     //线程临界区锁106     class CriticalSectionLock107     {108     private:109         CRITICAL_SECTION cs;//临界区110     public:111         CriticalSectionLock() { InitializeCriticalSection(&cs); }112         ~CriticalSectionLock() { DeleteCriticalSection(&cs); }113         void Lock() { EnterCriticalSection(&cs); }114         void UnLock() { LeaveCriticalSection(&cs); }115     };116 117 118 public:119     ThreadPool(size_t minNumOfThread = 2, size_t maxNumOfThread = 10);120     ~ThreadPool();121 122     BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE);       // 任务入队123 124 private:125     size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); }    // 获取线程池中的当前线程数126     size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; }            // 获取线程池中的最大线程数127     void SetMaxNumOfThread(size_t size)            // 设置线程池中的最大线程数128     { 129         if (size < numOfLongFun)130         {131             maxNumOfThread = size + numOfLongFun;132         }133         else134             maxNumOfThread = size; 135     }                    136     size_t GetMinNumOfThread() { return minNumOfThread; }                            // 获取线程池中的最小线程数137     void SetMinNumOfThread(size_t size) { minNumOfThread = size; }                    // 设置线程池中的最小线程数138 139     size_t getIdleThreadNum() { return idleThreadList.size(); }    // 获取线程池中的线程数140     size_t getBusyThreadNum() { return busyThreadList.size(); }    // 获取线程池中的线程数141     void CreateIdleThread(size_t size);                            // 创建空闲线程142     void DeleteIdleThread(size_t size);                            // 删除空闲线程143     Thread *GetIdleThread();                                    // 获取空闲线程144     void MoveBusyThreadToIdleList(Thread *busyThread);            // 忙碌线程加入空闲列表145     void MoveThreadToBusyList(Thread *thread);                    // 线程加入忙碌列表146     void GetTaskExcute();                                        // 从任务队列中取任务执行147     WaitTask *GetTask();                                        // 从任务队列中取任务148 149     CriticalSectionLock idleThreadLock;                            // 空闲线程列表锁150     list<Thread *> idleThreadList;                                // 空闲线程列表151     CriticalSectionLock busyThreadLock;                            // 忙碌线程列表锁152     list<Thread *> busyThreadList;                                // 忙碌线程列表153 154     CriticalSectionLock waitTaskLock;155     list<WaitTask *> waitTaskList;                                // 任务列表156 157     HANDLE                    dispatchThrad;                        // 分发任务线程158     HANDLE                    stopEvent;                            // 通知线程退出的时间159     HANDLE                    completionPort;                        // 完成端口160     size_t                    maxNumOfThread;                        // 线程池中最大的线程数161     size_t                    minNumOfThread;                        // 线程池中最小的线程数162     size_t                    numOfLongFun;                        // 线程池中最小的线程数163 };
  1 #include "stdafx.h"  2 #include "ThreadPool.h"  3 #include <process.h>  4   5   6 ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)  7 {  8     if (minNumOfThread < 2)  9         this->minNumOfThread = 2; 10     else 11         this->minNumOfThread = minNumOfThread; 12     if (maxNumOfThread < this->minNumOfThread * 2) 13         this->maxNumOfThread = this->minNumOfThread * 2; 14     else 15         this->maxNumOfThread = maxNumOfThread; 16     stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 17     completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); 18  19     idleThreadList.clear(); 20     CreateIdleThread(this->minNumOfThread); 21     busyThreadList.clear(); 22  23     dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0); 24     numOfLongFun = 0; 25 } 26  27 ThreadPool::~ThreadPool() 28 { 29     SetEvent(stopEvent); 30     PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT, NULL); 31  32     CloseHandle(stopEvent); 33 } 34  35 BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun) 36 { 37     waitTaskLock.Lock(); 38     WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun); 39     waitTaskList.push_back(waitTask); 40     waitTaskLock.UnLock(); 41     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL); 42     return TRUE; 43 } 44  45 void ThreadPool::CreateIdleThread(size_t size) 46 { 47     idleThreadLock.Lock(); 48     for (size_t i = 0; i < size; i++) 49     { 50         idleThreadList.push_back(new Thread(this)); 51     } 52     idleThreadLock.UnLock(); 53 } 54  55 void ThreadPool::DeleteIdleThread(size_t size) 56 { 57     idleThreadLock.Lock(); 58     size_t t = idleThreadList.size(); 59     if (t >= size) 60     { 61         for (size_t i = 0; i < size; i++) 62         { 63             auto thread = idleThreadList.back(); 64             delete thread; 65             idleThreadList.pop_back(); 66         } 67     } 68     else 69     { 70         for (size_t i = 0; i < t; i++) 71         { 72             auto thread = idleThreadList.back(); 73             delete thread; 74             idleThreadList.pop_back(); 75         } 76     } 77     idleThreadLock.UnLock(); 78 } 79  80 ThreadPool::Thread *ThreadPool::GetIdleThread() 81 { 82     Thread *thread = NULL; 83     idleThreadLock.Lock(); 84     if (idleThreadList.size() > 0) 85     { 86         thread = idleThreadList.front(); 87         idleThreadList.pop_front(); 88     } 89     idleThreadLock.UnLock(); 90  91     if (thread == NULL && getCurNumOfThread() < maxNumOfThread) 92     { 93         thread = new Thread(this); 94     } 95  96     if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK) 97     { 98         thread = new Thread(this); 99         InterlockedIncrement(&maxNumOfThread);100     }101     return thread;102 }103 104 void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)105 {106     idleThreadLock.Lock();107     idleThreadList.push_back(busyThread);108     idleThreadLock.UnLock();109 110     busyThreadLock.Lock();111     for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)112     {113         if (*it == busyThread)114         {115             busyThreadList.erase(it);116             break;117         }118     }119     busyThreadLock.UnLock();120 121     if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8)122     {123         DeleteIdleThread(idleThreadList.size() / 2);124     }125 126     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL);127 }128 129 void ThreadPool::MoveThreadToBusyList(Thread * thread)130 {131     busyThreadLock.Lock();132     busyThreadList.push_back(thread);133     busyThreadLock.UnLock();134 }135 136 void ThreadPool::GetTaskExcute()137 {138     Thread *thread = NULL;139     WaitTask *waitTask = NULL;140 141     waitTask = GetTask();142     if (waitTask == NULL)143     {144         return;145     }146 147     if (waitTask->bLong)148     {149         if (idleThreadList.size() > minNumOfThread)150         {151             thread = GetIdleThread();152         }153         else154         {155             thread = new Thread(this);156             InterlockedIncrement(&numOfLongFun);157             InterlockedIncrement(&maxNumOfThread);158         }159     }160     else161     {162         thread = GetIdleThread();163     }164 165     if (thread != NULL)166     {167         thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);168         delete waitTask;169         MoveThreadToBusyList(thread);170     }171     else172     {173         waitTaskLock.Lock();174         waitTaskList.push_front(waitTask);175         waitTaskLock.UnLock();176     }177     178 }179 180 ThreadPool::WaitTask *ThreadPool::GetTask()181 {182     WaitTask *waitTask = NULL;183     waitTaskLock.Lock();184     if (waitTaskList.size() > 0)185     {186         waitTask = waitTaskList.front();187         waitTaskList.pop_front();188     }189     waitTaskLock.UnLock();190     return waitTask;191 }192 193 194 ThreadPool::Thread::Thread(ThreadPool *threadPool) :195     busy(FALSE),196     thread(INVALID_HANDLE_VALUE),197     task(NULL),198     taskCb(NULL),199     exit(FALSE),200     threadPool(threadPool)201 {202     thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0);203 }204 205 ThreadPool::Thread::~Thread()206 {207     exit = TRUE;208     task = NULL;209     taskCb = NULL;210     ResumeThread(thread);211     WaitForSingleObject(thread, INFINITE);212     CloseHandle(thread);213 }214 215 BOOL ThreadPool::Thread::isBusy()216 {217     return busy;218 }219 220 void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)221 {222     busy = TRUE;223     this->task = task;224     this->param = param;225     this->taskCb = taskCallback;226     ResumeThread(thread);227 }228 229 unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)230 {231     Thread *pThread = (Thread*)pM;232 233     while (true)234     {235         if (pThread->exit)236             break; //线程退出237 238         if (pThread->task == NULL && pThread->taskCb == NULL)239         {240             pThread->busy = FALSE;241             pThread->threadPool->MoveBusyThreadToIdleList(pThread);242             SuspendThread(pThread->thread);243             continue;244         }245 246         int resulst = pThread->task(pThread->param);247         if(pThread->taskCb)248             pThread->taskCb(resulst);249         WaitTask *waitTask = pThread->threadPool->GetTask();250         if (waitTask != NULL)251         {252             pThread->task = waitTask->task;253             pThread->taskCb = waitTask->taskCb;254             delete waitTask;255             continue;256         }257         else258         {259             pThread->task = NULL;260             pThread->param = NULL;261             pThread->taskCb = NULL;262             pThread->busy = FALSE;263             pThread->threadPool->MoveBusyThreadToIdleList(pThread);264             SuspendThread(pThread->thread);265         }266     }267 268     return 0;269 }
 1 // ThreadPool.cpp: 定义控制台应用程序的入口点。 2 // 3  4 #include "stdafx.h" 5 #include "ThreadPool.h" 6 #include <stdio.h> 7  8 class Task 9 {10 public:11     static int Task1(PVOID p) 12     {13         int i = 10;14         while (i >= 0)15         {16             printf("%d/n", i);17             Sleep(100);18             i--;19         }20         return i;21     }22 };23 24 class TaskCallback25 {26 public:27     static void TaskCallback1(int result)28     {29         printf("   %d/n", result);30     }31 };32 33 int main()34 {35     ThreadPool threadPool(2, 10);36     for (size_t i = 0; i < 30; i++)37     {38         threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1);39     }40     threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1, TRUE);41     42     getchar();43 44     return 0;45 }

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台