python的multiprocessing模塊是跨平臺的多進程模塊,multiprocessing具有創(chuàng)建子進程,進程間通信,隊列,事件,鎖等功能,multiprocessing模塊包含Process,Queue,Pipe,Lock等多個組件。
創(chuàng)新互聯(lián)專注于鐵西網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供鐵西營銷型網(wǎng)站建設(shè),鐵西網(wǎng)站制作、鐵西網(wǎng)頁設(shè)計、鐵西網(wǎng)站官網(wǎng)定制、重慶小程序開發(fā)服務(wù),打造鐵西網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供鐵西網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
創(chuàng)建進程的類
Process([group [, target [, name [, args [, kwargs]]]]])
參數(shù)介紹:
group參數(shù)未使用,值始終為None
target表示調(diào)用對象,即子進程要執(zhí)行的任務(wù)
args表示調(diào)用對象的位置參數(shù)元組,args=()
kwargs表示調(diào)用對象的字典,kwargs={'key':'value'}
name為子進程的名稱
Note:需要使用關(guān)鍵字方式指定參數(shù)
from multiprocessing import Process
def func():
print("first process")
if __name__ == '__main__':
# 創(chuàng)建進程對象,主進程和子進程是異步執(zhí)行的
p = Process(target=func)
# 開啟進程
p.start()
from multiprocessing import Process
def func(*args,**kwargs):
print("IPADDR:%s PORT:%d"%args)
for k in kwargs:
print("%s --> %s"%(k,kwargs[k]))
if __name__ == '__main__':
# 創(chuàng)建進程對象,并傳遞參數(shù)
p = Process(target=func,args=('127.0.0.1',8080),kwargs={'key':'value'})
# 如果主進程中的代碼已經(jīng)結(jié)束了,子進程還沒結(jié)束,主進程會等待子進程
# 開啟進程
p.start()
import os
from multiprocessing import Process
def func():
# os模塊的getpid方法可以獲取當(dāng)前進程的pid,getppid方法可以獲取當(dāng)前進程的父進程的pid
print("子進程pid:%s,父進程pid:%s"%(os.getpid(),os.getppid()))
if __name__ == '__main__':
p_l = []
# 創(chuàng)建多個進程
for i in range(10):
p = Process(target=func)
p.start()
p_l.append(p)
# 異步執(zhí)行子進程,最后執(zhí)行主進程中的代碼
for p in p_l:
p.join() # 阻塞,使主進程等待子進程結(jié)束
print("------主進程------")
結(jié)果:
子進程pid:9944,父進程pid:1484
子進程pid:8932,父進程pid:1484
子進程pid:8504,父進程pid:1484
子進程pid:14884,父進程pid:1484
子進程pid:4828,父進程pid:1484
子進程pid:14644,父進程pid:1484
子進程pid:14908,父進程pid:1484
子進程pid:1980,父進程pid:1484
子進程pid:14604,父進程pid:1484
子進程pid:10008,父進程pid:1484
------主進程------
Note :因為在windows操作系統(tǒng)中,沒有fork(),在創(chuàng)建子進程的時候會自動運行啟動它的文件中的所有代碼,因此必須將創(chuàng)建子進程的語句寫在ifname=='main':條件語句下。
import os
from multiprocessing import Process
class MyProcess(Process): # 必須繼承Process類
def __init__(self,arg1,arg2,arg3):
'''
繼承父類的初始化方法,加上自己需要的參數(shù)
:param arg1:
:param arg2:
:param arg3:
'''
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
def run(self):
'''
必須要有run方法的實現(xiàn)
:return:
'''
print('子進程:%d ,父進程:%s '%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
self.walk() # walk方法在子進程中執(zhí)行
def walk(self):
print('子進程:%d'%os.getpid())
if __name__ == '__main__':
p = MyProcess(1,2,3)
p.start() # 會默認調(diào)用run方法
p.walk() # walk方法直接在主進程中調(diào)用,并沒有在子進程中執(zhí)行
print('父進程:%d '%os.getpid())
結(jié)果:
子進程:1220
父進程:1220
子進程:2164 ,父進程:1220 1 2 3
子進程:2164
在為開啟daemon前,主進程會等待子進程結(jié)束在結(jié)束;
開啟daemon后,程序會在主進程結(jié)束時結(jié)束子進程
import time
from multiprocessing import Process
def cal_time(second):
while True:
print("current time:%s"%time.ctime())
time.sleep(second)
if __name__ == '__main__':
p = Process(target=cal_time,args=(1,))
'''
守護進程的作用:會隨著主進程代碼執(zhí)行結(jié)束而結(jié)束
守護進程要在start前設(shè)置
守護進程中不能再開啟子進程
'''
p.daemon = True
p.start()
for i in range(10):
time.sleep(0.2)
print('*'*i)
未開啟daemon結(jié)果:子進程一直在運行
current time:Tue Feb 12 17:48:44 2019
*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019
開啟daemon后結(jié)果:主進程結(jié)束程序就結(jié)束了
current time:Tue Feb 12 17:49:14 2019
*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********
name:查看進程名
pid:查看進程id
is_alive:查看進程是否正在運行
terminate:結(jié)束進程
import time
from multiprocessing import Process
def func():
print("start")
time.sleep(3)
print("end")
if __name__ == '__main__':
p = Process(target=func)
p.start()
time.sleep(3)
print("進程名:%s,進程id:%s"%(p.name,p.pid))
# is_alive方法查看進程是否正在運行
print(p.is_alive())
# terminate方法結(jié)束進程
p.terminate()
time.sleep(3)
print(p.is_alive())
結(jié)果:
start
進程名:Process-1,進程id:17564
True
False
進程鎖:當(dāng)多個進程訪問共享資源時,進程鎖保證同一時間只能有一個任務(wù)可以進行修改,程序的運行方法有并發(fā)改為串行,這樣速度慢了,但是保證了數(shù)據(jù)的安全
import os
import time
import random
from multiprocessing import Process,Lock
def func(lock,n):
lock.acquire() #加鎖
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
lock.release() #釋放
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=func,args=(lock,i))
p.start()
信號量:Lock(鎖)可以保證同一時間只能有一個任務(wù)對共享數(shù)據(jù)進行操作,而Semaphore(信號量)可以在同一時間讓指定數(shù)量的進程操作共享數(shù)據(jù)。
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
'''
迷你唱吧,20個人,同一時間只能有4個人進去
'''
def sing(i,sem):
sem.acquire() # 加鎖
print('%s enter the ktv'%i)
time.sleep(random.randint(1,10))
print('%s leave the ktv'%i)
sem.release() # 釋放
if __name__ == '__main__':
sem = Semaphore(4)
for i in range(20):
p = Process(target=sing,args=(i,sem))
p.start()
事件:Event是進程之間的狀態(tài)標(biāo)記通信,因為進程不共享數(shù)據(jù),所以事件對象需要以參數(shù)形式傳遞到函數(shù)中使用。
e = Event() # 實例化一個事件對象
e.set() # 標(biāo)記變?yōu)榉亲枞?br/>e.wait() # 默認標(biāo)記為阻塞,在等待的過程中,遇到非阻塞信號就繼續(xù)執(zhí)行
e.clear() # 標(biāo)記變?yōu)樽枞?br/>e.is_set() # 是否阻塞 True就是非阻塞,F(xiàn)alse是阻塞
import time
import random
from multiprocessing import Event
from multiprocessing import Process
def traffic_light(e):
while True:
if e.is_set(): # True為綠燈
time.sleep(3) # 等3秒后變?yōu)榧t燈
print("紅燈亮")
e.clear()
else: # False為紅燈,等3秒后變?yōu)榫G燈
time.sleep(3)
print("綠燈亮")
e.set()
def car(i,e):
e.wait() # 默認是紅燈
print("%s 車通過"%i)
if __name__ == '__main__':
e = Event()
# 控制紅綠燈的進程
tra = Process(target=traffic_light,args=(e,))
tra.start()
for i in range(100):
if i%6 == 0:
time.sleep(random.randint(1,3))
p = Process(target=car,args=(i,e))
p.start()
管道是進程間通信(IPC)的一種,管道是雙向通信的,但它不保證數(shù)據(jù)安全
創(chuàng)建管道:p1,p2=Pipe()
send():發(fā)送數(shù)據(jù)
recv():接收數(shù)據(jù)
close():關(guān)閉
def func(p):
foo,son = p
foo.close() # 不使用主進程的管道一端,先行關(guān)閉
while True:
try:
print(son.recv())
# 子進程在結(jié)束數(shù)據(jù)時,如果管道無數(shù)據(jù),且對端沒有close,就會報EOFError;如果管道無數(shù)據(jù),對端沒close,進程會阻塞
except EOFError:
break
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))
p.start()
son.close() # 不使用子進程的管道一端,先行關(guān)閉
foo.send("hello")
foo.send("hello")
foo.close()
隊列:進程之間是獨立的,要實現(xiàn)進程間通信(IPC);multiprocessing模塊支持兩種形式:隊列(queue)和管道(pipe),這兩種方式都是使用消息傳遞的,且都是雙向通信的,Queue = Pipe+Lock。
q = Queue() # 創(chuàng)建隊列對象,無長度限制
q1 = Queue(3) # 傳參數(shù),創(chuàng)建一個有最大長度限制的隊列
q.put(1) # 放入一個數(shù)據(jù),對于無長度限制的隊列來說,永不阻塞;對于有長度限制的隊列來說,放滿就阻塞
q.get() # 隊列中有數(shù)據(jù)就取出一個數(shù)據(jù),隊列中無數(shù)據(jù)就會阻塞;遵循先進先出原則
q.qsize() # 查看隊列的數(shù)據(jù)大小,不一定準(zhǔn)確
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子進程隊列中放入一個變量
if __name__ == '__main__':
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
print(q.get()) # 主進程獲取到變量
示例2:子進程與子進程之間的通信
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子進程隊列中放入一個變量
def queue_get(q):
print(q.get()) # 另一個子進程獲取到隊列中的數(shù)據(jù)
if __name__ == '__main__':
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
p1 = Process(target=queue_get,args=(q,))
p1.start()
JoinableQueue也是multiprocessing模塊的一種隊列的實現(xiàn),但它與Queue不同的是JoinableQueue允許項目的使用者通知生成者項目已經(jīng)被成功處理。創(chuàng)建方式同Queue。
主要方法:put與get與Queue一致
? ? q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量,將引發(fā)ValueError異常
? ? q.join():生產(chǎn)者調(diào)用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止
import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process
'''
程序執(zhí)行流程
1、生產(chǎn)者生產(chǎn)的數(shù)據(jù)全部被消費 --> 2、生產(chǎn)者進程結(jié)束 --> 3、主進程代碼執(zhí)行結(jié)束 --> 4、消費者守護進程結(jié)束
'''
def producer(q,food):
for i in range(5):
q.put("%s -- %s"%(i,food))
print("生產(chǎn)了 %s"%(food))
time.sleep(random.random())
q.join() # 2、等待消費者消費完所有數(shù)據(jù)
def consumer(q,name):
while True:
food = q.get()
if food == None:break
print("%s 吃了 %s"%(name,food))
q.task_done() # 1、消費者每消費一個數(shù)據(jù)就返回一個task_done給生產(chǎn)者
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'youtiao'))
p1.start()
p2 = Process(target=producer,args=(q,'baozi'))
p2.start()
c1 = Process(target=consumer,args=(q,'daxiong'))
c1.daemon = True # 4、消費者守護進程結(jié)束
c1.start()
c2 = Process(target=consumer,args=(q,'chenglei'))
c2.daemon = True
c2.start()
c3 = Process(target=consumer,args=(q,'niu'))
c3.daemon = True
c3.start()
p1.join() # 3、等待p1執(zhí)行完畢
p2.join() # 3、等待p2執(zhí)行完畢
Manager也是multiprocessing模塊的一個類,這個類主要提供了進程間通信(IPC)的一個機制,它支持Python所有的數(shù)據(jù)類型,但不提供數(shù)據(jù)安全的機制。
from multiprocessing import Manager
from multiprocessing import Process
def func(d):
print(d)
d['num'] -= 10
if __name__ == '__main__':
m = Manager()
d = m.dict({'num':100})
l = []
for i in range(10):
p = Process(target=func,args=(d,))
p.start()
# p.join() # 同步
l.append(p)
for j in l:
j.join() # 異步
結(jié)果:
{'num': 100}
{'num': 90}
{'num': 80}
{'num': 70}
{'num': 60}
{'num': 50}
{'num': 40}
{'num': 30}
{'num': 20}
{'num': 10}
在執(zhí)行大量并發(fā)任務(wù)時,多進程是行之有效的手段之一,但是多進程需要注意幾個問題,一是操作系統(tǒng)不可能無限開啟進程,一般是有幾個核開啟幾個進程,二是開啟進程過多,系統(tǒng)資源占用過多,會導(dǎo)致系統(tǒng)運行速度變慢;那么遇到這種情況時pool(進程池)便是最好的解決方案。
Pool可以指定開啟一定數(shù)量的進程(一般為CPU核數(shù)+1個)等待用戶使用,當(dāng)有新的請求進入時,如果池中有空閑進程,便直接開啟;如果池中的進程都在使用,那么該請求就會等待,直到池中有進程結(jié)束,重用該進程。
import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
i -= 1
if __name__ == '__main__':
# 計算進程池所需事件
start1_time = time.time() # 開始時間
p = Pool(5) # 進程池中創(chuàng)建5個進程
p.map(func,range(100)) # 調(diào)用進程執(zhí)行任務(wù),target = func args = (1,2,3...),第二個參數(shù)要是可迭代對象
p.close() # 不允許再向進程池中添加任務(wù)
p.join() # 等待進程池中所有進程執(zhí)行結(jié)束
stop1_time = time.time() - start1_time # 結(jié)束時間
print("進程池所需時間: %s "%stop1_time)
# 計算多進程所需時間
start2_time = time.time() # 開始時間
l = []
for i in range(100):
p1 = Process(target=func,args=(i,))
p1.start()
l.append(p)
for j in l:
j.join()
stop2_time = time.time() - start2_time
print("多進程所需時間: %s"%stop2_time)
結(jié)果:
進程池所需時間: 0.19990277290344238
多進程所需時間: 1.7190303802490234
由上可知,進程池在執(zhí)行大量并發(fā)任務(wù)時的效率。
map(self, func, iterable, chunksize=None):將func
應(yīng)用于iterable
中的每個元素,收集結(jié)果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):異步提交任務(wù)的機制
apply(self, func, args=(), kwds={}):同步提交任務(wù)的機制
close():不允許再提交新的任務(wù)
join():等待進程池中的進程執(zhí)行結(jié)束在往下執(zhí)行,此方法只能在close()或teminate()之后調(diào)用
執(zhí)行apply或apply_async方法時,會返回ApplyResult類的實例對象
ApplyResult類有以下方法:
obj.get():獲取進程的返回值
obj.ready():調(diào)用完成時,返回True
obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)ValueError異常
obj.wait([timeout]):等待結(jié)果變?yōu)榭捎?/p>
import time
from multiprocessing import Pool
'''
apply:同步提交任務(wù)的機制
apply_async:異步提交任務(wù)機制
'''
def func(i):
time.sleep(1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5)
res_l = []
for i in range(20):
# p.apply(func,args=(i,)) # 同步,執(zhí)行完畢立即獲取到返回值
res = p.apply_async(func,args=(i,)) # 異步,通過get獲取返回值
res_l.append(res)
p.close() # 不允許再提交新的任務(wù)
p.join() # 等待進程池中的進程執(zhí)行結(jié)束在往下執(zhí)行
for res in res_l:
print(res.get()) # 使用get來獲取apply_aync的結(jié)果
在進程池中,一個進程任務(wù)結(jié)束就會返回一個結(jié)果,主進程則調(diào)用一個函數(shù)去處理這個結(jié)果,這就是回調(diào)函數(shù)?;卣{(diào)函數(shù)是在主進程中完成的,不能傳參數(shù),只能接受多進程中函數(shù)的返回值;
在爬蟲中,使用回調(diào)比較多,爬蟲將訪問網(wǎng)頁、下載網(wǎng)頁的過程放到子進程中去做,分析數(shù)據(jù),處理數(shù)據(jù)讓回調(diào)函數(shù)去做,因為訪問網(wǎng)頁與下載網(wǎng)頁有網(wǎng)絡(luò)延時,而處理數(shù)據(jù)只占用很小的時間
import requests
from multiprocessing import Pool
def get(url):
ret = requests.get(url)
return {'url':url,
'status_code':ret.status_code,
'content':ret.text}
def parser(dic):
print(dic['url'],len(dic['content']))
parse_url = "URL:%s Size:%s"%(dic['url'],len(dic['content']))
with open('url.txt','a') as f:
f.write(parse_url+'\n')
if __name__ == '__main__':
url_l = [
'http://www.baidu.com',
'http://www.google.com',
'https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5',
'https://www.youtube.com/?app=desktop',
'https://www.facebook.com/'
]
p = Pool(5)
for i in url_l:
p.apply_async(get,args=(i,),callback=parser)
p.close()
p.join()