這篇文章主要介紹“Python中怎么使用multiprocessing實(shí)現(xiàn)進(jìn)程間通信”的相關(guān)知識,小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Python中怎么使用multiprocessing實(shí)現(xiàn)進(jìn)程間通信”文章能幫助大家解決問題。
創(chuàng)新互聯(lián)建站為您提適合企業(yè)的網(wǎng)站設(shè)計(jì)?讓您的網(wǎng)站在搜索引擎具有高度排名,讓您的網(wǎng)站具備超強(qiáng)的網(wǎng)絡(luò)競爭力!結(jié)合企業(yè)自身,進(jìn)行網(wǎng)站設(shè)計(jì)及把握,最后結(jié)合企業(yè)文化和具體宗旨等,才能創(chuàng)作出一份性化解決方案。從網(wǎng)站策劃到成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作, 我們的網(wǎng)頁設(shè)計(jì)師為您提供的解決方案。
python的多線程代碼效率由于受制于GIL,不能利用多核CPU來加速,而多進(jìn)程方式可以繞過GIL, 發(fā)揮多CPU加速的優(yōu)勢,能夠明顯提高程序的性能
但進(jìn)程間通信卻是不得不考慮的問題。 進(jìn)程不同于線程,進(jìn)程有自己的獨(dú)立內(nèi)存空間,不能使用全局變量在進(jìn)程間傳遞數(shù)據(jù)。
實(shí)際項(xiàng)目需求中,常常存在密集計(jì)算、或?qū)崟r(shí)性任務(wù),進(jìn)程之間有時(shí)需要傳遞大量數(shù)據(jù),如圖片、大對象等,傳遞數(shù)據(jù)如果通過文件序列化、或網(wǎng)絡(luò)接口來進(jìn)行,難以滿足實(shí)時(shí)性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息隊(duì)列包,又使系統(tǒng)復(fù)雜化了。
Python multiprocessing 模塊本身就提供了消息機(jī)制、同步機(jī)制、共享內(nèi)存等各種非常高效的進(jìn)程間通信方式。
了解并掌握 python 進(jìn)程間通信的各類方式的使用,以及安全機(jī)制,可以幫助大幅提升程序運(yùn)行性能。
進(jìn)程間通信的主要方式總結(jié)如下
關(guān)于進(jìn)程間通信的內(nèi)存安全
內(nèi)存安全意味著,多進(jìn)程間可能會因同搶,意外銷毀等原因造成共享變量異常。
Multiprocessing 模塊提供的Queue, Pipe, Lock, Event 對象,都已實(shí)現(xiàn)了進(jìn)程間通信安全機(jī)制。
采用共享內(nèi)存方式通信,需要在代碼中自已來跟蹤、銷毀這些共享內(nèi)存變量,否則可能會出同搶、未正常銷毀等。造成系統(tǒng)異常。 除非開發(fā)者很清楚共享內(nèi)存使用特點(diǎn),否則不建議直接使用此共享內(nèi)存,而是通過Manager管理器來使用共享內(nèi)存。
內(nèi)存管理器Manager
Multiprocessing提供了內(nèi)存管理器Manager類,可統(tǒng)一解決進(jìn)程通信的內(nèi)存安全問題,可以將各種共享數(shù)據(jù)加入管理器,包括 list, dict, Queue, Lock, Event, Shared Memory 等,由其統(tǒng)一跟蹤與銷毀。
類似于1上簡單的socket通道,雙端均可收發(fā)消息。
Pipe 對象的構(gòu)建方法:
parent_conn, child_conn = Pipe(duplex=True/False)
參數(shù)說明
duplex=True, 管道為雙向通信
duplex=False, 管道為單向通信,只有child_conn可以發(fā)消息,parent_conn只能接收。
示例代碼:
from multiprocessing import Process, Pipe def myfunction(conn): conn.send(['hi!! I am Python']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=myfunction, args=(child_conn,)) p.start() print (parent_conn.recv() ) p.join()
Multiprocessing 的Queue 類,是在python queue 3.0版本上修改的, 可以很容易實(shí)現(xiàn)生產(chǎn)者 – 消息者間傳遞數(shù)據(jù),而且Multiprocessing的Queue 模塊實(shí)現(xiàn)了lock安全機(jī)制。
Queue模塊共提供了3種類型的隊(duì)列。
(1) FIFO queue , 先進(jìn)先出,
class queue.Queue(maxsize=0)
(2) LIFO queue, 后進(jìn)先出, 實(shí)際上就是堆棧
class queue.LifoQueue(maxsize=0)
(3) 帶優(yōu)先級隊(duì)列, 優(yōu)先級最低entry value lowest 先了列
class queue.PriorityQueue(maxsize=0)
Multiprocessing.Queue類的主要方法:
method | Description |
---|---|
queue.qsize() | 返回隊(duì)列長度 |
queue.full() | 隊(duì)列滿,返回 True, 否則返回False |
queue.empty() | 隊(duì)列空,返回 True, 否則返回False |
queue.put(item) | 將數(shù)據(jù)寫入隊(duì)列 |
queue.get() | 將數(shù)據(jù)拋出隊(duì)列 , |
queue.put_nowait(item), queue.get_nowait() | 無等待寫入或拋出 |
說明:
put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。
Multiprocessing 的Queue類沒有提供Task_done, join方法
Queue模塊的其它隊(duì)列類:
(1) SimpleQueue
簡潔版的FIFO隊(duì)列, 適事簡單場景使用
(2) JoinableQueue子類
Python 3.5 后新增的 Queue的子類,擁有 task_done(), join() 方法
task_done()表示,最近讀出的1個(gè)任務(wù)已經(jīng)完成。
join()阻塞隊(duì)列,直到queue中的所有任務(wù)都已完成。
producer – consumer 場景,使用Queue的示例
import multiprocessing def producer(numbers, q): for x in numbers: if x % 2 == 0: if q.full(): print("queue is full") break q.put(x) print(f"put {x} in queue by producer") return None def consumer(q): while not q.empty(): print(f"take data {q.get()} from queue by consumer") return None if __name__ == "__main__": # 設(shè)置1個(gè)queue對象,最大長度為5 qu = multiprocessing.Queue(maxsize=5,) # 創(chuàng)建producer子進(jìn)程,把queue做為其中1個(gè)參數(shù)傳給它,該進(jìn)程負(fù)責(zé)寫 p5 = multiprocessing.Process( name="producer-1", target=producer, args=([random.randint(1, 100) for i in range(0, 10)], qu) ) p5.start() p5.join() #創(chuàng)建consumer子進(jìn)程,把queue做為1個(gè)參數(shù)傳給它,該進(jìn)程中隊(duì)列中讀 p6 = multiprocessing.Process( name="consumer-1", target=consumer, args=(qu,) ) p6.start() p6.join() print(qu.qsize())
Multiprocessing也提供了與threading 類似的同步鎖機(jī)制,確保某個(gè)時(shí)刻只有1個(gè)子進(jìn)程可以訪問某個(gè)資源或執(zhí)行某項(xiàng)任務(wù), 以避免同搶。
例如:多個(gè)子進(jìn)程同時(shí)訪問數(shù)據(jù)庫表時(shí),如果沒有同步鎖,用戶A修改1條數(shù)據(jù)后,還未提交,此時(shí),用戶B也進(jìn)行了修改,可以預(yù)見,用戶A提交的將是B個(gè)修改的數(shù)據(jù)。
添加了同步鎖,可以確保同時(shí)只有1個(gè)子進(jìn)程能夠進(jìn)行寫入數(shù)據(jù)庫與提交操作。
如下面的示例,同時(shí)只有1個(gè)進(jìn)程可以執(zhí)行打印操作。
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
Event 機(jī)制的工作原理:
1個(gè)event 對象實(shí)例管理著1個(gè) flag標(biāo)記, 可以用set()方法將其置為true, 用clear()方法將其置為false, 使用wait()將阻塞當(dāng)前子進(jìn)程,直至flag被置為true.
這樣由1個(gè)進(jìn)程通過event flag 就可以控制、協(xié)調(diào)各子進(jìn)程運(yùn)行。
Event object的使用方法:
1)主函數(shù): 創(chuàng)建1個(gè)event 對象, flag = multiprocessing.Event() , 做為參數(shù)傳給各子進(jìn)程
2) 子進(jìn)程A: 不受event影響,通過event 控制其它進(jìn)程的運(yùn)行
o 先clear(),將event 置為False, 占用運(yùn)行權(quán).
o 完成工作后,用set()把flag置為True。
3) 子進(jìn)程B, C: 受event 影響
o 設(shè)置 wait() 狀態(tài),暫停運(yùn)行
o 直到flag重新變?yōu)門rue,恢復(fù)運(yùn)行
主要方法:
set(), clear()設(shè)置 True/False,
wait() 使進(jìn)程等待,直到flag被改為true.
is_set(), Return True if and only if the internal flag is true.
驗(yàn)證進(jìn)程間通信 – Event
import multiprocessing import time import random def joo_a(q, ev): print("subprocess joo_a start") if not ev.is_set(): ev.wait() q.put(random.randint(1, 100)) print("subprocess joo_a ended") def joo_b(q, ev): print("subprocess joo_b start") ev.clear() time.sleep(2) q.put(random.randint(200, 300)) ev.set() print("subprocess joo_b ended") def main_event(): qu = multiprocessing.Queue() ev = multiprocessing.Event() sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev)) sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,)) sub_a.start() sub_b.start() # ev.set() sub_a.join() sub_b.join() while not qu.empty(): print(qu.get()) if __name__ == "__main__": main_event()
子進(jìn)程之間共存內(nèi)存變量,要用 multiprocessing.Value(), Array() 來定義變量。 實(shí)際上是ctypes 類型,由multiprocessing.sharedctypes模塊提供相關(guān)功能
注意 使用 share memory 要考慮同搶等問題,釋放等問題,需要手工實(shí)現(xiàn)。因此在使用共享變量時(shí),建議使用Manager管程來管理這些共享變量。
def func(num): num.value=10.78 #子進(jìn)程改變數(shù)值的值,主進(jìn)程跟著改變 if __name__=="__main__": num = multiprocessing.Value("d", 10.0) # d表示數(shù)值,主進(jìn)程與子進(jìn)程可共享這個(gè)變量。 p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num.value)
進(jìn)程之間共享數(shù)據(jù)(數(shù)組型):
import multiprocessing def func(num): num[2]=9999 #子進(jìn)程改變數(shù)組,主進(jìn)程跟著改變 if __name__=="__main__": num=multiprocessing.Array("i",[1,2,3,4,5]) p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num[:])
如果進(jìn)程間需要共享對象數(shù)據(jù),或共享內(nèi)容,數(shù)據(jù)較大,multiprocessing 提供了SharedMemory類來實(shí)現(xiàn)進(jìn)程間實(shí)時(shí)通信,不需要通過發(fā)消息,讀寫磁盤文件來實(shí)現(xiàn),速度更快。
注意:直接使用SharedMemory 存在著同搶、泄露隱患,應(yīng)通過SharedMemory Manager 管程類來使用, 以確保內(nèi)存安全。
創(chuàng)建共享內(nèi)存區(qū):
multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)
方法:
父進(jìn)程創(chuàng)建shared_memory 后,子進(jìn)程可以使用它,當(dāng)不再需要后,使用close(), 刪除使用unlink()方法
相關(guān)屬性:
獲取內(nèi)存區(qū)內(nèi)容: shm.buf
獲取內(nèi)存區(qū)名稱: shm.name
獲取內(nèi)存區(qū)字節(jié)數(shù): shm.size
示例:
>>> from multiprocessing import shared_memory >>> shm_a = shared_memory.SharedMemory(create=True, size=10) >>> type(shm_a.buf)>>> buffer = shm_a.buf >>> len(buffer) 10 >>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once >>> buffer[4] = 100 # Modify single byte at a time >>> # Attach to an existing shared memory block >>> shm_b = shared_memory.SharedMemory(shm_a.name) >>> import array >>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array array('b', [22, 33, 44, 55, 100]) >>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes >>> bytes(shm_a.buf[:5]) # Access via shm_a b'howdy' >>> shm_b.close() # Close each SharedMemory instance >>> shm_a.close() >>> shm_a.unlink() # Call unlink only once to release the shared memory
sharedMemory類還提供了1個(gè)共享列表類型,這樣就更方便了,進(jìn)程間可以直接共享python強(qiáng)大的列表
構(gòu)建方法:
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)
from multiprocessing import shared_memory >>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42]) >>> [ type(entry) for entry in a ] [, , , , , , ] >>> a[2] -273.154 >>> a[2] = -78.5 >>> a[2] -78.5 >>> a[2] = 'dry ice' # Changing data types is supported as well >>> a[2] 'dry ice' >>> a[2] = 'larger than previously allocated storage space' Traceback (most recent call last): ... ValueError: exceeds available storage for existing str >>> a[2] 'dry ice' >>> len(a) 7 >>> a.index(42) 6 >>> a.count(b'howdy') 0 >>> a.count(b'HoWdY') 1 >>> a.shm.close() >>> a.shm.unlink() >>> del a # Use of a ShareableList after call to unlink() is unsupported b = shared_memory.ShareableList(range(5)) # In a first process >>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process >>> c ShareableList([0, 1, 2, 3, 4], name='...') >>> c[-1] = -999 >>> b[-1] -999 >>> b.shm.close() >>> c.shm.close() >>> c.shm.unlink()
Multiprocessing 提供了 Manager 內(nèi)存管理器類,當(dāng)調(diào)用1個(gè)Manager實(shí)例對象的start()方法時(shí),會創(chuàng)建1個(gè)manager進(jìn)程,其唯一目的就是管理共享內(nèi)存, 避免出現(xiàn)進(jìn)程間共享數(shù)據(jù)不同步,內(nèi)存泄露等現(xiàn)象。
其原理如下:
Manager管理器相當(dāng)于提供了1個(gè)共享內(nèi)存的服務(wù),不僅可以被主進(jìn)程創(chuàng)建的多個(gè)子進(jìn)程使用,還可以被其它進(jìn)程訪問,甚至跨網(wǎng)絡(luò)訪問。本文僅聚焦于由單一主進(jìn)程創(chuàng)建的各進(jìn)程之間的通信。
相關(guān)類:multiprocessing.Manager
子類有:
multiprocessing.managers.SharedMemoryManager
multiprocessing.managers.BaseManager
支持共享變量類型:
python基本類型 int, str, list, tuple, list
進(jìn)程通信對象: Queue, Lock, Event,
Condition, Semaphore, Barrier ctypes類型: Value, Array
1)創(chuàng)建管理器對象
snm = Manager() snm = SharedMemoryManager()
2)創(chuàng)建共享內(nèi)存變量
新建list, dict
sl = snm.list(), snm.dict()
新建1塊bytes共享內(nèi)存變量,需要指定大小
sx = snm.SharedMemory(size)
新建1個(gè)共享列表變量,可用列表來初始化
sl = snm.ShareableList(sequence) 如 sl = smm.ShareableList([‘howdy', b'HoWdY', -273.154, 100, True])
新建1個(gè)queue, 使用multiprocessing 的Queue類型
snm = Manager() q = snm.Queue()
示例 :
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
將打印
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
方法一:
調(diào)用snm.shutdown()方法,會自動調(diào)用每個(gè)內(nèi)存塊的unlink()方法釋放內(nèi)存?;蛘?snm.close()
方法二:
使用with語句,結(jié)束后會自動釋放所有manager變量
>>> with SharedMemoryManager() as smm: ... sl = smm.ShareableList(range(2000)) ... # Divide the work among two processes, storing partial results in sl ... p1 = Process(target=do_work, args=(sl, 0, 1000)) ... p2 = Process(target=do_work, args=(sl, 1000, 2000)) ... p1.start() ... p2.start() # A multiprocessing.Pool might be more efficient ... p1.join() ... p2.join() # Wait for all work to complete in both processes ... total_result = sum(sl) # Consolidate the partial results now in sl
managers的子類BaseManager提供register()方法,支持注冊自定義數(shù)據(jù)類型。如下例,注冊1個(gè)自定義MathsClass類,并生成實(shí)例。
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) # prints 7 print(maths.mul(7, 8))
關(guān)于“Python中怎么使用multiprocessing實(shí)現(xiàn)進(jìn)程間通信”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點(diǎn)。