真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

40線程2_Event_Lock

?

創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供房縣網(wǎng)站建設(shè)、房縣做網(wǎng)站、房縣網(wǎng)站設(shè)計、房縣網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、房縣企業(yè)網(wǎng)站模板建站服務(wù),十多年房縣做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。

?

?

線程同步:

線程間協(xié)同,通過某種技術(shù),讓一個線程訪問某些數(shù)據(jù)時,其它線程不能訪問這些數(shù)據(jù),直到該線程完成對數(shù)據(jù)的操作;

?

結(jié)果要能預(yù)知;

?

critical section,臨界區(qū);

mutex,互斥量,threading.Lock類;?? #即鎖,你用我不用,我用你不用

semaphore,信號量;

event,事件,threading.Event類;

barrier,屏障、柵欄,threading.Barrier;

?

幫助:

Python 3.5.3 documentation-->Library Reference-->Concurrent Execution-->threading

?

?

?

目錄

threading.Event類:...1

theading.Lock類:...4

阻塞鎖:...5

非阻塞鎖:...10

?

?

?

threading.Event類:

event,事件,是線程間通信機制中最簡單的實現(xiàn),使用一個內(nèi)部的flag標記,通過flag的True或False的變化來進行操作;

?

e.set(),標記設(shè)置為True;

e.clear(),標記設(shè)置為False;

e.is_set(),標記是否為True;

e.wait(timeout=None),Block until the internal flag is true,設(shè)置等待標記為True的時長,None為無限等待,等到返回True,未等到超時了返回False;

?

e.wait()的使用:

wait優(yōu)于sleep,wait會主動讓出時間片,其它線程可以被調(diào)度,而sleep會占用時間片不讓出;

?

例:

def do(interval, e: threading.Event):

??? while not e.wait(interval):

??????? logging.info('do sth')

?

e = threading.Event()

threading.Thread(target=do, args=(3,e)).start()

e.wait(5)?? #可用time.sleep(5),e.wait(5)優(yōu)于time.sleep(5)

e.set()

print('main exit')

輸出:

2018-07-30-14:58:08?????? Thread info: 10268 Thread-1 do sth

main exit

?

例:

老板雇傭了一個工人,讓他生產(chǎn)杯子,老板一直等著工人,直到生產(chǎn)了10個杯子;

import threading

import time

import logging

FORMAT = '%(asctime)-15s\tThread info: %(thread)d %(threadName)s %(message)s'

logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%Y-%m-%d-%H:%M:%S')

?

cups = []?? #當前,如果多個工人做則有問題

event = threading.Event()?? #實例,主線程、boss、worker都要看event,注意其作用域,使用同一個Event對象的flag

?

def boss(e: threading.Event):

??? logging.info("I'm boss,waiting for U.")

??? e.wait()

??? logging.info('good job.')

?

def worker(n, e: threading.Event):

??? while True:

??????? time.sleep(0.5)

??????? cups.append(1)

??????? logging.info('make 1')

??????? if len(cups) >= n:

??????????? # logging.info('I finished my job. {}'.format(len(cups)))?? #此處打印也可放到下面,老板在主線程中等待,if event.wait():中

??????????? e.set()

?? ?????????break

???????? logging.info('I finished my job. {}'.format(len(cups)))

?

b = threading.Thread(target=boss, args=(event,))

w = threading.Thread(target=worker, args=(10, event))

w.start()

b.start()

?

# if event.wait():?? #方1,老板在主線程中等待

# ????logging.info('I finished my job. {}'.format(len(cups)))

?

# while not event.wait(1):?? #方2,1秒看1次,老板在主線程中等待

#???? pass

# logging.info('I finished my job. {}'.format(len(cups)))

?

# while not event.is_set():?? #方3,老板在主線程中等待

#???? event.wait()?? #誰wait就是等到flag變?yōu)門rue,或等到超時返回False,不限制等待的個數(shù)

# logging.info('I finished my job. {}'.format(len(cups)))

?

輸出:

2018-07-30-14:47:12?????? Thread info: 11032 Thread-1 I'm boss,waiting for U.

2018-07-30-14:47:13?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:13?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:14?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:14?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:15?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:15?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:16?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:16?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 I finished my job. 10

2018-07-30-14:47:17?????? Thread info: 11032 Thread-1 good job.

?

例:

Event練習(xí):

實現(xiàn)Timer,延時執(zhí)行的線程;

延時計算add(x,y);

思路:

Timer的構(gòu)造函數(shù)中參數(shù)得有哪些?

