真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Python進程之multiprocessing

python的multiprocessing模塊是跨平臺的多進程模塊,multiprocessing具有創(chuàng)建子進程,進程間通信,隊列,事件,鎖等功能,multiprocessing模塊包含Process,Queue,Pipe,Lock等多個組件。

創(chuàng)新互聯(lián)專注于鐵西網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供鐵西營銷型網(wǎng)站建設(shè),鐵西網(wǎng)站制作、鐵西網(wǎng)頁設(shè)計、鐵西網(wǎng)站官網(wǎng)定制、重慶小程序開發(fā)服務(wù),打造鐵西網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供鐵西網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

1、Process

創(chuàng)建進程的類

Process([group [, target [, name [, args [, kwargs]]]]])

參數(shù)介紹:
group參數(shù)未使用,值始終為None
target表示調(diào)用對象,即子進程要執(zhí)行的任務(wù)
args表示調(diào)用對象的位置參數(shù)元組,args=()
kwargs表示調(diào)用對象的字典,kwargs={'key':'value'}
name為子進程的名稱

Note:需要使用關(guān)鍵字方式指定參數(shù)

示例1:創(chuàng)建單進程

from multiprocessing import Process

def func():
    print("first process")

if __name__ == '__main__':
    # 創(chuàng)建進程對象,主進程和子進程是異步執(zhí)行的
    p = Process(target=func)
    # 開啟進程
    p.start()

示例2:傳參

from multiprocessing import Process

def func(*args,**kwargs):
    print("IPADDR:%s PORT:%d"%args)
    for k in kwargs:
        print("%s --> %s"%(k,kwargs[k]))

if __name__ == '__main__':
    # 創(chuàng)建進程對象,并傳遞參數(shù)
    p = Process(target=func,args=('127.0.0.1',8080),kwargs={'key':'value'})
       # 如果主進程中的代碼已經(jīng)結(jié)束了,子進程還沒結(jié)束,主進程會等待子進程
    # 開啟進程
    p.start()

示例3:創(chuàng)建多進程

import os
from multiprocessing import Process

