?
周村網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,周村網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為周村上千多家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個售后服務(wù)好的周村做網(wǎng)站的公司定做!
?
?
目錄
multiprocessing模塊:...1
多進程舉例:...2
multiprocessing.Pool,進程池:...3
concurrent包:...3
?
?
?
多進程:
由于py的GIL,多線程未必是cpu密集型程序好的選擇;
?
多進程,可在完全獨立的進程環(huán)境中運行程序,可充分利用多處理器;
但進程本身的隔離帶來的數(shù)據(jù)不共享是個問題;
線程比進程輕量級;
?
Process類:
遵循了Thread類的API,減少了學(xué)習(xí)難度;
多進程,一定要在__main__()中,否則拋錯;
?
p=multiprocessing.Process()
p.pid,進程ID;
p.exitcode,進程的退出狀態(tài)碼;
p.terminate(),終止指定的進程;
?
進程間同步:
提供了和線程同步一樣的類,使用的方法一樣,使用的效果也類似;
不過,進程間代價要高于線程,而且底層實現(xiàn)不同,只不過py屏蔽了這些,讓用戶能簡單使用(py提供的庫抹平了它們之間的差別,為方便使用);
?
multiprocessing還提供了共享內(nèi)存、服務(wù)器進程來共享數(shù)據(jù);還提供了Queue、Pipe(上一個進程的標(biāo)準(zhǔn)輸出到下一個進程的標(biāo)準(zhǔn)輸入)用來進程間通信;
Queue.queue是線程級別的鎖;
multiprocessing.Queue可跨進程用,用的少;此時應(yīng)用第三方工具MQ,單機進程間通信用的也少,一般應(yīng)用都是跨節(jié)點的進程間通信,即RPC;RPC框架很多,如swfit、dubbo;
?
通信方式不同:
多進程就是啟動多個解釋器進程,進程間通信必須序列化、反序列化;
數(shù)據(jù)的線程安全性問題,由于每個進程中沒有實現(xiàn)多線程,GIL可以說沒用了;
?
進程池:
只允許使用計算機這么多資源(不能搶占計算機其它資源);
很多進程要反復(fù)創(chuàng)建的情形下,用進程池;
?
多進程、多線程的選擇:
CPU密集型,cpython中使用GIL,多線程時鎖相互競爭,多核優(yōu)勢不能發(fā)揮,用py多進程(多個解釋器進程)效率更高;
IO密集型,適合用多線程,減少IO序列化開銷,且在IO等待時,切換到其它線程繼續(xù)執(zhí)行,效率不錯;
?
應(yīng)用場景:
請求-應(yīng)答模型;
web應(yīng)用中常見的處理模型,用多進程+多線程;
如,nginx工作模式:
master啟動多個worker工作進程,一般和cpu數(shù)目相同;
worker中啟動多線程,提高并發(fā)處理能力,worker處理用戶的請求,往往需要等待數(shù)據(jù);
?
nginx應(yīng)就地本地編譯(本地指令集,甚至用指定cpu(如intel)編譯器編譯,這樣可優(yōu)化指令,性能更高);
?
?
例:
def calc(i):
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
if __name__ == '__main__':?? #使用多進程,必須要在main中執(zhí)行,否則報錯
??? start = datetime.datetime.now()
?
??? lst = []
??? for i in range(5):
??????? p = multiprocessing.Process(target=calc, args=(i,), name='p-{}'.format(i))
??????? p.start()
??????? lst.append(p)
??? for p in lst:
??????? p.join()
?
??? delta = (datetime.datetime.now()-start).total_seconds()
??? print(delta)?? #win上查看,有5個py進程
輸出:
20.037146
?
?
例,進程池:
def calc(i):
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
if __name__ == '__main__':
??? start = datetime.datetime.now()
?
??? pool = multiprocessing.Pool(5)
??? for i in range(5):
??????? pool.apply_async(calc,args=(i,))
??? pool.close()?? #重要,要有此步
??? pool.join()
?
??? delta = (datetime.datetime.now()-start).total_seconds()
??? print(delta)
?
?
?
目前僅有一個concurrent.futures,3.2引入;
異步并行任務(wù)編程模塊,提供一個高級的異步可執(zhí)行的便利接口;
?
提供了2個池執(zhí)行器:
ThreadPoolExecutor,異步調(diào)用的線程池的Executor;
ProcessPoolExecutor,異步調(diào)用的進程池的Executor;
?
from conncurrent import futures
executor=futures.ThreadPoolExecutor(max_workers=1),池中至多創(chuàng)建max_workers個線程來同時異步執(zhí)行,默認1個,返回Executor實例;
f=executor.submit(fn,*args,**kwargs),提交執(zhí)行的函數(shù)及其參數(shù),返回Futures實例;
executor.shutdown(wait=True),清理池;
?
Future類(submit()的實例):
f=executor.submit(work,2)
f.result(),可查看調(diào)用的返回的結(jié)果,函數(shù)的return結(jié)果;
f.done(),如果調(diào)用被成功的取消或執(zhí)行完成,返回True,是標(biāo)志,注意不是函數(shù)的返回值;
f.cancelled(),如果調(diào)用被成功的取消,返回True;
f.running(),如果正在運行且不能被取消,返回True;
f.cancel(),嘗試取消調(diào)用,如果已經(jīng)執(zhí)行且不能取消,返回False,否則返回True;
f.result(timeout=None),取返回的結(jié)果,超時為None,一直等待返回;
f.exception(timeout=None),取返回的異常,超時為None,一直等待返回;
?
支持上下文管理:
futures.ThreadPoolExecutor繼承自_base.Executor,父類有__enter__()和__exit__()方法;
例:
with futures.ThreadPoolExecutor(max_workers=1) as executor:
??? future = executor.submit(pow, 2, 3)
??? print(future.result())
輸出:
8
?
總結(jié):
統(tǒng)一了線程池、進程池調(diào)用,簡化了編程;
是py簡單的思想哲學(xué)的體現(xiàn);
唯一的缺點,無法設(shè)置線程名稱;
?
例,多線程:
def work(n):
??? logging.info('working-{}'.format(n))
??? time.sleep(5)
??? logging.info('end-work-{}'.format(n))
?
executor = futures.ThreadPoolExecutor(3)
?
fs = []
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
while True:
??? time.sleep(2)
??? logging.info(threading.enumerate())
??? flag = True
??? for f in fs:
??????? flag = flag and f.done()
??????? if flag:
??????????? executor.shutdown()
??????????? logging.info(threading.enumerate())
??????????? break
?
例,多進程:
def work(n):
??? logging.info('working-{}'.format(n))
??? time.sleep(5)
??? logging.info('end-work-{}'.format(n))
?
if __name__ == '__main__':
??? executor = futures.ProcessPoolExecutor(3)
??? fs = []
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? while True:
??????? time.sleep(2)
??????? logging.info(threading.enumerate())
??????? flag = True
??????? for f in fs:
??????????? flag = flag and f.done()
??????????? if flag:
??????????????? executor.shutdown()
??????????????? # logging.info(threading.enumerate())?? #多進程用不上
???????????? ???break
?
?