基于C++11 實現(xiàn)的線程池提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
創(chuàng)新互聯(lián)服務項目包括惠民網(wǎng)站建設、惠民網(wǎng)站制作、惠民網(wǎng)頁制作以及惠民網(wǎng)絡營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術優(yōu)勢、行業(yè)經驗、深度合作伙伴關系等,向廣大中小型企業(yè)、政府機構等提供互聯(lián)網(wǎng)行業(yè)的解決方案,惠民網(wǎng)站推廣取得了明顯的社會效益與經濟效益。目前,我們服務的客戶以成都為中心已經輻射到惠民省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!
??C++11加入了線程庫,這是歷史性的一步跨越,因為它已然能夠實現(xiàn)簡單的并發(fā)了,但有這樣一個問題:如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務就結束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。
??如何解決這種線程的頻繁創(chuàng)建銷毀呢?就是線程執(zhí)行完一個任務,不會被銷毀,而是可以繼續(xù)執(zhí)行其他的任務呢?
??一個好的辦法就是使用線程池,不過線程池是個什么東西呢?它又該怎么實現(xiàn)呢?
1、線程池原理??線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優(yōu)先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件), 則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠不會超過大值。超過大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
2、線程池的設計思路?實現(xiàn)線程池有以下幾個步驟:
(1)設置一個生產者消費者隊列,作為臨界資源。
(2)初始化n個線程,并讓其運行起來,加鎖去隊列里取任務運行
(3)當任務隊列為空時,所有線程阻塞。
(4)當生產者隊列來了一個任務后,先對隊列加鎖,把任務掛到隊列上,然后使用條件變量去通知阻塞中的一個線程來處理。
3、一個基于C++11的優(yōu)秀的線程池#include#include#include#include#include// 鎖
#include// 條件變量
#include#include#include
??頭文件中,沒有什么特別的,這里稍微說下:thread—線程相關的庫,mutex—互斥量,也就是互斥鎖,condition_variable—條件變量,用于喚醒線程和阻塞線程,future的話,在這里就是一個用來獲取線程數(shù)據(jù)的函數(shù)。
??對于C++11的并發(fā)編程部分(std::thread
,std::future
等)可以參考下面幾篇文章進行學習:
??線程池類中:
class ThreadPool {public:
ThreadPool(size_t);
templateauto enqueue(F&& f, Args&&... args)
->std::future::type>;
~ThreadPool();
private:
// 需要跟蹤線程,以便我們可以加入它們
std::vector< std::thread >workers;
// 任務隊列
std::queue< std::function>tasks;
// 同步
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
3.3 構造函數(shù)實現(xiàn)??這個構造函數(shù)主要是用來啟動一些工作線程,咋一看還不一定能看的懂,下面我們詳細分析:
stop
為false
for
循環(huán),循環(huán)里是一個很長的workers.emplace_back()
語句,就是添加線程到工作線程組。再往里面則是一個lambda
表達式。lambda
的函數(shù)體第一層是一個無線循環(huán),用于線程內不斷的從任務隊列取任務執(zhí)行,第二層有一個task
就是一個函數(shù)對象,然后又一個大括號,剛開始我沒看明白,但往下看就能懂,在這個大括號(大括號作用:控制lock臨時變量的生命周期,執(zhí)行完畢或return后自動釋放鎖)里面有:std::unique_locklock(this->queue_mutex);
加鎖this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
等待條件成立(當線程池被關閉或有可消費任務時跳過wait
繼續(xù);否則condition.wait
將會unlock
釋放鎖,其他線程可以繼續(xù)拿鎖,但此線程會阻塞此處并休眠,直到被notify_*
喚醒,被喚醒時wait會再次lock并判斷條件是否成立,如成立則跳過wait
,否則unlock
并休眠繼續(xù)等待下次喚醒)condition
只是返回了false
才會wait
,也就是!stop && empty
才會wait
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{for(size_t i = 0;ifor(;;)
{std::functiontask; //線程中的函數(shù)對象
// 通過lock互斥獲取一個隊列任務和任務的執(zhí)行函數(shù)
{std::unique_locklock(this->queue_mutex);
this->condition.wait(lock,
[this]{return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
// 任務隊列的隊首任務
task = std::move(this->tasks.front());
// 從隊列移除
this->tasks.pop();
}
// 調用函數(shù)執(zhí)行任務
task();
}
}
);
}
3.4 入隊—添加新的工作任務到線程池??我們重新審視std::queue< std::function
,它只能接受void
的函數(shù)類型,這里使用std::packaged_task
完成函數(shù)類型的推導,因為這還不是最終放入tasks
的對象,它要承接一個返回future
的工作,而package_task
就是來打包返回future
的。
??其次將任務函數(shù)和其參數(shù)綁定,構建一個packaged_task(packaged_task是對任務的一個抽象,咱們能夠給其傳遞一個函數(shù)來完成其構造。以后將任務投遞給任何線程去完成,經過packaged_task.get_future()方法獲取的future來獲取任務完成后的產出值)
// add new work item to the pool
templateauto ThreadPool::enqueue(F&& f, Args&&... args)
->std::future::type>{using return_type = typename std::result_of::type;
auto task = std::make_shared< std::packaged_task>( //指向F函數(shù)的智能指針
std::bind(std::forward(f), std::forward(args)...) //傳遞函數(shù)進行構造
);
// 獲取任務的future
std::futureres = task->get_future();
{// 獨占拿鎖
std::unique_locklock(queue_mutex);
// 不允許入隊到已經停止的線程池
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 將任務添加到任務隊列
tasks.emplace([task](){(*task)(); });
}
// 發(fā)送通知,喚醒一個wait狀態(tài)的工作線程重新?lián)屾i并重新判斷wait條件
condition.notify_one();
return res;
}
3.5 析構函數(shù)// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_locklock(queue_mutex); // 拿鎖
stop = true; // 停止標志
}
condition.notify_all(); // 條件變量進行通知
// 等待所有工作線程結束
for(std::thread &worker: workers)
worker.join();
}
3.6 測試代碼#include#include#include#include "ThreadPool.h"
int main()
{ThreadPool pool(4);
std::vector< std::future>results;
for(int i = 0; i< 8; ++i) {results.emplace_back(
pool.enqueue([i] {std::cout<< "hello "<< i<< std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout<< "world "<< i<< std::endl;
return i*i;
})
);
}
for(auto && result: results)
std::cout<< result.get()<< ' ';
std::cout<< std::endl;
return 0;
}
3.7 演示期待大家和我交流,留言或者私信,一起學習,一起進步!
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