如何實現(xiàn)start啟動一個線程執(zhí)行函數(shù)?

如何cancel取消待執(zhí)行任務(wù)?

?

class Timer:

??? def __init__(self, interval, fn, *args, **kwargs):

??????? self.interval = interval

??????? self.fn = fn

??????? self.args = args

??????? self.kwargs = kwargs

??????? self.event = threading.Event()

?

??? def start(self):

??????? threading.Thread(target=self.__do).start()

?

??? def cancel(self):

??????? self.event.set()

?

??? def __do(self):

??????? self.event.wait(self.interval)?? #等待超時后返回False

?????????????????? # print(self.event.__dict__)?? # {'_flag': False, '_cond': , 0)>}

??????? print(self.event.is_set())?? #False

??????? if not self.event.is_set():

??????????? self.fn(*self.args, **self.kwargs)

?

def add(x, y):

??? logging.info(x+y)

?

t = Timer(3, add, 4, 5)

t.start()

#t.cancel()?? #有此句self.is_set()為True

輸出:

False

2018-07-31-09:29:45?????? Thread info: 5156 Thread-1 9

?

?

?

theading.Lock類:

解決多線程中數(shù)據(jù)同步;

?

凡是存在共享資源爭搶的地方都可使用鎖,從而保證只有一個使用者可完全使用這個資源;

?

程序=數(shù)據(jù)+算法,要保證數(shù)據(jù)的結(jié)果是可預(yù)知的,若不可預(yù)知該程序是沒用的,為得到正確的結(jié)果,性能是其次已不重要了;

?

一旦某一線程獲得鎖,其它試圖獲取的線程將被阻塞;

?

acquire(blocking=True,timeout=-1),默認阻塞,阻塞可設(shè)置超時時間;非阻塞時,timeout禁止設(shè)置;成功獲取鎖,返回True,否則返回False;

release(),釋放鎖,可從任何線程調(diào)用釋放;該方法執(zhí)行后,已上鎖的鎖會被重置為unlocked;在未上鎖的鎖上調(diào)用release(),拋RuntimeError: release unlocked lock;

?

注:

鎖將acquire()和release()中間的代碼管?。?/p>

大多數(shù)情況下用的是阻塞鎖;

鎖,類似ATM機的小房子;

加鎖,應(yīng)在分析業(yè)務(wù)基礎(chǔ)之上,在合適的地方加,不能從前到后加一把大鎖,這樣效率低下;

?

加鎖、解鎖:

一般來說,加鎖后還要有一些代碼實現(xiàn),在釋放鎖之前還有可能拋異常,一旦出現(xiàn)異常,鎖無法釋放,但當前線程可能因為這個異常終止了,這就產(chǎn)生了死鎖;

一般誰(某個線程)加的鎖誰解鎖;

常用語句:

try...finally,保證鎖的釋放;

with,Lock類中有__enter__()和__exit__(),鎖對象支持上下文管理;

?

鎖的應(yīng)用場景:

鎖,適用于訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候;

如果全部都是讀取同一個共享資源需要鎖嗎?不需要,因為這時可認為共享資源是不可變的,每一次讀取它都是一樣的值,所以不用加鎖;

不使用鎖,雖有了效率,但結(jié)果是錯的;

使用了鎖,雖效率低下,但結(jié)果是對的,所以為了對的結(jié)果,讓計算機計算去吧!

?

使用鎖的注意事項:

少用鎖,必要時加鎖,使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊執(zhí)行,要么爭搶執(zhí)行;

例如,高速公路上的車并行跑,到了省界只開放了一個收費口,過了這個口,車輛依然可在多車道上并行跑;過收費口時,如果排隊一輛一輛過,加不加鎖一樣,效率相當,但一旦出現(xiàn)爭搶,就必須加鎖一輛輛過;

加鎖時間越短越好,不需要就立即釋放鎖;

一定要避免死鎖,盡量用with語句;

?

?

阻塞鎖:

lock.acquire()

?

例:

lock = threading.Lock()

lock.acquire()

print('get locker1')

lock.acquire()?? #block,再次獲取同一個鎖時阻塞(獨占),必須等別人將該鎖釋放才能獲取到,此行之后的語句不能執(zhí)行;即,某一線程獲得鎖后,另一線程只要執(zhí)行到lock.acquire()就會阻塞?。ㄗ。?/p>

print('get locker2')