def func():
    # os模塊的getpid方法可以獲取當(dāng)前進程的pid,getppid方法可以獲取當(dāng)前進程的父進程的pid
    print("子進程pid:%s,父進程pid:%s"%(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p_l = []
    # 創(chuàng)建多個進程
    for i in range(10):
        p = Process(target=func)
        p.start()
        p_l.append(p)

    # 異步執(zhí)行子進程,最后執(zhí)行主進程中的代碼
    for p in p_l:
        p.join()   # 阻塞,使主進程等待子進程結(jié)束
    print("------主進程------")
結(jié)果:
子進程pid:9944,父進程pid:1484
子進程pid:8932,父進程pid:1484
子進程pid:8504,父進程pid:1484
子進程pid:14884,父進程pid:1484
子進程pid:4828,父進程pid:1484
子進程pid:14644,父進程pid:1484
子進程pid:14908,父進程pid:1484
子進程pid:1980,父進程pid:1484
子進程pid:14604,父進程pid:1484
子進程pid:10008,父進程pid:1484
------主進程------

Note :因為在windows操作系統(tǒng)中,沒有fork(),在創(chuàng)建子進程的時候會自動運行啟動它的文件中的所有代碼,因此必須將創(chuàng)建子進程的語句寫在ifname=='main':條件語句下。

示例4:使用類繼承的方式創(chuàng)建進程

import os
from multiprocessing import Process

class MyProcess(Process):   # 必須繼承Process類
    def __init__(self,arg1,arg2,arg3):
        '''
        繼承父類的初始化方法,加上自己需要的參數(shù)
        :param arg1:
        :param arg2:
        :param arg3:
        '''
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2
        self.arg3 = arg3

    def run(self):
        '''
        必須要有run方法的實現(xiàn)
        :return:
        '''
        print('子進程:%d ,父進程:%s '%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
        self.walk()  # walk方法在子進程中執(zhí)行

    def walk(self):
        print('子進程:%d'%os.getpid())

if __name__ == '__main__':
    p = MyProcess(1,2,3)
    p.start()  # 會默認調(diào)用run方法
    p.walk()   # walk方法直接在主進程中調(diào)用,并沒有在子進程中執(zhí)行

    print('父進程:%d '%os.getpid())

結(jié)果:
子進程:1220
父進程:1220 
子進程:2164 ,父進程:1220  1 2 3
子進程:2164

示例5:守護進程

在為開啟daemon前,主進程會等待子進程結(jié)束在結(jié)束;
開啟daemon后,程序會在主進程結(jié)束時結(jié)束子進程

import time
from multiprocessing import Process

def cal_time(second):
    while True:
        print("current time:%s"%time.ctime())
        time.sleep(second)

if __name__ == '__main__':
    p = Process(target=cal_time,args=(1,))
    '''
    守護進程的作用:會隨著主進程代碼執(zhí)行結(jié)束而結(jié)束
    守護進程要在start前設(shè)置
    守護進程中不能再開啟子進程
    '''
    p.daemon = True
    p.start()
    for i in range(10):
        time.sleep(0.2)
        print('*'*i)

未開啟daemon結(jié)果:子進程一直在運行
current time:Tue Feb 12 17:48:44 2019

*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019

開啟daemon后結(jié)果:主進程結(jié)束程序就結(jié)束了
current time:Tue Feb 12 17:49:14 2019

*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********

示例6:屬性:name,pid 方法:is_alive(),terminate()

name:查看進程名
pid:查看進程id
is_alive:查看進程是否正在運行
terminate:結(jié)束進程


import time
from multiprocessing import Process

def func():
    print("start")
    time.sleep(3)
    print("end")
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    time.sleep(3)
    print("進程名:%s,進程id:%s"%(p.name,p.pid))
    # is_alive方法查看進程是否正在運行
    print(p.is_alive())
    # terminate方法結(jié)束進程
    p.terminate()
    time.sleep(3)
    print(p.is_alive())
結(jié)果:
start
進程名:Process-1,進程id:17564
True
False

2、Lock

進程鎖:當(dāng)多個進程訪問共享資源時,進程鎖保證同一時間只能有一個任務(wù)可以進行修改,程序的運行方法有并發(fā)改為串行,這樣速度慢了,但是保證了數(shù)據(jù)的安全

示例:

import os
import time
import random
from multiprocessing import Process,Lock

def func(lock,n):
    lock.acquire() #加鎖
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release() #釋放
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=func,args=(lock,i))
        p.start()

3、Semaphore

信號量:Lock(鎖)可以保證同一時間只能有一個任務(wù)對共享數(shù)據(jù)進行操作,而Semaphore(信號量)可以在同一時間讓指定數(shù)量的進程操作共享數(shù)據(jù)。

示例:迷你唱吧

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

'''
迷你唱吧,20個人,同一時間只能有4個人進去
'''

def sing(i,sem):
    sem.acquire() # 加鎖
    print('%s enter the ktv'%i)
    time.sleep(random.randint(1,10))
    print('%s leave the ktv'%i)
    sem.release() # 釋放

if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=sing,args=(i,sem))
        p.start()

4、Event

事件:Event是進程之間的狀態(tài)標(biāo)記通信,因為進程不共享數(shù)據(jù),所以事件對象需要以參數(shù)形式傳遞到函數(shù)中使用。

主要方法:

e = Event() # 實例化一個事件對象
e.set() # 標(biāo)記變?yōu)榉亲枞?br/>e.wait() # 默認標(biāo)記為阻塞,在等待的過程中,遇到非阻塞信號就繼續(xù)執(zhí)行
e.clear() # 標(biāo)記變?yōu)樽枞?br/>e.is_set() # 是否阻塞 True就是非阻塞,F(xiàn)alse是阻塞

示例:紅綠燈

import time
import random
from multiprocessing import Event
from multiprocessing import Process

def traffic_light(e):
    while True:
        if e.is_set(): # True為綠燈
            time.sleep(3)  # 等3秒后變?yōu)榧t燈
            print("紅燈亮")
            e.clear()
        else: # False為紅燈,等3秒后變?yōu)榫G燈
            time.sleep(3)
            print("綠燈亮")
            e.set()

def car(i,e):
    e.wait() # 默認是紅燈
    print("%s 車通過"%i)

if __name__ == '__main__':
    e = Event()

    # 控制紅綠燈的進程
    tra = Process(target=traffic_light,args=(e,))
    tra.start()

    for i in range(100):
        if i%6 == 0:
            time.sleep(random.randint(1,3))
        p = Process(target=car,args=(i,e))
        p.start()

5、Pipe

管道是進程間通信(IPC)的一種,管道是雙向通信的,但它不保證數(shù)據(jù)安全
創(chuàng)建管道:p1,p2=Pipe()

主要方法:

send():發(fā)送數(shù)據(jù)
recv():接收數(shù)據(jù)
close():關(guān)閉

示例:

def func(p):
    foo,son = p
    foo.close() # 不使用主進程的管道一端,先行關(guān)閉
    while True:
        try:
            print(son.recv())
            # 子進程在結(jié)束數(shù)據(jù)時,如果管道無數(shù)據(jù),且對端沒有close,就會報EOFError;如果管道無數(shù)據(jù),對端沒close,進程會阻塞
        except EOFError:
            break

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func,args=((foo,son),))
    p.start()
    son.close() # 不使用子進程的管道一端,先行關(guān)閉
    foo.send("hello")
    foo.send("hello")
    foo.close()

