小編給大家分享一下linux 下c++線程池的簡單實現(xiàn),相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
在關(guān)嶺等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、成都網(wǎng)站制作 網(wǎng)站設(shè)計制作按需定制,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計,全網(wǎng)整合營銷推廣,成都外貿(mào)網(wǎng)站制作,關(guān)嶺網(wǎng)站建設(shè)費用合理。
作為一個c++菜鳥,研究半天這個代碼的實現(xiàn)原理,發(fā)現(xiàn)好多語法不太熟悉,因此加了一大堆注釋,僅供參考。該段代碼主要通過繼承workthread類來實現(xiàn)自己的線程代碼,通過thread_pool類來管理線程池,線程池不能夠?qū)崿F(xiàn)動態(tài)改變線程數(shù)目,存在一定局限性。目前可能還有缺陷,畢竟c++來封裝這個東西,資源釋放什么的必須想清楚,比如vector存儲了基類指針實現(xiàn)多態(tài),那么如何釋放對象仍需要考慮,后續(xù)我可能會更進一步修改完善該代碼,下面貢獻一下自己的勞動成果。
#include#include #include #include using namespace std; /* WorkerThread class This class needs to be sobclassed by the user. */ class WorkerThread{ public: int id; unsigned virtual executeThis() { return 0; } WorkerThread(int id) : id(id) {} virtual ~WorkerThread(){} }; /* ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and synchronizations between all threads. */ class ThreadPool{ public: ThreadPool(); ThreadPool(int maxThreadsTemp); virtual ~ThreadPool(); void destroyPool(int maxPollSecs); bool assignWork(WorkerThread *worker); bool fetchWork(WorkerThread **worker); void initializeThreads(); static void *threadExecute(void *param); // pthread_create()調(diào)用的函數(shù)必須為靜態(tài)的 static pthread_mutex_t mutexSync; static pthread_mutex_t mutexWorkCompletion;//工作完成個數(shù)互斥量 private: int maxThreads; pthread_cond_t condCrit; sem_t availableWork; sem_t availableThreads; vector workerQueue; int topIndex; int bottomIndex; int incompleteWork; int queueSize; };
#include#include "threadpool.h" using namespace std; //初始化類的靜態(tài)成員必須加上類型和作用域,static數(shù)據(jù)成員必須在類定義體的外部定義,不像不同數(shù)據(jù)成員可以用構(gòu)造函數(shù)初始化 //應(yīng)該在定義時進行初始化,注意是定義,這個定義應(yīng)該放在包含類的非內(nèi)聯(lián)成員函數(shù)定義的文件中。 //注:靜態(tài)成員函數(shù)只能使用靜態(tài)變量,非靜態(tài)沒有限制,靜態(tài)變量必須在外部定義和初始化,沒初始化就為默認數(shù)值 pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER; ThreadPool::ThreadPool() { ThreadPool(2); } ThreadPool::ThreadPool(int maxThreads) { if (maxThreads < 1) maxThreads=1; pthread_mutex_lock(&mutexSync); this->maxThreads = maxThreads; this->queueSize = maxThreads; workerQueue.resize(maxThreads, NULL);//調(diào)整容器大小,然后用默認構(gòu)造函數(shù)初始化新的空間 topIndex = 0; bottomIndex = 0; incompleteWork = 0; sem_init(&availableWork, 0, 0); //工作隊列信號量,表示已經(jīng)加入隊列的工作,初始時沒有工作 sem_init(&availableThreads, 0, queueSize); //空閑線程信號量,一開始就有quisize個線程可以使用 pthread_mutex_unlock(&mutexSync); } //調(diào)用pthread_create()讓線程跑起來,threadExecute是類的靜態(tài)函數(shù),因為pthread_create()第三個參數(shù)必須為靜態(tài)函數(shù) void ThreadPool::initializeThreads() { for(int i = 0; i 0 ) { //cout << "Work is still incomplete=" << incompleteWork << endl; sleep(maxPollSecs); } cout << "All Done!! Wow! That was a lot of work!" << endl; sem_destroy(&availableWork); sem_destroy(&availableThreads); pthread_mutex_destroy(&mutexSync); pthread_mutex_destroy(&mutexWorkCompletion); } //分配人物到top,然后通知有任務(wù)需要執(zhí)行。 bool ThreadPool::assignWork(WorkerThread *workerThread) { pthread_mutex_lock(&mutexWorkCompletion); incompleteWork++; //cout << "assignWork...incomapleteWork=" << incompleteWork << endl; pthread_mutex_unlock(&mutexWorkCompletion); sem_wait(&availableThreads); pthread_mutex_lock(&mutexSync); //workerVec[topIndex] = workerThread; workerQueue[topIndex] = workerThread; //cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl; if(queueSize !=1 ) topIndex = (topIndex+1) % (queueSize-1); sem_post(&availableWork); pthread_mutex_unlock(&mutexSync); return true; } //當已經(jīng)有人物放到隊列里面后,就會受到通知,然后從底部拿走工作,在workerArg中返回 bool ThreadPool::fetchWork(WorkerThread **workerArg) { sem_wait(&availableWork); pthread_mutex_lock(&mutexSync); WorkerThread * workerThread = workerQueue[bottomIndex]; workerQueue[bottomIndex] = NULL; *workerArg = workerThread; if(queueSize !=1 ) bottomIndex = (bottomIndex+1) % (queueSize-1); sem_post(&availableThreads); pthread_mutex_unlock(&mutexSync); return true; } //每個線程運行的靜態(tài)函數(shù)實體,executeThis 方法將會被繼承累從寫,之后實現(xiàn)具體線程的工作。 void *ThreadPool::threadExecute(void *param) { WorkerThread *worker = NULL; while(((ThreadPool *)param)->fetchWork(&worker)) { if(worker) { worker->executeThis(); //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl; delete worker; worker = NULL; } pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) ); //cout << "Thread " << pthread_self() << " has completed a Job !" << endl; ((ThreadPool *)param)->incompleteWork--; pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) ); } return 0; }
#include#include "threadpool.h" using namespace std; #define ITERATIONS 20 class SampleWorkerThread : public WorkerThread { public: int id; unsigned virtual executeThis() { // Instead of sleep() we could do anytime consuming work here. // Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds) cout<<"This is SampleWorkerThread sleep 2s"< initializeThreads(); //用來計算時間間隔。 time_t t1=time(NULL); //分配具體工作到線程池 for(unsigned int i=0;i assignWork(myThreathreadExecuted); } //銷毀錢等待所有線程結(jié)束,等待間隔為2秒。 myPool->destroyPool(2); time_t t2=time(NULL); cout << t2-t1 << " seconds elapsed\n" << endl; delete myPool; return 0; }
ubuntu 12.04下運行成功,編譯命令如下:g++ -g main.cpp thread_pool.cpp -o thread_pool -lpthread
以上是“l(fā)inux 下c++線程池的簡單實現(xiàn)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!