深入解析Python中的線程同步方法
創(chuàng)新互聯(lián)是一家網(wǎng)站設(shè)計公司,集創(chuàng)意、互聯(lián)網(wǎng)應(yīng)用、軟件技術(shù)為一體的創(chuàng)意網(wǎng)站建設(shè)服務(wù)商,主營產(chǎn)品:響應(yīng)式網(wǎng)站設(shè)計、成都品牌網(wǎng)站建設(shè)、成都全網(wǎng)營銷推廣。我們專注企業(yè)品牌在網(wǎng)站中的整體樹立,網(wǎng)絡(luò)互動的體驗,以及在手機等移動端的優(yōu)質(zhì)呈現(xiàn)。成都網(wǎng)站建設(shè)、做網(wǎng)站、移動互聯(lián)產(chǎn)品、網(wǎng)絡(luò)運營、VI設(shè)計、云產(chǎn)品.運維為核心業(yè)務(wù)。為用戶提供一站式解決方案,我們深知市場的競爭激烈,認真對待每位客戶,為客戶提供賞析悅目的作品,網(wǎng)站的價值服務(wù)。
同步訪問共享資源
在使用線程的時候,一個很重要的問題是要避免多個線程對同一變量或其它資源的訪問沖突。一旦你稍不留神,重疊訪問、在多個線程中修改(共享資源)等這些操作會導致各種各樣的問題;更嚴重的是,這些問題一般只會在比較極端(比如高并發(fā)、生產(chǎn)服務(wù)器、甚至在性能更好的硬件設(shè)備上)的情況下才會出現(xiàn)。
比如有這樣一個情況:需要追蹤對一事件處理的次數(shù)
counter = 0
def process_item(item):
global counter
... do something with item ...
counter += 1
如果你在多個線程中同時調(diào)用這個函數(shù),你會發(fā)現(xiàn)counter的值不是那么準確。在大多數(shù)情況下它是對的,但有時它會比實際的少幾個。
出現(xiàn)這種情況的原因是,計數(shù)增加操作實際上分三步執(zhí)行:
解釋器獲取counter的當前值計算新值將計算的新值回寫counter變量
考慮一下這種情況:在當前線程獲取到counter值后,另一個線程搶占到了CPU,然后同樣也獲取到了counter值,并進一步將counter值重新計算并完成回寫;之后時間片重新輪到當前線程(這里僅作標識區(qū)分,并非實際當前),此時當前線程獲取到counter值還是原來的,完成后續(xù)兩步操作后counter的值實際只加上1。
另一種常見情況是訪問不完整或不一致狀態(tài)。這類情況主要發(fā)生在一個線程正在初始化或更新數(shù)據(jù)時,另一個進程卻嘗試讀取正在更改的數(shù)據(jù)。
原子操作
實現(xiàn)對共享變量或其它資源的同步訪問最簡單的方法是依靠解釋器的原子操作。原子操作是在一步完成執(zhí)行的操作,在這一步中其它線程無法獲得該共享資源。
通常情況下,這種同步方法只對那些只由單個核心數(shù)據(jù)類型組成的共享資源有效,譬如,字符串變量、數(shù)字、列表或者字典等。下面是幾個線程安全的操作:
讀或者替換一個實例屬性讀或者替換一個全局變量從列表中獲取一項元素原位修改一個列表(例如:使用append增加一個列表項)從字典中獲取一項元素原位修改一個字典(例如:增加一個字典項、調(diào)用clear方法)
注意,上面提到過,對一個變量或者屬性進行讀操作,然后修改它,最終將其回寫不是線程安全的。因為另外一個線程會在這個線程讀完卻沒有修改或回寫完成之前更改這個共享變量/屬性。
鎖
鎖是Python的threading模塊提供的最基本的同步機制。在任一時刻,一個鎖對象可能被一個線程獲取,或者不被任何線程獲取。如果一個線程嘗試去獲取一個已經(jīng)被另一個線程獲取到的鎖對象,那么這個想要獲取鎖對象的線程只能暫時終止執(zhí)行直到鎖對象被另一個線程釋放掉。
鎖通常被用來實現(xiàn)對共享資源的同步訪問。為每一個共享資源創(chuàng)建一個Lock對象,當你需要訪問該資源時,調(diào)用acquire方法來獲取鎖對象(如果其它線程已經(jīng)獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調(diào)用release方法釋放鎖:
lock = Lock()
lock.acquire() #: will block if lock is already held
... access shared resource
lock.release()
注意,即使在訪問共享資源的過程中出錯了也應(yīng)該釋放鎖,可以用try-finally來達到這一目的:
lock.acquire()
try:
... access shared resource
finally:
lock.release() #: release lock, no matter what
在Python 2.5及以后的版本中,你可以使用with語句。在使用鎖的時候,with語句會在進入語句塊之前自動的獲取到該鎖對象,然后在語句塊執(zhí)行完成后自動釋放掉鎖:
from __future__ import with_statement #: 2.5 only
with lock:
... access shared resource
acquire方法帶一個可選的等待標識,它可用于設(shè)定當有其它線程占有鎖時是否阻塞。如果你將其值設(shè)為False,那么acquire方法將不再阻塞,只是如果該鎖被占有時它會返回False:
if not lock.acquire(False):
... 鎖資源失敗
else:
try:
... access shared resource
finally:
lock.release()
你可以使用locked方法來檢查一個鎖對象是否已被獲取,注意不能用該方法來判斷調(diào)用acquire方法時是否會阻塞,因為在locked方法調(diào)用完成到下一條語句(比如acquire)執(zhí)行之間該鎖有可能被其它線程占有。
if not lock.locked():
#: 其它線程可能在下一條語句執(zhí)行之前占有了該鎖
lock.acquire() #: 可能會阻塞
簡單鎖的缺點
標準的鎖對象并不關(guān)心當前是哪個線程占有了該鎖;如果該鎖已經(jīng)被占有了,那么任何其它嘗試獲取該鎖的線程都會被阻塞,即使是占有鎖的這個線程。考慮一下下面這個例子:
lock = threading.Lock()
def get_first_part():
lock.acquire()
try:
... 從共享對象中獲取第一部分數(shù)據(jù)
finally:
lock.release()
return data
def get_second_part():
lock.acquire()
try:
... 從共享對象中獲取第二部分數(shù)據(jù)
finally:
lock.release()
return data
示例中,我們有一個共享資源,有兩個分別取這個共享資源第一部分和第二部分的函數(shù)。兩個訪問函數(shù)都使用了鎖來確保在獲取數(shù)據(jù)時沒有其它線程修改對應(yīng)的共享數(shù)據(jù)。
現(xiàn)在,如果我們想添加第三個函數(shù)來獲取兩個部分的數(shù)據(jù),我們將會陷入泥潭。一個簡單的方法是依次調(diào)用這兩個函數(shù),然后返回結(jié)合的結(jié)果:
def get_both_parts():
first = get_first_part()
seconde = get_second_part()
return first, second
這里的問題是,如有某個線程在兩個函數(shù)調(diào)用之間修改了共享資源,那么我們最終會得到不一致的數(shù)據(jù)。最明顯的解決方法是在這個函數(shù)中也使用lock:
def get_both_parts():
lock.acquire()
try:
first = get_first_part()
seconde = get_second_part()
finally:
lock.release()
return first, second
然而,這是不可行的。里面的兩個訪問函數(shù)將會阻塞,因為外層語句已經(jīng)占有了該鎖。為了解決這個問題,你可以通過使用標記在訪問函數(shù)中讓外層語句釋放鎖,但這樣容易失去控制并導致出錯。幸運的是,threading模塊包含了一個更加實用的鎖實現(xiàn):re-entrant鎖。
Re-Entrant Locks (RLock)
RLock類是簡單鎖的另一個版本,它的特點在于,同一個鎖對象只有在被其它的線程占有時嘗試獲取才會發(fā)生阻塞;而簡單鎖在同一個線程中同時只能被占有一次。如果當前線程已經(jīng)占有了某個RLock鎖對象,那么當前線程仍能再次獲取到該RLock鎖對象。
lock = threading.Lock()
lock.acquire()
lock.acquire() #: 這里將會阻塞
lock = threading.RLock()
lock.acquire()
lock.acquire() #: 這里不會發(fā)生阻塞
RLock的主要作用是解決嵌套訪問共享資源的問題,就像前面描述的示例。要想解決前面示例中的問題,我們只需要將Lock換為RLock對象,這樣嵌套調(diào)用也會OK.
lock = threading.RLock()
def get_first_part():
... see above
def get_second_part():
... see above
def get_both_parts():
... see above
這樣既可以單獨訪問兩部分數(shù)據(jù)也可以一次訪問兩部分數(shù)據(jù)而不會被鎖阻塞或者獲得不一致的數(shù)據(jù)。
注意RLock會追蹤遞歸層級,因此記得在acquire后進行release操作。
Semaphores
信號量是一個更高級的鎖機制。信號量內(nèi)部有一個計數(shù)器而不像鎖對象內(nèi)部有鎖標識,而且只有當占用信號量的線程數(shù)超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區(qū)。
semaphore = threading.BoundedSemaphore()
semaphore.acquire() #: counter減小
... 訪問共享資源
semaphore.release() #: counter增大
當信號量被獲取的時候,計數(shù)器減?。划斝盘柫勘会尫诺臅r候,計數(shù)器增大。當獲取信號量的時候,如果計數(shù)器值為0,則該進程將阻塞。當某一信號量被釋放,counter值增加為1時,被阻塞的線程(如果有的話)中會有一個得以繼續(xù)運行。
信號量通常被用來限制對容量有限的資源的訪問,比如一個網(wǎng)絡(luò)連接或者數(shù)據(jù)庫服務(wù)器。在這類場景中,只需要將計數(shù)器初始化為最大值,信號量的實現(xiàn)將為你完成剩下的事情。
max_connections = 10
semaphore = threading.BoundedSemaphore(max_connections)
如果你不傳任何初始化參數(shù),計數(shù)器的值會被初始化為1.
Python的threading模塊提供了兩種信號量實現(xiàn)。Semaphore類提供了一個無限大小的信號量,你可以調(diào)用release任意次來增大計數(shù)器的值。為了避免錯誤出現(xiàn),最好使用BoundedSemaphore類,這樣當你調(diào)用release的次數(shù)大于acquire次數(shù)時程序會出錯提醒。
線程同步
鎖可以用在線程間的同步上。threading模塊包含了一些用于線程間同步的類。
Events
一個事件是一個簡單的同步對象,事件表示為一個內(nèi)部標識(internal flag),線程等待這個標識被其它線程設(shè)定,或者自己設(shè)定、清除這個標識。
event = threading.Event()
#: 一個客戶端線程等待flag被設(shè)定
event.wait()
#: 服務(wù)端線程設(shè)置或者清除flag
event.set()
event.clear()
一旦標識被設(shè)定,wait方法就不做任何處理(不會阻塞),當標識被清除時,wait將被阻塞直至其被重新設(shè)定。任意數(shù)量的線程可能會等待同一個事件。
Conditions
條件是事件對象的高級版本。條件表現(xiàn)為程序中的某種狀態(tài)改變,線程可以等待給定條件或者條件發(fā)生的信號。
下面是一個簡單的生產(chǎn)者/消費者實例。首先你需要創(chuàng)建一個條件對象:
#: 表示一個資源的附屬項
condition = threading.Condition()
生產(chǎn)者線程在通知消費者線程有新生成資源之前需要獲得條件:
#: 生產(chǎn)者線程
... 生產(chǎn)資源項
condition.acquire()
... 將資源項添加到資源中
condition.notify() #: 發(fā)出有可用資源的信號
condition.release()
消費者必須獲取條件(以及相關(guān)聯(lián)的鎖),然后嘗試從資源中獲取資源項:
#: 消費者線程
condition.acquire()
while True:
...從資源中獲取資源項
if item:
break
condition.wait() #: 休眠,直至有新的資源
condition.release()
... 處理資源
wait方法釋放了鎖,然后將當前線程阻塞,直到有其它線程調(diào)用了同一條件對象的notify或者notifyAll方法,然后又重新拿到鎖。如果同時有多個線程在等待,那么notify方法只會喚醒其中的一個線程,而notifyAll則會喚醒全部線程。
為了避免在wait方法處阻塞,你可以傳入一個超時參數(shù),一個以秒為單位的浮點數(shù)。如果設(shè)置了超時參數(shù),wait將會在指定時間返回,即使notify沒被調(diào)用。一旦使用了超時,你必須檢查資源來確定發(fā)生了什么。
注意,條件對象關(guān)聯(lián)著一個鎖,你必須在訪問條件之前獲取這個鎖;同樣的,你必須在完成對條件的訪問時釋放這個鎖。在生產(chǎn)代碼中,你應(yīng)該使用try-finally或者with.
可以通過將鎖對象作為條件構(gòu)造函數(shù)的參數(shù)來讓條件關(guān)聯(lián)一個已經(jīng)存在的鎖,這可以實現(xiàn)多個條件公用一個資源:
lock = threading.RLock()
condition_1 = threading.Condition(lock)
condition_2 = threading.Condition(lock)
互斥鎖同步
我們先來看一個例子:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time, threading
# 假定這是你的銀行存款:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 先存后取,結(jié)果應(yīng)該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
# 循環(huán)次數(shù)一旦多起來,最后的數(shù)字就變成非0
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t3 = threading.Thread(target=run_thread, args=(9,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print balance
結(jié)果 :
[/data/web/test_python]$ python multhread_threading.py
[/data/web/test_python]$ python multhread_threading.py
61
[/data/web/test_python]$ python multhread_threading.py
[/data/web/test_python]$ python multhread_threading.py
24
上面的例子引出了多線程編程的最常見問題:數(shù)據(jù)共享。當多個線程都修改某一個共享數(shù)據(jù)的時候,需要進行同步控制。
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖?;コ怄i為資源引入一個狀態(tài):鎖定/非鎖定。某個線程要更改共享數(shù)據(jù)時,先將其鎖定,此時資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源?;コ怄i保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。
threading模塊中定義了Lock類,可以方便的處理鎖定:
#創(chuàng)建鎖mutex = threading.Lock()
#鎖定mutex.acquire([timeout])
#釋放mutex.release()
其中,鎖定方法acquire可以有一個超時時間的可選參數(shù)timeout。如果設(shè)定了timeout,則在超時后通過返回值可以判斷是否得到了鎖,從而可以進行一些其他的處理。
使用互斥鎖實現(xiàn)上面的例子的代碼如下:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 獲取鎖,確保只有一個線程操作這個數(shù)
muxlock.acquire()
global balance
balance = balance + n
balance = balance - n
# 釋放鎖,給其他被阻塞的線程繼續(xù)操作
muxlock.release()
def run_thread(n):
for i in range(10000):
change_it(n)
加鎖后的結(jié)果,就能確保數(shù)據(jù)正確:
[/data/web/test_python]$ python multhread_threading.py
[/data/web/test_python]$ python multhread_threading.py
[/data/web/test_python]$ python multhread_threading.py
[/data/web/test_python]$ python multhread_threading.py
【常見的內(nèi)置函數(shù)】
1、enumerate(iterable,start=0)
是python的內(nèi)置函數(shù),是枚舉、列舉的意思,對于一個可迭代的(iterable)/可遍歷的對象(如列表、字符串),enumerate將其組成一個索引序列,利用它可以同時獲得索引和值。
2、zip(*iterables,strict=False)
用于將可迭代的對象作為參數(shù),將對象中對應(yīng)的元素打包成一個個元組,然后返回由這些元組組成的列表。如果各個迭代器的元素個數(shù)不一致,則返回列表長度與最短的對象相同,利用*號操作符,可以將元組解壓為列表。
3、filter(function,iterable)
filter是將一個序列進行過濾,返回迭代器的對象,去除不滿足條件的序列。
4、isinstance(object,classinfo)
是用來判斷某一個變量或者是對象是不是屬于某種類型的一個函數(shù),如果參數(shù)object是classinfo的實例,或者object是classinfo類的子類的一個實例,
返回True。如果object不是一個給定類型的的對象, 則返回結(jié)果總是False
5、eval(expression[,globals[,locals]])
用來將字符串str當成有效的表達式來求值并返回計算結(jié)果,表達式解析參數(shù)expression并作為Python表達式進行求值(從技術(shù)上說是一個條件列表),采用globals和locals字典作為全局和局部命名空間。
【常用的句式】
1、format字符串格式化
format把字符串當成一個模板,通過傳入的參數(shù)進行格式化,非常實用且強大。
2、連接字符串
常使用+連接兩個字符串。
3、if...else條件語句
Python條件語句是通過一條或多條語句的執(zhí)行結(jié)果(True或者False)來決定執(zhí)行的代碼塊。其中if...else語句用來執(zhí)行需要判斷的情形。
4、for...in、while循環(huán)語句
循環(huán)語句就是遍歷一個序列,循環(huán)去執(zhí)行某個操作,Python中的循環(huán)語句有for和while。
5、import導入其他腳本的功能
有時需要使用另一個python文件中的腳本,這其實很簡單,就像使用import關(guān)鍵字導入任何模塊一樣。
是的,Python可以使用os模塊的walk函數(shù)來同步本地磁盤文件夾中的字典。walk函數(shù)會返回一個三元組(root,dirs,files),這個三元組包含根目錄、子目錄和遞歸遍歷的文件列表,可以組成一個字典,將文件根據(jù)它們所屬的文件夾來歸類。
python系統(tǒng)提供了下面常用的函數(shù):
1. 數(shù)學庫模塊(math)提供了很多數(shù)學運算函數(shù);
2.復數(shù)模塊(cmath)提供了用于復數(shù)運算的函數(shù);
3.隨機數(shù)模塊(random)提供了用來生成隨機數(shù)的函數(shù);
4.時間(time)和日歷(calendar)模塊提供了能處理日期和時間的函數(shù)。
注意:在調(diào)用系統(tǒng)函數(shù)之前,先要使用import 語句導入 相應(yīng)的模塊
該語句將模塊中定義的函數(shù)代碼復制到自己的程 序中,然后就可以訪問模塊中的任何函數(shù),其方 法是在函數(shù)名前面加上“模塊名.”。
希望能幫到你。