6、Queue

隊列:進程之間是獨立的,要實現(xiàn)進程間通信(IPC);multiprocessing模塊支持兩種形式:隊列(queue)和管道(pipe),這兩種方式都是使用消息傳遞的,且都是雙向通信的,Queue = Pipe+Lock。

隊列的兩種創(chuàng)建方式:

q = Queue() # 創(chuàng)建隊列對象,無長度限制
q1 = Queue(3) # 傳參數(shù),創(chuàng)建一個有最大長度限制的隊列

隊列的主要方法:

q.put(1) # 放入一個數(shù)據(jù),對于無長度限制的隊列來說,永不阻塞;對于有長度限制的隊列來說,放滿就阻塞

q.get() # 隊列中有數(shù)據(jù)就取出一個數(shù)據(jù),隊列中無數(shù)據(jù)就會阻塞;遵循先進先出原則

q.qsize() # 查看隊列的數(shù)據(jù)大小,不一定準(zhǔn)確

示例1:主進程與子進程之間的通信

from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子進程隊列中放入一個變量

if __name__ == '__main__':
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    print(q.get()) # 主進程獲取到變量

示例2:子進程與子進程之間的通信
from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子進程隊列中放入一個變量

def queue_get(q):
    print(q.get()) # 另一個子進程獲取到隊列中的數(shù)據(jù)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    p1 = Process(target=queue_get,args=(q,))
    p1.start()

7、JoinableQueue

JoinableQueue也是multiprocessing模塊的一種隊列的實現(xiàn),但它與Queue不同的是JoinableQueue允許項目的使用者通知生成者項目已經(jīng)被成功處理。創(chuàng)建方式同Queue。
主要方法:put與get與Queue一致
? ? q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量,將引發(fā)ValueError異常
? ? q.join():生產(chǎn)者調(diào)用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止

示例:生產(chǎn)者消費者模型

import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

'''
程序執(zhí)行流程
1、生產(chǎn)者生產(chǎn)的數(shù)據(jù)全部被消費 --> 2、生產(chǎn)者進程結(jié)束 --> 3、主進程代碼執(zhí)行結(jié)束 --> 4、消費者守護進程結(jié)束
'''
def producer(q,food):
    for i in range(5):
        q.put("%s -- %s"%(i,food))
        print("生產(chǎn)了 %s"%(food))
        time.sleep(random.random())
    q.join() # 2、等待消費者消費完所有數(shù)據(jù)

def consumer(q,name):
    while True:
        food = q.get()
        if food == None:break
        print("%s 吃了 %s"%(name,food))
        q.task_done() # 1、消費者每消費一個數(shù)據(jù)就返回一個task_done給生產(chǎn)者

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'youtiao'))
    p1.start()
    p2 = Process(target=producer,args=(q,'baozi'))
    p2.start()
    c1 = Process(target=consumer,args=(q,'daxiong'))
    c1.daemon = True # 4、消費者守護進程結(jié)束
    c1.start()
    c2 = Process(target=consumer,args=(q,'chenglei'))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer,args=(q,'niu'))
    c3.daemon = True
    c3.start()

    p1.join() # 3、等待p1執(zhí)行完畢
    p2.join() # 3、等待p2執(zhí)行完畢

8、Manager

Manager也是multiprocessing模塊的一個類,這個類主要提供了進程間通信(IPC)的一個機制,它支持Python所有的數(shù)據(jù)類型,但不提供數(shù)據(jù)安全的機制。

示例:

from multiprocessing import Manager
from multiprocessing import Process

def func(d):
    print(d)
    d['num'] -= 10

if __name__ == '__main__':

    m = Manager()
    d = m.dict({'num':100})
    l = []
    for i in range(10):
        p = Process(target=func,args=(d,))

        p.start()
        # p.join() # 同步
        l.append(p)
    for j in l:
        j.join()   # 異步

結(jié)果:
{'num': 100}
{'num': 90}
{'num': 80}
{'num': 70}
{'num': 60}
{'num': 50}
{'num': 40}
{'num': 30}
{'num': 20}
{'num': 10}

9、Pool

在執(zhí)行大量并發(fā)任務(wù)時,多進程是行之有效的手段之一,但是多進程需要注意幾個問題,一是操作系統(tǒng)不可能無限開啟進程,一般是有幾個核開啟幾個進程,二是開啟進程過多,系統(tǒng)資源占用過多,會導(dǎo)致系統(tǒng)運行速度變慢;那么遇到這種情況時pool(進程池)便是最好的解決方案。
Pool可以指定開啟一定數(shù)量的進程(一般為CPU核數(shù)+1個)等待用戶使用,當(dāng)有新的請求進入時,如果池中有空閑進程,便直接開啟;如果池中的進程都在使用,那么該請求就會等待,直到池中有進程結(jié)束,重用該進程。

