這篇文章主要講解了“Python中multiprocessing模塊的Process類分析”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Python中multiprocessing模塊的Process類分析”吧!
目前成都創(chuàng)新互聯(lián)公司已為1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、雅安服務(wù)器托管、網(wǎng)站托管運營、企業(yè)網(wǎng)站設(shè)計、咸陽網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
Python2.6版本中新添了multiprocessing模塊。它最初由Jesse Noller和Richard Oudkerk定義在PEP 371中。就像你能通過threading模塊衍生線程一樣,multiprocessing 模塊允許你衍生進(jìn)程。這里用到的思想:因為你現(xiàn)在能衍生進(jìn)程,所以你能夠避免使用全局解釋器鎖(GIL),并且充分利用機器的多個處理器。
多進(jìn)程包也包含一些根本不在threading 模塊中的API。比如:有一個靈活的Pool類能讓你在多個輸入下并行化地執(zhí)行函數(shù)。我們將在后面的小節(jié)講解Pool類。我們將以multiprocessing模塊的Process類開始講解。
開始學(xué)習(xí)multiprocessing模塊
Process這個類和threading模塊中的Thread類很像。讓我們創(chuàng)建一系列調(diào)用相同函數(shù)的進(jìn)程,并且看看它是如何工作的。
import os from multiprocessing import Process def doubler(number): """ A doubling function that can be used by a process """ result = number * 2 proc = os.getpid() print('{0} doubled to {1} by process id: {2}'.format( number, result, proc)) if __name__ == '__main__': numbers = [5, 10, 15, 20, 25] procs = [] for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() for proc in procs: proc.join()
對于上面的例子,我們導(dǎo)入Process類、創(chuàng)建一個叫doubler的函數(shù)。在函數(shù)中,我們將傳入的數(shù)字乘上2。我們也用Python的os模塊來獲取當(dāng)前進(jìn)程的ID(pid)。這個ID將告訴我們哪個進(jìn)程正在調(diào)用doubler函數(shù)。然后,在下面的代碼塊中,我們實例化了一系列的Process類并且啟動它們。***一個循環(huán)只是調(diào)用每個進(jìn)程的join()方法,該方法告訴Python等待進(jìn)程直到它結(jié)束。如果你需要結(jié)束一個進(jìn)程,你可以調(diào)用它的terminate()方法。
當(dāng)你運行上面的代碼,你應(yīng)該看到和下面類似的輸出結(jié)果:
5 doubled to 10 by process id: 10468 10 doubled to 20 by process id: 10469 15 doubled to 30 by process id: 10470 20 doubled to 40 by process id: 10471 25 doubled to 50 by process id: 10472
有時候,你***給你的進(jìn)程取一個易于理解的名字 。幸運的是,Process類確實允許你訪問同樣的進(jìn)程。讓我們來看看如下例子:
import os from multiprocessing import Process, current_process def doubler(number): """ A doubling function that can be used by a process """ result = number * 2 proc_name = current_process().name print('{0} doubled to {1} by: {2}'.format( number, result, proc_name)) if __name__ == '__main__': numbers = [5, 10, 15, 20, 25] procs = [] proc = Process(target=doubler, args=(5,)) for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() proc = Process(target=doubler, name='Test', args=(2,)) proc.start() procs.append(proc) for proc in procs: proc.join()
這一次,我們多導(dǎo)入了current_process。current_process基本上和threading模塊的current_thread是類似的東西。我們用它來獲取正在調(diào)用我們的函數(shù)的線程的名字。你將注意到我們沒有給前面的5個進(jìn)程設(shè)置名字。然后我們將第6個進(jìn)程的名字設(shè)置為“Test”。
讓我們看看我們將得到什么樣的輸出結(jié)果:
5 doubled to 10 by: Process-2 10 doubled to 20 by: Process-3 15 doubled to 30 by: Process-4 20 doubled to 40 by: Process-5 25 doubled to 50 by: Process-6 2 doubled to 4 by: Test
輸出結(jié)果說明:默認(rèn)情況下,multiprocessing模塊給每個進(jìn)程分配了一個編號,而該編號被用來組成進(jìn)程的名字的一部分。當(dāng)然,如果我們給定了名字的話,并不會有編號被添加到名字中。
鎖
multiprocessing模塊支持鎖,它和threading模塊做的方式一樣。你需要做的只是導(dǎo)入Lock,獲取它,做一些事,釋放它。
from multiprocessing import Process, Lock def printer(item, lock): """ Prints out the item that was passed in """ lock.acquire() try: print(item) finally: lock.release() if __name__ == '__main__': lock = Lock() items = ['tango', 'foxtrot', 10] for item in items: p = Process(target=printer, args=(item, lock)) p.start()
我們在這里創(chuàng)建了一個簡單的用于打印函數(shù),你輸入什么,它就輸出什么。為了避免線程之間互相阻塞,我們使用Lock對象。代碼循環(huán)列表中的三個項并為它們各自都創(chuàng)建一個進(jìn)程。每一個進(jìn)程都將調(diào)用我們的函數(shù),并且每次遍歷到的那一項作為參數(shù)傳入函數(shù)。因為我們現(xiàn)在使用了鎖,所以隊列中下一個進(jìn)程將一直阻塞,直到之前的進(jìn)程釋放鎖。
日志
為進(jìn)程創(chuàng)建日志與為線程創(chuàng)建日志有一些不同。它們存在不同是因為Python的logging包不使用共享鎖的進(jìn)程,因此有可能以來自不同進(jìn)程的信息作為結(jié)束的標(biāo)志。讓我們試著給前面的例子添加基本的日志。代碼如下:
import logging import multiprocessing from multiprocessing import Process, Lock def printer(item, lock): """ Prints out the item that was passed in """ lock.acquire() try: print(item) finally: lock.release() if __name__ == '__main__': lock = Lock() items = ['tango', 'foxtrot', 10] multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) for item in items: p = Process(target=printer, args=(item, lock)) p.start()
最簡單的添加日志的方法通過推送它到stderr實現(xiàn)。我們能通過調(diào)用thelog_to_stderr() 函數(shù)來實現(xiàn)該方法。然后我們調(diào)用get_logger 函數(shù)獲得一個logger實例,并將它的日志等級設(shè)為INFO。之后的代碼是相同的。需要提示下這里我并沒有調(diào)用join()方法。取而代之的:當(dāng)它退出,父線程將自動調(diào)用join()方法。
當(dāng)你這么做了,你應(yīng)該得到類似下面的輸出:
[INFO/Process-1] child process calling self.run() tango [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/Process-2] child process calling self.run() [INFO/MainProcess] process shutting down foxtrot [INFO/Process-2] process shutting down [INFO/Process-3] child process calling self.run() [INFO/Process-2] process exiting with exitcode 0 10 [INFO/MainProcess] calling join() for process Process-3 [INFO/Process-3] process shutting down [INFO/Process-3] process exiting with exitcode 0 [INFO/MainProcess] calling join() for process Process-2
現(xiàn)在如果你想要保存日志到硬盤中,那么這件事就顯得有些棘手。你能在Python的logging Cookbook閱讀一些有關(guān)那類話題。
Pool類
Pool類被用來代表一個工作進(jìn)程池。它有讓你將任務(wù)轉(zhuǎn)移到工作進(jìn)程的方法。讓我們看下面一個非常簡單的例子。
from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == '__main__': numbers = [5, 10, 20] pool = Pool(processes=3) print(pool.map(doubler, numbers))
基本上執(zhí)行上述代碼之后,一個Pool的實例被創(chuàng)建,并且該實例創(chuàng)建了3個工作進(jìn)程。然后我們使用map 方法將一個函數(shù)和一個可迭代對象映射到每個進(jìn)程。***我們打印出這個例子的結(jié)果:[10, 20, 40]。
你也能通過apply_async方法獲得池中進(jìn)程的運行結(jié)果:
from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == '__main__': pool = Pool(processes=3) result = pool.apply_async(doubler, (25,)) print(result.get(timeout=1))
我們上面做的事實際上就是請求進(jìn)程的運行結(jié)果。那就是get函數(shù)的用途。它嘗試去獲取我們的結(jié)果。你能夠注意到我們設(shè)置了timeout,這是為了預(yù)防我們調(diào)用的函數(shù)發(fā)生異常的情況。畢竟我們不想要它被***期地阻塞。
進(jìn)程通信
當(dāng)遇到進(jìn)程間通信的情況,multiprocessing 模塊提供了兩個主要的方法:Queues 和 Pipes。Queue 實現(xiàn)上既是線程安全的也是進(jìn)程安全的。讓我們看一個相當(dāng)簡單的并且基于 Queue的例子。代碼來自于我的文章(threading articles)。
from multiprocessing import Process, Queue sentinel = -1 def creator(data, q): """ Creates data to be consumed and waits for the consumer to finish processing """ print('Creating data and putting it on the queue') for item in data: q.put(item) def my_consumer(q): """ Consumes some data and works on it In this case, all it does is double the input """ while True: data = q.get() print('data found to be processed: {}'.format(data)) processed = data * 2 print(processed) if data is sentinel: break if __name__ == '__main__': q = Queue() data = [5, 10, 13, -1] process_one = Process(target=creator, args=(data, q)) process_two = Process(target=my_consumer, args=(q,)) process_one.start() process_two.start() q.close() q.join_thread() process_one.join() process_two.join()
在這里我們只需要導(dǎo)入Queue和Process。Queue用來創(chuàng)建數(shù)據(jù)和添加數(shù)據(jù)到隊列中,Process用來消耗數(shù)據(jù)并執(zhí)行它。通過使用Queue的put()和get()方法,我們就能添加數(shù)據(jù)到Queue、從Queue獲取數(shù)據(jù)。代碼的***一塊只是創(chuàng)建了Queue 對象以及兩個Process對象,并且運行它們。你能注意到我們在進(jìn)程對象上調(diào)用join()方法,而不是在Queue本身上調(diào)用。
感謝各位的閱讀,以上就是“Python中multiprocessing模塊的Process類分析”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Python中multiprocessing模塊的Process類分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!