?
專注于為中小企業(yè)提供成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)玉溪免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千余家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
?
目錄
threading.Semaphore類:...1
數(shù)據(jù)結(jié)構(gòu)Queue:...5
GIL.5
?
?
?
線程同步技術(shù);
和Lock很像,信號量內(nèi)部維護(hù)一個倒計(jì)數(shù)器,每一次acquire都會減1(release加1),當(dāng)acquire方法發(fā)現(xiàn)計(jì)數(shù)為0就阻塞,直至其它線程對信號量release后,計(jì)數(shù)大于0,恢復(fù)阻塞的線程;
?
倒計(jì)數(shù)器永遠(yuǎn)不會<0,是非負(fù)數(shù),因?yàn)閍cquire時(shí)發(fā)現(xiàn)是0都會被阻塞;
與linux OS的信號完全不同;
?
s=threading.Semaphore(value=1),構(gòu)造方法,value<0拋ValueError異常;
s.acquire(blocking=True,timeout=None),獲取信號量,計(jì)數(shù)器減1,獲取成功返回True;
s.release(),釋放信號量,計(jì)數(shù)器加1;
?
應(yīng)用場景:
連接池;
因?yàn)橘Y源有限,每開啟一個,連接成本很高(TCP三次握手四次揮手等),所以使用連接池;
?
threading.BoundedSemaphore類:
有界的信號量,作上界控制;
不允許使用release超出初始值的范圍,否則拋ValueError異常;
?
信號量和鎖:
鎖,只允許同一個時(shí)間一個線程獨(dú)占資源,它是特殊的信號量,即信號量計(jì)數(shù)器初值為1;
信號量,允許多個線程訪問共享資源,但這個共享資源數(shù)量有限;
鎖,可看作是特殊的信號量;
?
例:
def work(s:threading.Semaphore):
??? logging.info('in sub')
??? s.acquire()
??? logging.info('end sub')
?
s = threading.Semaphore(3)
?
logging.info(s.acquire())
logging.info(s.acquire())
logging.info(s.acquire())
?
threading.Thread(target=work, args=(s,)).start()
?
print('~'*20)
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3))
s.release()
print('end main')
輸出:
2018-08-07-10:07:23?????? Thread info: 9360 MainThread True
2018-08-07-10:07:23?????? Thread info: 9360 MainThread True
2018-08-07-10:07:23?????? Thread info: 9360 MainThread True
2018-08-07-10:07:23?????? Thread info: 13180 Thread-1 in sub
~~~~~~~~~~~~~~~~~~~~
2018-08-07-10:07:25?????? Thread info: 9360 MainThread False
2018-08-07-10:07:28?????? Thread info: 9360 MainThread False
2018-08-07-10:07:28?????? Thread info: 13180 Thread-1 end sub
end main
?
例:
s = threading.Semaphore(3)
s.release()?? #直接使用release改變了初始預(yù)設(shè)值
s.release()
s.release()
print(s.__dict__)
輸出:
{'_value': 6, '_cond':
?
例,解決超出預(yù)設(shè)值:
s = threading.BoundedSemaphore(3)
s.release()
輸出:
Traceback (most recent call last):
? File "E:/git_practice/cmdb/example_threading2.py", line 359, in
??? s.release()
? File "D:\Python\Python35\lib\threading.py", line 480, in release
??? raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times
?
例:
連接池應(yīng)有容量(總數(shù)),有一個工廠方法可獲取連接,能夠把不用的連接返回,供其它調(diào)用者使用;
?
class Conn:
??? def __init__(self, name):
??????? self.name = name
?
class Pool:
??? def __init__(self, count):
??????? self.count = count
??????? # self.pool = []
??????? self.pool = [self._connect('conn-{}'.format(i)) for i in range(count)]
?
??? def _connect(self, conn_name):
??????? return Conn(conn_name)
?
??? def get_conn(self):?? #此方法在多線程時(shí)有安全問題;如果池中正好有一個連接,有可能多個線程判斷池的長度是>0的,當(dāng)一個線程拿走了連接對象,其它線程再來pop就拋異常,解決:用Lock或Semaphore
??????? # return self.pool.pop()
???? ???if len(self.pool) > 0:
??????????? return self.pool.pop()
?
??? def return_conn(self, conn:Conn):
??????? self.pool.append(conn)
?
例,連接池改進(jìn):
class Conn:
??? def __init__(self, name):
??????? self.name = name
?
class Pool:
??? def __init__(self, count):
??????? self.count = count
??????? self.pool = [self._connect('conn-{}'.format(i)) for i in range(count)]
??????? # self.sema = threading.Semaphore(count)
??????? self.sema = threading.BoundedSemaphore(count)?? #信號量比計(jì)算列表長度要好
?
??? def _connect(self, conn_name):
??????? return Conn(conn_name)
?
??? def get_conn(self):
??????? self.sema.acquire()
??????? data = self.pool.pop()
??????? return data
?
??? def return_conn(self, conn:Conn):
??????? self.pool.append(conn)
??????? self.sema.release()
?
pool = Pool(3)
def work(pool:Pool):
??? conn = pool.get_conn()
??? logging.info(conn)
??? threading.Event().wait(random.randint(1,4))
??? pool.return_conn(conn)
?
for i in range(6):
??? threading.Thread(target=work, args=(pool,), name='worker-{}'.format(i)).start()
輸出:
2018-08-07-14:09:11?????? Thread info: 3264 worker-0 <__main__.Conn object at 0x00000000014E6780>
2018-08-07-14:09:11?????? Thread info: 12452 worker-1 <__main__.Conn object at 0x00000000014E6630>
2018-08-07-14:09:11?????? Thread info: 11468 worker-2 <__main__.Conn object at 0x00000000014E6748>
2018-08-07-14:09:13?????? Thread info: 13228 worker-3 <__main__.Conn object at 0x00000000014E6748>
2018-08-07-14:09:14?????? Thread info: 12692 worker-4 <__main__.Conn object at 0x00000000014E6780>
2018-08-07-14:09:15?????? Thread info: 6748 worker-5 <__main__.Conn object at 0x00000000014E6630>
?
注:
使用信號量解決資源有限問題;
如果池中有資源,請求者獲取資源時(shí)信號量減1,拿走資源;
當(dāng)請求超過資源數(shù),請求者只能等待;
當(dāng)使用者用完歸還資源后信號量加1,等到線程拿到就可喚醒拿走資源;
?
以上,從程序邏輯上分析:
1、如果還沒有使用信號量就release,會怎么樣?
超出預(yù)設(shè)值,解決用threading.BoundedSemaphore;
2、如果使用了信號量,但還沒用完?
??????? self.pool.append(conn)
??????? self.sema.release()
如果一種極端情況,計(jì)數(shù)器還差1個就滿了,有三個線程A、B、C都執(zhí)行了第一句,都沒來得及release,這時(shí)輪到線程A release,然后輪到C release,這時(shí)一定出問題,超界了ValueError,因此有界信號量能保證,一定不能多歸還;
3、很多線程用完了信號量?
沒有獲得信號量的線程都阻塞,沒有線程和歸還的線程爭搶,當(dāng)append后才release,這時(shí)才能等待的線程被喚醒,才能pop,即沒有獲取信號量就不能pop,這是安全的;
?
?
?
標(biāo)準(zhǔn)庫;
提供FIFO和LIFO的Queue,優(yōu)先隊(duì)列;
Queue類是線程安全的,適用于多線程間安全的交換數(shù)據(jù),內(nèi)部使用了Lock和Condition;
?
魔術(shù)方法中,說實(shí)現(xiàn)容器的大小不準(zhǔn)確?
如果不加鎖,是不可能獲得準(zhǔn)確的大小的,因?yàn)楫?dāng)前線程剛讀取了一個大小,還沒取走,就有可能被其它線程改掉了;
?
Queue類的size雖加了鎖,但依然不能保證立即get、put就能成功,因?yàn)樽x取大小和get、put方法是分開的;
?
?
?
gobal intepreter lock,全局解釋器鎖;
GIL保證cpyton進(jìn)程中,只有一個線程執(zhí)行字節(jié)碼,甚至在多核cpu情況下也是如此;ruby中也有GIL;另,其它解釋器沒有這種情況,如pypy、jython等;
?
cpython中沒有真正的多線程;
GIL的本質(zhì):理解cpython多線程;
?
cpython中:
IO密集型,由于線程阻塞,就會調(diào)度其它線程;
cpu密集型,當(dāng)前線程可能會連續(xù)的獲得GIL,導(dǎo)致其它線程幾乎無法使用cpu;
?
IO密集型,使用多線程;
cpu密集型,使用多進(jìn)程,繞開GIL;
?
新版cpython正努力優(yōu)化GIL問題,但不是移除;
如果非要使用多線程且要有效率,請繞行,選擇其它語言go、erlang等;
?
py中絕大多數(shù)內(nèi)置數(shù)據(jù)結(jié)構(gòu)都是原子操作,如lst.append()、lst.pop();
由于GIL的存在,py的內(nèi)置數(shù)據(jù)類型在多線程編程時(shí)就變?yōu)榘踩牧?,但?shí)際上它們本身不是線程安全的;
?
項(xiàng)目一開始就要固定使用某一解釋器的XX版本;
?
保留GIL的原因:
guido堅(jiān)持的簡單哲學(xué),對于初學(xué)者門檻低,不需要高深的系統(tǒng)知識也能安全、簡單的使用py;
若移除GIL,會降低cpython單線程的執(zhí)行效率;
?
例1:
def calc():
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
start = datetime.datetime.now()
?
calc()
calc()
calc()
calc()
calc()
?
delta = (datetime.datetime.now() - start).total_seconds()
print(delta)
輸出:
32.942885
?
例2:
def calc():
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
start = datetime.datetime.now()
?
lst = []
for _ in range(5):?? #CPU密集型,用多進(jìn)程解決
??? t = threading.Thread(target=calc)
??? t.start()
??? # t.join()?? #t.join()不能放在此處,若放在此處是串行(第1個線程執(zhí)行完才能循環(huán)執(zhí)行到下一個線程),不是并行
??? lst.append(t)
for t in lst:
??? t.join()
?
delta = (datetime.datetime.now() - start).total_seconds()
print(delta)
輸出:
32.487859
?
對比例1和例2:
從結(jié)果上看,單線程和多線程一樣,cpython的多線程沒有優(yōu)勢和一個線程執(zhí)行時(shí)間相當(dāng),因?yàn)镚IL的存在;
?
?