lock.release()?? #一旦某一線程release,等著的線程爭搶,誰搶到誰上鎖;一般情況,有多少個acquire就有多少個release;要保證一定release,用到上下文(with語句)或try...finally

print('release locker')

?

注:

死鎖(解不開的鎖,就是死鎖):

同一個鎖,你等我釋放、我等你釋放、自己等自己釋放;

兩個鎖,你等我放一把,我等你放一把,互相等待,都不釋放,都為阻塞狀態(tài);

?

例:

def work():

??? time.sleep(3)

??? lock.release()?? #任何一個線程都可對同一個鎖操作,此處釋放鎖

?

lock = threading.Lock()

lock.acquire()

print('get locker1')

threading.Thread(target=work).start()

time.sleep(5)

lock.acquire()

print('get locker2')

lock.release()

print('release locker')

?

例:

需求:10個工人完成100個杯子;

注:臨界點問題:

當生產(chǎn)到99個時,若每個線程都可看到,每個線程看到轉(zhuǎn)頭去生產(chǎn)了,導(dǎo)致結(jié)果會多;

只允許一個線程可看到當前生產(chǎn)杯子的數(shù)量,其它線程不可見,這個線程看到后生產(chǎn)完1個杯子時,其它線程均可看到,杯子數(shù)量達到要求就不生產(chǎn)了;

?

cups = []

?

def worker(task=100):

??? while True:

??????? count = len(cups)

??????? logging.info('current count: {}'.format(count))

??????? if count >= task:

??????????? break

??????? cups.append(1)

??????? logging.info('{} make 1'.format(threading.current_thread()))

??? logging.info('total: {}'.format(count))

?

for _ in range(10):

??? threading.Thread(target=worker).start()

輸出:

……

2018-08-06-09:13:14?????? Thread info: 10208 Thread-2 current count: 104

2018-08-06-09:13:14?????? Thread info: 10208 Thread-2 total: 104

2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 make 1

2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 current count: 104

2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 total: 104

?

解決,加鎖:

cups = []

lock = threading.Lock()

?

def worker(lock:threading.Lock, task=100):

??? while True:

??????? lock.acquire()

??????? count = len(cups)

??????? logging.info('current count: {}'.format(count))

??????? lock.release()

??????? if count >= task:?? #雖不是大鎖,仍有問題,在臨界點時,其它線程仍可看到杯子總數(shù),繼而再做;解決,把中間代碼全部放到鎖里

??????????? break

??????? lock.acquire()

??????? cups.append(1)

??????? lock.release()

??????? logging.info('{} make 1'.format(threading.current_thread()))

??? logging.info('total: {}'.format(count))

?

for _ in range(10):

??? threading.Thread(target=worker, args=(lock,)).start()

輸出:

……

2018-08-06-09:19:11?????? Thread info: 4092 Thread-4 current count: 100

2018-08-06-09:19:11?????? Thread info: 4092 Thread-4 total: 100

2018-08-06-09:19:11?????? Thread info: 11024 Thread-6 current count: 100

2018-08-06-09:19:11?????? Thread info: 11024 Thread-6 total: 100

?

例:

cups = []

lock = threading.Lock()

?

def worker(lock:threading.Lock, task=100):

??? while True:

??????? lock.acquire()?? #該鎖為互斥鎖,你有我沒有,我有你沒有

??????? count = len(cups)

??????? logging.info('current count: {}'.format(count))

??????? # lock.release()

??????? if count >= task:

??????????????????????????? lock.release()

???? ???????break?? #仍有問題,break后未釋放鎖,解決try...finally或上下文

??????? # lock.acquire()

??????? cups.append(1)

??????? lock.release()

??????? logging.info('{} make 1'.format(threading.current_thread()))

??? logging.info('total: {}'.format(count))

?

for _ in range(10):

??? threading.Thread(target=worker, args=(lock,)).start()

輸出:

……

2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 current count: 99

2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 make 1

2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 current count: 100

2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 total: 100

?

例:

計數(shù)器類,可以加、可以減;

class Counter:

??? def __init__(self):

??????? self.__val = 0

?

??? def inc(self):

??????? self.__val += 1

?

??? def dec(self):

??????? self.__val -= 1

?

??? @property

??? def value(self):

??????? return self.__val

?

def do(c:Counter, count=100):

??? for _ in range(count):

??????? for i in range(-50,50):

??????????? if i < 0:

??????????????? c.dec()

??????????? else:

??????????????? c.inc()

?

c = Counter()

threadcount = 10?? #依次10,100,1000查看,結(jié)果沒變化

