將你需要多線程并發(fā)執(zhí)行的函數(shù)放入list中
龍鳳ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)建站的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書合作)期待與您的合作!
import threading
threads = []
t1 = threading.Thread(target=函數(shù)名,args=參數(shù))
threads.append(t1)
啟動(dòng)多線程
if __name__ == '__main__':
??? for t in threads:
? ? ??? t.setDaemon(True)
? ? ??? t.start()
t.join()
更多詳細(xì)操作help(threading)
#coding=utf-8
import?threading
from?time?import?ctime,sleep
#?要啟動(dòng)的函數(shù)
def?music(func):
for?i?in?range(2):
print?"I?was?listening?to?%s.?%s"?%(func,ctime())
sleep(1)
#?要啟動(dòng)的函數(shù)
def?move(func):
for?i?in?range(2):
print?"I?was?at?the?%s!?%s"?%(func,ctime())
sleep(5)
threads?=?[]
t1?=?threading.Thread(target=music,args=(u'愛(ài)情買賣',))
threads.append(t1)
t2?=?threading.Thread(target=move,args=(u'阿凡達(dá)',))
threads.append(t2)
#?函數(shù)加入線程列表
if?__name__?==?'__main__':
for?t?in?threads:
t.setDaemon(True)
t.start()
t.join()?#子線程完成運(yùn)行之前,這個(gè)子線程的父線程將一直被阻塞,不會(huì)退出
print?"all?over?%s"?%ctime()
并發(fā):邏輯上具備同時(shí)處理多個(gè)任務(wù)的能力。
并行:物理上在同一時(shí)刻執(zhí)行多個(gè)并發(fā)任務(wù)。
舉例:開個(gè)QQ,開了一個(gè)進(jìn)程,開了微信,開了一個(gè)進(jìn)程。在QQ這個(gè)進(jìn)程里面,傳輸文字開一個(gè)線程、傳輸語(yǔ)音開了一個(gè)線程、彈出對(duì)話框又開了一個(gè)線程。
總結(jié):開一個(gè)軟件,相當(dāng)于開了一個(gè)進(jìn)程。在這個(gè)軟件運(yùn)行的過(guò)程里,多個(gè)工作同時(shí)運(yùn)轉(zhuǎn),完成了QQ的運(yùn)行,那么這個(gè)多個(gè)工作分別有多個(gè)線程。
線程和進(jìn)程之間的區(qū)別:
進(jìn)程在python中的使用,對(duì)模塊threading進(jìn)行操作,調(diào)用的這個(gè)三方庫(kù)??梢酝ㄟ^(guò) help(threading) 了解其中的方法、變量使用情況。也可以使用 dir(threading) 查看目錄結(jié)構(gòu)。
current_thread_num = threading.active_count() # 返回正在運(yùn)行的線程數(shù)量
run_thread_len = len(threading.enumerate()) # 返回正在運(yùn)行的線程數(shù)量
run_thread_list = threading.enumerate() # 返回當(dāng)前運(yùn)行線程的列表
t1=threading.Thread(target=dance) #創(chuàng)建兩個(gè)子線程,參數(shù)傳遞為函數(shù)名
t1.setDaemon(True) # 設(shè)置守護(hù)進(jìn)程,守護(hù)進(jìn)程:主線程結(jié)束時(shí)自動(dòng)退出子線程。
t1.start() # 啟動(dòng)子線程
t1.join() # 等待進(jìn)程結(jié)束 exit()`# 主線程退出,t1子線程設(shè)置了守護(hù)進(jìn)程,會(huì)自動(dòng)退出。其他子線程會(huì)繼續(xù)執(zhí)行。
1、python提供兩種方式使用多線程:一個(gè)是基于函數(shù):_thread模塊或者threading模塊。一個(gè)是基于類:theading.Thread
使用多線程函數(shù)包裝線程對(duì)象:_thread
_thead.start_new_thead(func,*args,**kwargs)
args,**kwargs是被包裝函數(shù)的入?yún)?,必須傳入元祖或字?/p>
使用多線程函數(shù)包裝線程對(duì)象:threading
threading._start_new_thread(func,*args,**kwargs):開啟線程,帶元祖或字典
threading.currentThread():返回當(dāng)前線程變量
threading.enumerate():正在運(yùn)行的線程列表,不含未啟動(dòng)和已結(jié)束線程
threading.activeCount():返回正在運(yùn)行的線程數(shù)量
threading.settrace(func):為所有threading模塊啟動(dòng)的線程設(shè)置追蹤函數(shù),在調(diào)用run方法之前,func會(huì)被傳給追蹤函數(shù)
threading.setprofile(func):為所有threading模塊啟動(dòng)的線程設(shè)置性能測(cè)試函數(shù),也是在run方法調(diào)用前就傳遞給性能測(cè)試函數(shù)
使用多線程類包裝線程對(duì)象:threading.Thread
Thread類提供以下方法:
run():表示線程活動(dòng)的方法,線程需要控制些什么活動(dòng)都在這里面定義。當(dāng)線程對(duì)象一但被創(chuàng)建,其活動(dòng)一定會(huì)因調(diào)用線程的 start() 方法開始。這會(huì)在獨(dú)立的控制線程調(diào)用 run() 方法。
start():開啟線程活動(dòng)
join():等待線程中止,阻塞當(dāng)前線程直到被調(diào)用join方法的線程中止。線程A調(diào)用線程B的join方法,那線程A將會(huì)被阻塞至線程B中止。
isAlive():返回線程是否還活動(dòng)
getName():獲取線程名字
setName():設(shè)置線程名字
Lock對(duì)象:實(shí)例化線程鎖,包含acquire方法獲取鎖 和 release 方法釋放鎖,在最開始創(chuàng)建鎖的時(shí)候,鎖為未鎖定狀態(tài),調(diào)用acquire方法后鎖置為鎖定狀態(tài),此時(shí)其他線程再調(diào)用acquire方法就將會(huì)被阻塞至其他線程調(diào)用release方法釋放鎖,如果釋放一個(gè)并未被鎖定的鎖將會(huì)拋出異常。支持上下文管理協(xié)議,直接with lock 無(wú)需調(diào)用鎖定,釋放方法
Rlock對(duì)象:重入鎖,相比lock增加了線程和遞歸的概念。比如:線程目標(biāo)函數(shù)F,在獲得鎖之后執(zhí)行函數(shù)G,但函數(shù)G也需要先獲得鎖,此時(shí)同一線程,F(xiàn)獲得鎖,G等待,F(xiàn)等待G執(zhí)行,就造成了死鎖,此時(shí)使用rlock可避免。一旦線程獲得了重入鎖,同一個(gè)線程再次獲取它將不阻塞;但線程必須在每次獲取它時(shí)釋放一次。
daemon屬性:設(shè)置該線程是否是守護(hù)線程,默認(rèn)為none,需要在調(diào)用start方法之前設(shè)置好
事件對(duì)象:一個(gè)線程發(fā)出事件信號(hào) ,其他線程收到信號(hào)后作出對(duì)應(yīng)活動(dòng)。實(shí)例化事件對(duì)象后,初始事件標(biāo)志為flase。調(diào)用其wait方法將阻塞當(dāng)前所屬線程,至事件標(biāo)志為true時(shí)。調(diào)用set方法可將事件標(biāo)志置為true,被阻塞的線程將被執(zhí)行。調(diào)用clear方法可將事件標(biāo)志置為flase
注意點(diǎn):
1、繼承threading.Thread類,初始化時(shí)要記得繼承父類的__init__方法
2、run()方法只能有一個(gè)入?yún)ⅲ时M量把啟動(dòng)線程時(shí)的參數(shù)入?yún)⒌匠跏蓟臅r(shí)候
3、鎖要設(shè)定全局的,一個(gè)子線程獲得一個(gè)鎖沒(méi)有意義
以下實(shí)例:有一個(gè)列表,線程A從尾到頭遍歷元素,線程B從頭到尾將元素值重置為1,設(shè)置線程鎖之前線程A遍歷到頭部的數(shù)據(jù)已經(jīng)被修改,設(shè)置線程鎖之后不會(huì)再有數(shù)據(jù)不一致的情況
import threading,time
class tt(threading.Thread):
def __init__(self,name,func,ll):
? ? threading.Thread.__init__(self) #繼承父級(jí)的初始化方法
? ? self.name=name
? ? self.func=func? #run方法只能帶一個(gè)入?yún)ⅲ拾逊椒ㄈ雲(yún)⒌匠跏蓟臅r(shí)候
? ? self.ll=ll
def run(self):
? ? print(self.name)
? ? threadlock.acquire() #獲得鎖
? ? self.func(self.ll)
? ? threadlock.release() #釋放鎖
def readd(x):
a=len(x)
while a0:
? ? print(x[a-1])
? ? a-=1
def sett(x):
for i in range(len(x)):
? ? x[i]=1
print(x)
if __name__=="__main__":
l = [0,0,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
threadlock=threading.Lock() #實(shí)例化全局鎖
th1=tt("read",readd,l)
th2=tt("set",sett,l)
th1.start()
th2.start()
th_list=[]?
th_list.append(th1)
th_list.append(th2)
for li in th_list:
? ? li.join()? ? ? ? #主線程被阻塞,直到兩個(gè)子線程處理結(jié)束
print("主線程結(jié)束")
2、隊(duì)列
queue模塊包含queue.Queue(maxsize=0)先入先出隊(duì)列,queue.LifoQueue()先入后出隊(duì)列,和queue.PriorityQueue()優(yōu)先級(jí)可設(shè)置的隊(duì)列
Queue 模塊中的常用方法:
Queue.qsize() 返回隊(duì)列的大小,獲取的數(shù)據(jù)不可靠,因?yàn)橐恢庇芯€程在操作隊(duì)列,數(shù)據(jù)一直變化
Queue.empty() 如果隊(duì)列為空,返回True,反之False
Queue.full() 如果隊(duì)列滿了,返回True,反之False
Queue.full 與 maxsize 大小對(duì)應(yīng)
Queue.put(block=true,timeout=none) 將item數(shù)據(jù)寫入隊(duì)列,block=True,設(shè)置線程是否阻塞,設(shè)置阻塞當(dāng)隊(duì)列數(shù)據(jù)滿了之后就會(huì)阻塞,一直到隊(duì)列數(shù)據(jù)不滿時(shí)繼續(xù)添加,如果設(shè)置不阻塞,當(dāng)隊(duì)列滿了就會(huì)一直到timeout到后報(bào)錯(cuò)
Queue.get([block[, timeout]]) 取出隊(duì)列數(shù)據(jù),block=True,設(shè)置線程是否阻塞。設(shè)置阻塞,將會(huì)等待直到隊(duì)列不為空有數(shù)據(jù)可取出,設(shè)置不阻塞直到超過(guò)timeout等待時(shí)間后報(bào)錯(cuò)
Queue.task_done() 在完成一項(xiàng)工作之后,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
Queue.join() 實(shí)際上意味著等到隊(duì)列為空,再執(zhí)行別的操作。會(huì)在隊(duì)列有未完成時(shí)阻塞,等待隊(duì)列無(wú)未完成的任務(wù),取出數(shù)據(jù)get()之后還需要配置task_done使用才能讓等待隊(duì)列數(shù)-1
import queue,time
import threading
q=queue.Queue(maxsize=5)
def sett():
a=0
while a20:
? ? q.put(a,True)
? ? print("%d被put"%a)
? ? a+=1
def gett():
time.sleep(1)
while not q.empty(): #只要隊(duì)列沒(méi)空,一直取數(shù)據(jù)
? ? print("%d被取出"%q.get(True))
? ? q.task_done() #取出一次數(shù)據(jù),將未完成任務(wù)-1,不然使用join方法線程會(huì)一直阻塞
if __name__=="__main__":
th1=threading._start_new_thread(sett,()) #不帶參數(shù)也要傳入空元祖不然會(huì)報(bào)錯(cuò)
th2=threading._start_new_thread(gett,())
time.sleep(1) #延時(shí)主線程1S,等待put線程已經(jīng)put部分?jǐn)?shù)據(jù)到隊(duì)列
q.join()#阻塞主線程,直到未完成任務(wù)為0
在實(shí)際處理數(shù)據(jù)時(shí),因系統(tǒng)內(nèi)存有限,我們不可能一次把所有數(shù)據(jù)都導(dǎo)出進(jìn)行操作,所以需要批量導(dǎo)出依次操作。為了加快運(yùn)行,我們會(huì)采用多線程的方法進(jìn)行數(shù)據(jù)處理, 以下為我總結(jié)的多線程批量處理數(shù)據(jù)的模板:
主要分為三大部分:
共分4部分對(duì)多線程的內(nèi)容進(jìn)行總結(jié)。
先為大家介紹線程的相關(guān)概念:
在飛車程序中,如果沒(méi)有多線程,我們就不能一邊聽(tīng)歌一邊玩飛車,聽(tīng)歌與玩 游戲 不能并行;在使用多線程后,我們就可以在玩 游戲 的同時(shí)聽(tīng)背景音樂(lè)。在這個(gè)例子中啟動(dòng)飛車程序就是一個(gè)進(jìn)程,玩 游戲 和聽(tīng)音樂(lè)是兩個(gè)線程。
Python 提供了 threading 模塊來(lái)實(shí)現(xiàn)多線程:
因?yàn)樾陆ň€程系統(tǒng)需要分配資源、終止線程系統(tǒng)需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時(shí),使用線程池的語(yǔ)法比自己新建線程執(zhí)行線程更加簡(jiǎn)潔。
Python 為我們提供了 ThreadPoolExecutor 來(lái)實(shí)現(xiàn)線程池,此線程池默認(rèn)子線程守護(hù)。它的適應(yīng)場(chǎng)景為突發(fā)性大量請(qǐng)求或需要大量線程完成任務(wù),但實(shí)際任務(wù)處理時(shí)間較短。
其中 max_workers 為線程池中的線程個(gè)數(shù),常用的遍歷方法有 map 和 submit+as_completed 。根據(jù)業(yè)務(wù)場(chǎng)景的不同,若我們需要輸出結(jié)果按遍歷順序返回,我們就用 map 方法,若想誰(shuí)先完成就返回誰(shuí),我們就用 submit+as_complete 方法。
我們把一個(gè)時(shí)間段內(nèi)只允許一個(gè)線程使用的資源稱為臨界資源,對(duì)臨界資源的訪問(wèn),必須互斥的進(jìn)行。互斥,也稱間接制約關(guān)系。線程互斥指當(dāng)一個(gè)線程訪問(wèn)某臨界資源時(shí),另一個(gè)想要訪問(wèn)該臨界資源的線程必須等待。當(dāng)前訪問(wèn)臨界資源的線程訪問(wèn)結(jié)束,釋放該資源之后,另一個(gè)線程才能去訪問(wèn)臨界資源。鎖的功能就是實(shí)現(xiàn)線程互斥。
我把線程互斥比作廁所包間上大號(hào)的過(guò)程,因?yàn)榘g里只有一個(gè)坑,所以只允許一個(gè)人進(jìn)行大號(hào)。當(dāng)?shù)谝粋€(gè)人要上廁所時(shí),會(huì)將門上上鎖,這時(shí)如果第二個(gè)人也想大號(hào),那就必須等第一個(gè)人上完,將鎖解開后才能進(jìn)行,在這期間第二個(gè)人就只能在門外等著。這個(gè)過(guò)程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。 Python 的 threading 模塊引入了鎖。 threading 模塊提供了 Lock 類,它有如下方法加鎖和釋放鎖:
我們會(huì)發(fā)現(xiàn)這個(gè)程序只會(huì)打印“第一道鎖”,而且程序既沒(méi)有終止,也沒(méi)有繼續(xù)運(yùn)行。這是因?yàn)? Lock 鎖在同一線程內(nèi)第一次加鎖之后還沒(méi)有釋放時(shí),就進(jìn)行了第二次 acquire 請(qǐng)求,導(dǎo)致無(wú)法執(zhí)行 release ,所以鎖永遠(yuǎn)無(wú)法釋放,這就是死鎖。如果我們使用 RLock 就能正常運(yùn)行,不會(huì)發(fā)生死鎖的狀態(tài)。
在主線程中定義 Lock 鎖,然后上鎖,再創(chuàng)建一個(gè)子 線程t 運(yùn)行 main 函數(shù)釋放鎖,結(jié)果正常輸出,說(shuō)明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為 RLock 則報(bào)錯(cuò)。在實(shí)際中設(shè)計(jì)程序時(shí),我們會(huì)將每個(gè)功能分別封裝成一個(gè)函數(shù),每個(gè)函數(shù)中都可能會(huì)有臨界區(qū)域,所以就需要用到 RLock 。
一句話總結(jié)就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他線程中的鎖進(jìn)行操作, RLock 只能由本線程進(jìn)行操作。