示例1:多進程與進程池效率對比

import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
    i -= 1

if __name__ == '__main__':
    # 計算進程池所需事件
    start1_time = time.time()  # 開始時間
    p = Pool(5)  # 進程池中創(chuàng)建5個進程
    p.map(func,range(100)) # 調(diào)用進程執(zhí)行任務(wù),target = func args = (1,2,3...),第二個參數(shù)要是可迭代對象
    p.close() # 不允許再向進程池中添加任務(wù)
    p.join() # 等待進程池中所有進程執(zhí)行結(jié)束
    stop1_time = time.time() - start1_time # 結(jié)束時間
    print("進程池所需時間: %s "%stop1_time)

    # 計算多進程所需時間
    start2_time = time.time()  # 開始時間
    l = []
    for i in range(100):
        p1 = Process(target=func,args=(i,))
        p1.start()
        l.append(p)
    for j in l:
        j.join()
    stop2_time = time.time() - start2_time
    print("多進程所需時間: %s"%stop2_time)
結(jié)果:
進程池所需時間: 0.19990277290344238 
多進程所需時間: 1.7190303802490234

由上可知,進程池在執(zhí)行大量并發(fā)任務(wù)時的效率。

主要方法:

map(self, func, iterable, chunksize=None):將func應(yīng)用于iterable中的每個元素,收集結(jié)果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):異步提交任務(wù)的機制
apply(self, func, args=(), kwds={}):同步提交任務(wù)的機制
close():不允許再提交新的任務(wù)
join():等待進程池中的進程執(zhí)行結(jié)束在往下執(zhí)行,此方法只能在close()或teminate()之后調(diào)用

執(zhí)行apply或apply_async方法時,會返回ApplyResult類的實例對象
ApplyResult類有以下方法:
obj.get():獲取進程的返回值
obj.ready():調(diào)用完成時,返回True
obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)ValueError異常
obj.wait([timeout]):等待結(jié)果變?yōu)榭捎?/p>

示例2:apply與apply_async方法

import time
from multiprocessing import Pool

'''
apply:同步提交任務(wù)的機制
apply_async:異步提交任務(wù)機制
'''

def func(i):
    time.sleep(1)
    i += 1
    print(i)

if __name__ == '__main__':
    p = Pool(5)
    res_l = []
    for i in range(20):
        # p.apply(func,args=(i,)) # 同步,執(zhí)行完畢立即獲取到返回值
        res = p.apply_async(func,args=(i,)) # 異步,通過get獲取返回值
        res_l.append(res)
    p.close() # 不允許再提交新的任務(wù)
    p.join() # 等待進程池中的進程執(zhí)行結(jié)束在往下執(zhí)行
    for res in res_l:
        print(res.get()) # 使用get來獲取apply_aync的結(jié)果

10、pool的call函數(shù)

在進程池中,一個進程任務(wù)結(jié)束就會返回一個結(jié)果,主進程則調(diào)用一個函數(shù)去處理這個結(jié)果,這就是回調(diào)函數(shù)?;卣{(diào)函數(shù)是在主進程中完成的,不能傳參數(shù),只能接受多進程中函數(shù)的返回值;

示例:請求網(wǎng)頁

在爬蟲中,使用回調(diào)比較多,爬蟲將訪問網(wǎng)頁、下載網(wǎng)頁的過程放到子進程中去做,分析數(shù)據(jù),處理數(shù)據(jù)讓回調(diào)函數(shù)去做,因為訪問網(wǎng)頁與下載網(wǎng)頁有網(wǎng)絡(luò)延時,而處理數(shù)據(jù)只占用很小的時間


import requests
from multiprocessing import Pool
def get(url):
    ret = requests.get(url)
    return {'url':url,
            'status_code':ret.status_code,
            'content':ret.text}
def parser(dic):
    print(dic['url'],len(dic['content']))
    parse_url = "URL:%s  Size:%s"%(dic['url'],len(dic['content']))
    with open('url.txt','a') as f:
        f.write(parse_url+'\n')

if __name__ == '__main__':
    url_l = [
        'http://www.baidu.com',
        'http://www.google.com',
        'https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5',
        'https://www.youtube.com/?app=desktop',
        'https://www.facebook.com/'
    ]
    p = Pool(5)
    for i in url_l:
        p.apply_async(get,args=(i,),callback=parser)

    p.close()
    p.join()

分享標(biāo)題:Python進程之multiprocessing
網(wǎng)頁鏈接:http://weahome.cn/article/gjjccg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部