c2 = 1000?? #依次10,100,1000查看,當1000時,每個線程耗cpu時間長,結(jié)果將不可預(yù)知

for i in range(threadcount):

??? threading.Thread(target=do, args=(c,c2)).start()

# time.sleep(5)

# print(c.value)?? #多線程情況下,每個線程耗cpu時間長的情況下,此處打印的值不是最終結(jié)果

while True:

??? time.sleep(3)

??? print(threading.enumerate())

??? print(c.value)

輸出:

[<_MainThread(MainThread, started 7424)>]

81

[<_MainThread(MainThread, started 7424)>]

81

?

例,解決,加鎖:

class Counter:

??? def __init__(self):

??????? self.__val = 0

??????? self.__lock = threading.Lock()

?

??? def inc(self):

??????? try:?? #同with self.__lock: self.__val += 1;

??????????? self.__lock.acquire()

??????????? self.__val += 1

??????? finally:

??????????? self.__lock.release()

?

??? def dec(self):

??????? with self.__lock:

??????????? self.__val -= 1

?

??? @property

??? def value(self):

??????? with self.__lock:

??????????? return self.__val

?

def do(c:Counter, count=100):

??? for _ in range(count):

??????? for i in range(-50,50):

??????????? if i < 0:

??????????????? c.dec()

??????????? else:

??????????????? c.inc()

?

c = Counter()

threadcount = 10

c2 = 1000

for i in range(threadcount):

??? threading.Thread(target=do, args=(c,c2)).start()

# time.sleep(5)

# print(c.value)

while True:

??? time.sleep(1)

??? # print(threading.enumerate())

??? # print(c.value)

??? if threading.active_count() == 1:

??????? print(threading.enumerate())

??????? print(c.value)

??? else:

??????? print(threading.enumerate())

輸出:

[, , , <_MainThread(MainThread, started 12200)>, , , , ]

[<_MainThread(MainThread, started 12200)>]

0

[<_MainThread(MainThread, started 12200)>]

0

?

?

非阻塞鎖:

lock.acquire(False),獲取到鎖返回True,否則返回False;

?

?

例:

lock = threading.Lock()

lock.acquire()

ret = lock.acquire(False)?? #獲取到鎖則返回True

print(ret)

輸出:

False

?

例:

cups = []

lock = threading.Lock()

?

def worker(lock:threading.Lock, task=100):

??? while True:

??????? if lock.acquire(False):

??????????? count = len(cups)

??????????? logging.info('current count: {}'.format(count))

??????????? if count >= task:

??????????????? lock.release()

??????????????? break

??????????? cups.append(1)

??????????? lock.release()

??????????? logging.info('{} make 1'.format(threading.current_thread()))

??? logging.info('total: {}'.format(count))

?

for _ in range(10):

??? threading.Thread(target=worker, args=(lock,)).start()

輸出:

……

2018-08-06-13:52:35?????? Thread info: 11232 Thread-9 current count: 100

2018-08-06-13:52:35?????? Thread info: 11232 Thread-9 total: 100

2018-08-06-13:52:35?????? Thread info: 11752 Thread-10 current count: 100

2018-08-06-13:52:35?????? Thread info: 11752 Thread-10 total: 100

?

例:

def worker(tasks):

??? for task in tasks:

??????? if task.lock.acquire(False):

??????????? time.sleep(0.01)

??????????? logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))

??????? else:

??????????? logging.info('{} {} is working'.format(threading.current_thread(), task.name))

?

class Task:

??? def __init__(self, name):

??????? self.name = name

??????? self.lock = threading.Lock()

?

tasks = [Task('task-{}'.format(x)) for x in range(10)]

?

for i in range(5):

??? threading.Thread(target=worker, args=(tasks,), name='worker-{}'.format(i)).start()

輸出:

……

2018-08-06-14:10:34?????? Thread info: 12256 worker-1 task-7 is working

2018-08-06-14:10:34?????? Thread info: 12256 worker-1 task-8 is working

2018-08-06-14:10:34?????? Thread info: 12256 worker-1 task-9 is working

2018-08-06-14:10:34?????? Thread info: 11912 worker-3 task-9 begin to start

2018-08-06-14:10:34?????? Thread info: 10496 worker-4 task-8 begin to start

2018-08-06-14:10:34?????? Thread info: 10496 worker-4 task-9 is working

?

?

?


當前文章:40線程2_Event_Lock
文章鏈接:http://weahome.cn/article/jopdhc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部