Python中multiprocessing的作用是什么,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),延慶企業(yè)網(wǎng)站建設(shè),延慶品牌網(wǎng)站建設(shè),網(wǎng)站定制,延慶網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,延慶網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
一前言
使用python進(jìn)行并發(fā)處理多臺(tái)機(jī)器/多個(gè)實(shí)例的時(shí)候,我們可以使用threading ,但是由于著名的GIL存在,實(shí)際上threading 并未提供真正有效的并發(fā)處理,要充分利用到多核CPU,我們需要使用多進(jìn)程。Python提供了非常好用的多進(jìn)程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process對(duì)象來創(chuàng)建一個(gè)進(jìn)程,該P(yáng)rocess對(duì)象與Threading對(duì)象的用法基本相同,具有相同的方法(官方原話:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue類用于進(jìn)程之間的通信。話不多說 show me the code!
二使用
2.1 初識(shí)異同
下面的程序顯示threading和multiprocessing的在使用方面的異同,相近的函數(shù)join(),start(),append() 等,并做同一件事情打印自己的進(jìn)程pid
#!/usr/bin/env python
# encoding: utf-8
import os
import threading
import multiprocessing
def printer(msg):
print(msg, os.getpid())
print('Main begin:', os.getpid())
# threading
record = []
for i in range(5):
thread = threading.Thread(target=printer, args=('threading',))
thread.start()
record.append(thread)
for thread in record:
thread.join()
# multi-process
record = []
for i in range(5):
process = multiprocessing.Process(target=printer, args=('multiprocessing',))
process.start()
record.append(process)
for process in record:
process.join()
print('Main end:', os.getpid())
輸出結(jié)果
點(diǎn)擊(此處)折疊或打開
Main begin: 9524
threading 9524
threading 9524
threading 9524
threading 9524
threading 9524
multiprocessing 9539
multiprocessing 9540
multiprocessing 9541
multiprocessing 9542
multiprocessing 9543
Main end: 9524
從例子的結(jié)果可以看出多線程threading的進(jìn)程id和主進(jìn)程(父進(jìn)程)pid一樣 ,同為9524; 多進(jìn)程打印的pid每個(gè)都不一樣,for循環(huán)中每創(chuàng)建一個(gè)process對(duì)象都年開一個(gè)進(jìn)程。其他相關(guān)的方法基本類似。
2.2 用法
創(chuàng)建進(jìn)程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示調(diào)用對(duì)象,
args表示調(diào)用對(duì)象的位置參數(shù)元組。
kwargs表示調(diào)用對(duì)象的字典。
name為進(jìn)程的別名。
group實(shí)質(zhì)上不使用,為None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動(dòng)某個(gè)進(jìn)程,并自動(dòng)調(diào)用run方法.
屬性:authkey、daemon(要通過start()設(shè)置,必須設(shè)置在方法start之前)、exitcode(進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束)、name、pid。其中daemon是父進(jìn)程終止后自動(dòng)終止,且自己不能產(chǎn)生新進(jìn)程,必須在start()之前設(shè)置。
2.3 創(chuàng)建單進(jìn)程
單線程比較簡單,創(chuàng)建一個(gè) Process的實(shí)例對(duì)象就好,傳入?yún)?shù) target 為已經(jīng)定義好的方法worker以及worker需要的參數(shù)
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午6:45
func:
"""
import multiprocessing
import datetime, time
def worker(interval):
print("process start: {0}".format(datetime.datetime.today()));
time.sleep(interval)
print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
p = multiprocessing.Process(target=worker, args=(5,))
p.start()
p.join()
print "end!"
2.4 創(chuàng)建多進(jìn)程
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午7:50
func:
"""
import multiprocessing
def worker(num):
print "worker %d" %num
if __name__ == "__main__":
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
proc = []
for i in xrange(5):
p = multiprocessing.Process(target=worker, args=(i,))
proc.append(p)
for p in proc:
p.start()
for p in proc:
p.join()
print "end ..."
輸出
點(diǎn)擊(此處)折疊或打開
The number of CPU is:4
worker 0
worker 1
worker 2
worker 3
worker 4
main process end ...
2.5 線程池
multiprocessing提供進(jìn)程池的類--Pool,它可以指定程序最大可以調(diào)用的進(jìn)程數(shù)量,當(dāng)有新的請(qǐng)求提交到pool中時(shí),如果進(jìn)程池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果進(jìn)程池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來它。
構(gòu)造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes : 使用的工作進(jìn)程的數(shù)量,如果processes是None,默認(rèn)使用os.cpu_count()返回的數(shù)量。
initializer: 如果initializer是None,那么每一個(gè)工作進(jìn)程在開始的時(shí)候會(huì)調(diào)用initializer(*initargs)。
maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)新的工作進(jìn)程來替代原進(jìn)程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。
context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用multiprocessing.Pool()或者一個(gè)context對(duì)象的Pool()方法來創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。
實(shí)例方法:
apply(func[, args[, kwds]]):同步進(jìn)程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進(jìn)程池
close() : 關(guān)閉進(jìn)程池,阻止更多的任務(wù)提交到pool,待任務(wù)完成后,工作進(jìn)程會(huì)退出。
terminate() : 結(jié)束工作進(jìn)程,不在處理未完成的任務(wù).
join() : 等待工作線程的退出,在調(diào)用join()前必須調(diào)用close()或者 terminate(),因?yàn)楸唤K止的進(jìn)程需要被父進(jìn)程調(diào)用wait(join等價(jià)與wait),否則進(jìn)程會(huì)成為僵尸進(jìn)程。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午7:50
func:
"""
from multiprocessing import Pool
import time
def worker(num):
print "worker %d" %num
time.sleep(2)
print "end worker %d" %num
if __name__ == "__main__":
proc_pool = Pool(2)
for i in xrange(4):
proc_pool.apply_async(worker, (i,)) #使用了異步調(diào)用,從輸出結(jié)果可以看出來
proc_pool.close()
proc_pool.join()
print "main process end ..."
輸出結(jié)果
點(diǎn)擊(此處)折疊或打開
worker 0
worker 1
end worker 0
end worker 1
worker 2
worker 3
end worker 2
end worker 3
main process end ..
解釋:創(chuàng)建一個(gè)進(jìn)程池pool 對(duì)象proc_pool,并設(shè)定進(jìn)程的數(shù)量為2,xrange(4)會(huì)相繼產(chǎn)生四個(gè)對(duì)象[0, 1, 2, 4],四個(gè)對(duì)象被提交到pool中,因pool指定進(jìn)程數(shù)為2,所以0、1會(huì)直接送到進(jìn)程中執(zhí)行,當(dāng)其中的2個(gè)任務(wù)執(zhí)行完之后才空出2進(jìn)程處理對(duì)象2和3,所以會(huì)出現(xiàn)輸出 worker 2 worker 3 出現(xiàn)在end worker 0 end worker 1之后。思考一下如果調(diào)用 proc_pool.apply(worker, (i,)) 的輸出結(jié)果會(huì)是什么樣的?
2.6 使用queue
multiprocessing提供隊(duì)列類,可以通過調(diào)用multiprocessing.Queue(maxsize) 初始化隊(duì)列對(duì)象,maxsize表示隊(duì)列里面最多的元素個(gè)數(shù)。
例子 創(chuàng)建了兩個(gè)函數(shù)入隊(duì),出隊(duì),出隊(duì)處理時(shí)使用了lock特性,串行化取數(shù)據(jù)。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:03
func:
"""
import time
from multiprocessing import Process, current_process,Lock,Queue
import datetime
def inputQ(queue):
time.sleep(1)
info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
queue.put(info)
def outputQ(queue,lock):
info = queue.get()
lock.acquire()
print ("proc_name: " + current_process().name + ' gets info :' + info)
lock.release()
if __name__ == '__main__':
record1 = [] # store input processes
record2 = [] # store output processes
lock = Lock() # To prevent messy print
queue = Queue(3)
for i in range(10):
process = Process(target=inputQ, args=(queue,))
process.start()
record1.append(process)
for i in range(10):
process = Process(target=outputQ, args=(queue,lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close() # No more object will come, close the queue
for p in record2:
p.join()
2.7 使用pipe
Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認(rèn)為雙向)。一個(gè)進(jìn)程從PIPE一端輸入對(duì)象,然后被PIPE另一端的進(jìn)程接收,單向管道只允許管道一端的進(jìn)程輸入,而雙向管道則允許從兩端輸入。
用法 multiprocessing.Pipe([duplex])
該類返回一組對(duì)象實(shí)例(conn1, conn2),分別代表發(fā)送和接受消息的兩端。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午8:01
func:
"""
from multiprocessing import Process, Pipe
def p1(conn, name):
conn.send('hello ,{name}'.format(name=name))
print "p1 receive :", conn.recv()
conn.close()
def p2(conn, name):
conn.send('hello ,{name}'.format(name=name))
print "p2 receive :", conn.recv()
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
輸出:
點(diǎn)擊(此處)折疊或打開
p1 receive : hello ,child_conn
p2 receive : hello ,parent_conn
該例子中 p1 p2 通過pipe 給彼此相互發(fā)送信息,p1 發(fā)送"parent_conn" 給 p2 ,p2 發(fā)送"child_conn" 給p1.
2.8 daemon程序?qū)Ρ冉Y(jié)果
import multiprocessing
import datetime, time
def worker(interval):
print("process start: {0}".format(datetime.datetime.today()));
time.sleep(interval)
print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
p = multiprocessing.Process(target=worker, args=(5,))
p.start()
print "end!"
輸出:
點(diǎn)擊(此處)折疊或打開
end!
process start: 2017-07-02 18:47:30.656244
process end: 2017-07-02 18:47:35.657464
設(shè)置 daemon = True,程序隨著主程序結(jié)束而不等待子進(jìn)程。
import multiprocessing
import datetime, time
def worker(interval):
print("process start: {0}".format(datetime.datetime.today()));
time.sleep(interval)
print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
p = multiprocessing.Process(target=worker, args=(5,))
p.daemon = True
p.start()
print "end!"
輸出:
end!
因?yàn)樽舆M(jìn)程設(shè)置了daemon屬性,主進(jìn)程結(jié)束,multiprocessing創(chuàng)建的進(jìn)程對(duì)象就隨著結(jié)束了。
import multiprocessing
import datetime, time
def worker(interval):
print("process start: {0}".format(datetime.datetime.today()));
time.sleep(interval)
print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
p = multiprocessing.Process(target=worker, args=(5,))
p.daemon = True #
p.start()
p.join() #進(jìn)程執(zhí)行完畢后再關(guān)閉
print "end!"
輸出:
點(diǎn)擊(此處)折疊或打開
process start: 2017-07-02 18:48:20.953754
process end: 2017-07-02 18:48:25.954736
2.9 Lock()
當(dāng)多個(gè)進(jìn)程需要訪問共享資源的時(shí)候,Lock可以用來避免訪問的沖突。
實(shí)例方法:
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
例子:
多個(gè)進(jìn)程使用同一個(gè)std_out ,使用lock機(jī)制確保同一個(gè)時(shí)刻有一個(gè)一個(gè)進(jìn)程獲取輸出。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:28
func:
"""
from multiprocessing import Process, Lock
def func_with_lock(l, i):
l.acquire()
print 'hello world', i
l.release()
def func_without_lock(i):
print 'hello world', i
if __name__ == '__main__':
lock = Lock()
print "func_with_lock :"
for num in range(10):
Process(target=func_with_lock, args=(lock, num)).start()
輸出:
點(diǎn)擊(此處)折疊或打開
func_with_lock :
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
關(guān)于Python中multiprocessing的作用是什么問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。