python 线程同步_Python并发编程-线程同步(线程安全)
Python并發編程-線程同步(線程安全)
作者:尹正杰
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
線程同步,線程間協調,通過某種技術,讓一個線程訪問某些數據時,其它線程不能訪問這些數據,直到該線程完成對數據的操作。
一.Event
1>.Event的常用方法
Event事件,是線程通信機制中最簡單的實現,使用一個內部的標記flag,通過flage的True或False的變化來進行操作。
常用方法如下:
set():
標記為True。clear():
標記設置為Flase。
is_set():
查詢標記是否為True。wait(timeout=None):
設置等待標記為True的時長,None為無線等待。等到返回True,未等到超時了返回False。
2>.Event使用案例
1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie
5
6
7 from threading import Event,Thread8 import logging9
10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
11 logging.basicConfig(format=FORMAT,level=logging.INFO)12
13 def boss(event:Event):14 logging.info("I'm boss,waiting for U")15 event.wait() #阻塞等待,直到event被標記為Ture16 logging.info("Good Job.")17
18 def worker(event:Event,count=10):19 logging.info("I'm working for U.")20 cups =[]21 while not event.wait(0.5): #使用wait等待0.5秒(相當于"time.sleep(0.5)"),若規定事件內event依舊標記依舊沒有設置為True,則返回False22 logging.info("make 1 cup")23 cups.append(1)24 if len(cups) >=count:25 event.set() #將標記設置為True26 break27 logging.info("I finished my job.cups={}".format(cups))28
29 event =Event()30 print(event.is_set()) #event實例的標記默認為False31
32 b = Thread(target=boss,name="boss",args=(event,))33 w = Thread(target=worker,name="worker",args=(event,))34 b.start()35 w.start()
2019-11-23 14:54:53,177 boss 10916 I'm boss,waiting for U
2019-11-23 14:54:53,178 worker 15672 I'm working for U.
False2019-11-23 14:54:53,678 worker 15672 make 1cup2019-11-23 14:54:54,179 worker 15672 make 1cup2019-11-23 14:54:54,680 worker 15672 make 1cup2019-11-23 14:54:55,180 worker 15672 make 1cup2019-11-23 14:54:55,680 worker 15672 make 1cup2019-11-23 14:54:56,181 worker 15672 make 1cup2019-11-23 14:54:56,681 worker 15672 make 1cup2019-11-23 14:54:57,181 worker 15672 make 1cup2019-11-23 14:54:57,681 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 I finished my job.cups=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]2019-11-23 14:54:58,182 boss 10916 Good Job.
以上代碼執行結果戳這里
3>.定時器Timer(延遲執行)
1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 import threading7 import logging8 import time
9
10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
11 logging.basicConfig(level=logging.INFO,format=FORMAT)12
13 def worker():14 logging.info("working")15 time.sleep(2)16
17 """18 Timer是線程Thread的子類,Timer實例內部提供了一個finished屬性,該屬性是Event對象。19 Timer是線程Thread的子類,就是線程類,具有線程的能力和特征。20 它的實例時能夠延時執行目標函數的線程,在真正執行目標函數之前,都可以cancel它。21 cacel方法本質使用Event類實現。這并不是說,線程提供了取消的方法。22 """23 t = threading.Timer(4,worker) #當t對象調用start方法后,等待4秒后執行worker函數24 t.setName("timer")25
26 t.start()27 # t.cancel() #本質上是在worker函數執行前對finished屬性set方法操作,從而跳過了worker函數執行,達到了取消的效果。28
29 for _ in range(10):30 print(threading.enumerate())31 time.sleep(1)
[<_MainThread(MainThread, started 5332)>, ]
[<_MainThread(MainThread, started 5332)>, ]
[<_MainThread(MainThread, started 5332)>, ]
[<_MainThread(MainThread, started 5332)>, ]
[<_MainThread(MainThread, started 5332)>, ]2019-11-23 15:07:40,987 timer 6656working
[<_MainThread(MainThread, started 5332)>, ]
[<_MainThread(MainThread, started 5332)>]
[<_MainThread(MainThread, started 5332)>]
[<_MainThread(MainThread, started 5332)>]
[<_MainThread(MainThread, started 5332)>]
以上代碼執行結果戳這里
二.Lock
1>.Lock的常用方法
鎖,一旦線程獲得鎖,其它試圖獲取鎖的線程將被阻塞。
鎖:凡是存在資源共享爭搶的地方都可以使用鎖,從而只有一個使用者可以完全使用這個資源。
鎖常用的方法如下:
acquire(blocking=True,timeout=-1):
默認阻塞,阻塞可以設置超時事件。非阻塞時,timeout禁止設置。
成功獲取鎖,返回True,否則返回False
release():
釋放鎖。可以從任何線程調用釋放。
已上鎖的鎖,會被重置為unlocked
若在未上鎖的鎖上調用,則會拋出RuntimeError異常。
2>.Lock鎖使用案例
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 from threading importThread,Lock7 importlogging8 importtime9
10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
11 logging.basicConfig(level=logging.INFO,format=FORMAT)12
13 cpus =[]14 lock =Lock()15
16 def worker(count=10):17 logging.info("I'm working for U.")18 flag =False19 whileTrue:20 lock.acquire() #獲取鎖
21 if len(cpus) >=count:22 flag =True23 time.sleep(0.0001) #為了看出線程切換效果
24 if notflag:25 cpus.append(1)26 lock.release()27 ifflag:28 break
29 logging.info("I finished . cups = {}".format(len(cpus)))30
31
32 for _ in range(10):33 Thread(target=worker,args=(1000,)).start()
2019-11-23 16:03:35,225 Thread-1 16204 I'm working for U.
2019-11-23 16:03:35,226 Thread-2 14436 I'm working for U.
2019-11-23 16:03:35,226 Thread-3 1164 I'm working for U.
2019-11-23 16:03:35,226 Thread-4 10460 I'm working for U.
2019-11-23 16:03:35,227 Thread-5 5072 I'm working for U.
2019-11-23 16:03:35,227 Thread-6 12016 I'm working for U.
2019-11-23 16:03:35,227 Thread-7 9732 I'm working for U.
2019-11-23 16:03:35,228 Thread-8 15644 I'm working for U.
2019-11-23 16:03:35,228 Thread-9 104 I'm working for U.
2019-11-23 16:03:35,228 Thread-10 16508 I'm working for U.
2019-11-23 16:03:37,130 Thread-1 16204 I finished . cups = 1000
2019-11-23 16:03:37,132 Thread-3 1164 I finished . cups = 1000
2019-11-23 16:03:37,134 Thread-4 10460 I finished . cups = 1000
2019-11-23 16:03:37,136 Thread-2 14436 I finished . cups = 1000
2019-11-23 16:03:37,138 Thread-5 5072 I finished . cups = 1000
2019-11-23 16:03:37,140 Thread-6 12016 I finished . cups = 1000
2019-11-23 16:03:37,142 Thread-7 9732 I finished . cups = 1000
2019-11-23 16:03:37,144 Thread-8 15644 I finished . cups = 1000
2019-11-23 16:03:37,146 Thread-9 104 I finished . cups = 1000
2019-11-23 16:03:37,148 Thread-10 16508 I finished . cups = 1000
以上代碼執行結果戳這里
3>.加鎖和解鎖(計數器類案例)
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importthreading7 from threading importThread,Lock8 importlogging9 importtime10
11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
12 logging.basicConfig(level=logging.INFO,format=FORMAT)13
14 classCounter:15 def __init__(self):16 self._val =017 self.__lock =Lock()18
19 @property20 defvalue(self):21 with self.__lock:22 returnself._val23
24 definc(self):25 try:26 self.__lock.acquire()27 self._val += 1
28 finally:29 self.__lock.release()30
31 defdec(self):32 with self.__lock:33 self._val -= 1
34
35
36 def worker(c:Counter,count=100):37 for _ inrange(count):38 for i in range(-50,50):39 if i <0:40 c.dec()41 else:42 c.inc()43
44 c =Counter()45 c1 = 10
46 c2 = 10000
47
48 for i inrange(c1):49 Thread(target=worker,args=(c,c2)).start()50
51
52 whileTrue:53 time.sleep(1)54 print(threading.enumerate())55 if threading.active_count() == 1:56 print((c.value))57 break
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , , , , ]
[<_MainThread(MainThread, started 14856)>, , , , , , ]
[<_MainThread(MainThread, started 14856)>]
0
以上代碼執行結果戳這里
4>.鎖的應用場景
鎖適用于訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候。
如果全部都是讀取同一個共享資源需要鎖嗎?
不需要。因為這時可以認為共享資源是不可變的,每一次讀取它都是一樣的值,所以不用加鎖
使用鎖的注意事項:
少用鎖,必要時用鎖。使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊執行,要么爭搶執行
舉例,高速公路上車并行跑,可是到了省界只開放了一個收費口,過了這個口,車輛依然可以在多車道上一起跑。過收費口的時候,如果排隊一輛輛過,加不加鎖一樣效率相當,但是一旦出現爭搶,就必須加鎖一輛輛過。注意,不管加不加鎖,只要是一輛輛過,效率就下降了。
加鎖時間越短越好,不需要就立即釋放鎖
一定要避免死鎖
不使用鎖,有了效率,但是結果是錯的。
使用了鎖,效率低下,但是結果是對的。
所以,我們是為了效率要錯誤結果呢?還是為了對的結果,讓計算機去計算吧。
5>.非阻塞鎖使用
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importthreading7 importlogging8 importtime9
10 FORMAT = "%(asctime)s %(threadName)s %(thread)-10d %(message)s"
11 logging.basicConfig(level=logging.INFO,format=FORMAT)12
13 lock =threading.Lock()14
15 defworker(l:threading.Lock):16 whileTrue:17 flag =l.acquire(False)18 ifflag:19 logging.info("do something.") #為了顯示效果,沒有釋放鎖
20 else:21 logging.info("try again")22 time.sleep(1)23
24 for i in range(5):25 threading.Thread(target=worker,name="worker={}".format(i),args=(lock,)).start()
2019-11-24 15:58:31,932 worker=0 123145354420224do something.2019-11-24 15:58:31,933 worker=0 123145354420224 tryagain2019-11-24 15:58:31,933 worker=1 123145359675392 tryagain2019-11-24 15:58:31,933 worker=2 123145364930560 tryagain2019-11-24 15:58:31,934 worker=3 123145370185728 tryagain2019-11-24 15:58:31,934 worker=4 123145375440896 tryagain2019-11-24 15:58:32,933 worker=0 123145354420224 tryagain2019-11-24 15:58:32,933 worker=1 123145359675392 tryagain2019-11-24 15:58:32,934 worker=2 123145364930560 tryagain2019-11-24 15:58:32,936 worker=4 123145375440896 tryagain2019-11-24 15:58:32,936 worker=3 123145370185728 tryagain2019-11-24 15:58:33,935 worker=0 123145354420224 tryagain2019-11-24 15:58:33,935 worker=1 123145359675392 tryagain2019-11-24 15:58:33,935 worker=2 123145364930560 tryagain2019-11-24 15:58:33,940 worker=4 123145375440896 tryagain2019-11-24 15:58:33,940 worker=3 123145370185728 tryagain2019-11-24 15:58:34,939 worker=0 123145354420224 tryagain2019-11-24 15:58:34,940 worker=1 123145359675392 tryagain2019-11-24 15:58:34,940 worker=2 123145364930560 tryagain2019-11-24 15:58:34,944 worker=4 123145375440896 tryagain2019-11-24 15:58:34,945 worker=3 123145370185728 tryagain2019-11-24 15:58:35,943 worker=1 123145359675392 tryagain2019-11-24 15:58:35,944 worker=0 123145354420224 tryagain2019-11-24 15:58:35,944 worker=2 123145364930560 tryagain2019-11-24 15:58:35,948 worker=4 123145375440896 tryagain2019-11-24 15:58:35,949 worker=3 123145370185728 tryagain
......
以上代碼執行結果戳這里
三.可重入鎖RLock
1>.可重入鎖不可跨越線程
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importthreading7 importtime8
9 lock =threading.RLock()10 print(lock.acquire()) #僅對當前線程上鎖,但是代碼并不會阻塞而是可以繼續執行
11 print(lock.acquire(blocking=True))12 print(lock.acquire(timeout=3)) #默認"blocking=True",因此可以設置值阻塞的超時時間,但當blocking=False時,timeout無法使用。
13 print(lock.acquire(blocking=False))14
15 print("main thread {}".format(threading.main_thread().ident))16 print("lock in main thread {}".format(lock))17
18 print("{0} 我是分割線 {0}".format("*" * 15))19
20 lock.release()21 lock.release()22 lock.release()23 lock.release()24 #lock.release() #由于上面鎖定的lock調用了4此鎖定,因此解鎖也只能是4次,若解鎖次數超過上鎖次數則拋出"RuntimeError: cannot release un-acquired lock"異常。
25
26 print("main thread {}".format(threading.main_thread().ident))27 print("lock in main thread {}".format(lock))28
29 print("{0} 我是分割線 {0}".format("*" * 15))30
31 print(lock.acquire(blocking=False))32 print(lock.acquire(blocking=False))33 print("main thread {}".format(threading.main_thread().ident))34 print("lock in main thread {}".format(lock))35
36 #threading.Thread(target=lambda l:l.release(),args=(lock,)).start() #可重入鎖不可跨越線程,否則會拋出"RuntimeError: cannot release un-acquired lock"異常。
37 lock.release() #可重入鎖無論是上鎖還是解鎖要求在同一個線程中。
38
39 time.sleep(3)40 print("main thread {}".format(threading.main_thread().ident))41 print("lock in main thread {}".format(lock))
True
True
True
True
main thread18096lockin main thread
*************** 我是分割線 ***************main thread18096lockin main thread
*************** 我是分割線 ***************True
True
main thread18096lockin main thread main thread18096lockin main thread
以上代碼執行結果戳這里
2>.為另一個線程傳入同一個RLock對象
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importthreading7 importtime8
9 lock =threading.RLock()10
11 defsub(l:threading.RLock):12 print("{}:{}".format(threading.current_thread(),l.acquire())) #阻塞
13 print("{}:{}".format(threading.current_thread(),l.acquire()))14 print("lock in sub thread {}".format(lock))15 l.release()16 print("release in sub 1")17 print("lock in sub thread {}".format(lock))18 l.release()19 print("release in sub 2")20 print("lock in sub thread {}".format(lock))21
22
23 print(lock.acquire())24 print("main thread {}".format(threading.main_thread().ident))25 print("lock in main thread {}".format(lock))26
27 print("{0} 我是分割線 {0}".format("*" * 15))28
29 threading.Timer(2,sub,(lock,)).start() #為另一個線程傳入同一個lock對象
30
31 print("in main thread, {}".format(lock.acquire()))32 lock.release()33 time.sleep(5)34 print("release lock in main thread =======",end="\n\n")35 lock.release()36 print("lock in main thread {}".format(lock))
True
main thread2456lockin main thread
*************** 我是分割線 ***************
inmain thread, True
release lockin main thread =======lockin main thread
:True:True
lockin sub thread releasein sub 1lockin sub thread releasein sub 2lockin sub thread
以上代碼執行結果戳這里
3>.可重入鎖相關總結
可重入鎖,是線程相關的鎖??稍谕粋€線程中獲取鎖,并可以繼續在同一線程不阻塞多次獲取鎖。
當鎖未釋放完,其它線程獲取鎖就會阻塞,直到當前持有鎖的線程釋放完鎖。
鎖都應該使用完后釋放。可重入鎖也是鎖,應該acquire多少次,就release多少次。
四.Condition
1>.Condition常用方法
Condition(lock=None):
可傳入一個Lock或者RLock對象,默認是RLock。
acquire(*args):
獲取鎖。
wait(self,timeout=None):
等待或超時。
notify(n=1):
喚醒至多指定數目個數的等待的線程,沒有等待的線程就沒有任何操作。
notify_all():
喚醒所有等待的線程。
2>.生產者消費者模型
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importlogging7 from threading importEvent,Thread,Condition8 importtime9 importrandom10
11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
12 logging.basicConfig(format=FORMAT,level=logging.INFO)13
14 classDispachter:15 def __init__(self):16 self.data =None17 self.event = Event() #event只是為了使用方便,與邏輯無關
18 self.cond =Condition()19
20 defproduce(self,total):21 for _ inrange(total):22 data = random.randint(1,100)23 with self.cond:24 logging.info(data)25 self.data =data26 #self.cond.notify_all()
27 self.cond.notify(2)28 self.event.wait(1)29
30 defconsume(self):31 while notself.event.is_set():32 with self.cond:33 self.cond.wait()34 data =self.data35 logging.info("recieved {}".format(data))36 self.data =None37 self.event.wait(0.5)38
39 d =Dispachter()40 p = Thread(target=d.produce,name="producer",args=(10,))41
42 #增加消費者
43 for i in range(5):44 c = Thread(target=d.consume,name="consumer")45 c.start()46
47 p.start()
2019-11-25 22:24:45,076 producer 12228 64
2019-11-25 22:24:45,076 consumer 7612 recieved 64
2019-11-25 22:24:45,076 consumer 18400recieved None2019-11-25 22:24:46,077 producer 12228 41
2019-11-25 22:24:46,077 consumer 15008 recieved 41
2019-11-25 22:24:46,078 consumer 16440recieved None2019-11-25 22:24:47,077 producer 12228 98
2019-11-25 22:24:47,077 consumer 14832 recieved 98
2019-11-25 22:24:47,077 consumer 7612recieved None2019-11-25 22:24:48,077 producer 12228 39
2019-11-25 22:24:48,077 consumer 18400 recieved 39
2019-11-25 22:24:48,078 consumer 15008recieved None2019-11-25 22:24:49,078 producer 12228 79
2019-11-25 22:24:49,078 consumer 16440 recieved 79
2019-11-25 22:24:49,078 consumer 14832recieved None2019-11-25 22:24:50,079 producer 12228 39
2019-11-25 22:24:50,079 consumer 7612 recieved 39
2019-11-25 22:24:50,080 consumer 15008recieved None
......
以上代碼執行結果戳這里
3>.Condition總結
Condition用于生產者消費者模型中,解決生產者消費者速度匹配的問題。
采用了通知機制,非常有效率。
使用方式:
使用Condition,必須先acquire,用完了要release,因為內部使用了鎖,默認使用RLock,最好的方式是使用with上下文。
消費者這wait,等待通知。
生產者生產好消息,對消費者發通知,可以使用notify或者notify_all方法。
五.semaphore
1>.semaphore常用方法
和Lock很像,信號量對象內部維護一個倒計數器,每一次acquire都會減1,當acquire方法發現計數為0就阻塞請求的線程,直到其它線程對信號量release后,計數大于0,恢復阻塞的線程。換句話說,計數器永遠不會低于0,因為acquire的時候,發現是0,都會被阻塞。
semaphore常用方法如下:
Semaphore(value=1):
構造方法。value小于0,拋ValueError異常
acquire(blocking=True, timeout=None):
獲取信號量,計數器減1,獲取成功返回True
release():
釋放信號量,計數器加1
2>.基本使用案例(存在release方法超界限的問題)
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6
7 from threading importThread, Semaphore8 importlogging9 importtime10
11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'
12 logging.basicConfig(format=FORMAT, level=logging.INFO)13
14 defworker(s:Semaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18
19
20 #定義信號量的個數為3
21 s = Semaphore(3)22 print(s.__dict__)23
24 logging.info(s.acquire()) #獲取一把鎖之后,"_value"計數器就會減1。
25
26
27 print(s.acquire(),s._value)28
29 print(s.acquire(),s._value)30
31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35
36 #釋放一個
37 logging.info('release one')38 s.release()39 print(s.__dict__) #釋放鎖后可以被"worker"線程獲取
40
41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此處我們可以故意多釋放幾次鎖
46
47 print(s.__dict__) #竟然內置計數器"_value"達到了6(也有可能是5,因為worker線程中需要獲取一把鎖),這樣實際上超出我們的最大值,需要解決這個問題。
2019-11-26 09:54:22,345 MainThread 140735817298880True2019-11-26 09:54:22,345 worker 123145401769984 inworker thread
{'_cond': , 0)>, '_value': 3}
True1True 02019-11-26 09:54:24,346 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880release one
{'_cond': , 0)>, '_value': 1}
{'_cond': , 0)>, '_value': 6}2019-11-26 09:54:27,350 worker 123145401769984True2019-11-26 09:54:27,350 worker 123145401769984 worker thread over
以上代碼執行結果戳這里
3>.BoundedSemaphore類(有邊界的信號量,不允許使用release超出初始化范圍,否則,拋出“ValueError: Semaphore released too many times”異常)
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6
7 from threading importThread, BoundedSemaphore8 importlogging9 importtime10
11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'
12 logging.basicConfig(format=FORMAT, level=logging.INFO)13
14 defworker(s:BoundedSemaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18
19
20 #定義有邊界信號量的個數為3
21 s = BoundedSemaphore(3)22 print(s.__dict__)23
24 logging.info(s.acquire()) #獲取一把鎖之后,"_value"計數器就會減1。
25
26
27 print(s.acquire(),s._value)28
29 print(s.acquire(),s._value)30
31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35
36 #釋放一個
37 logging.info('release one')38 s.release()39 print(s.__dict__) #釋放鎖后可以被"worker"線程獲取
40
41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此處我們可以故意多釋放幾次鎖,一旦release超出初始值的范圍就拋出異常!
46
47 print(s.__dict__)
{'_cond': , 0)>, '_value': 3, '_initial_value': 3}
True1True 02019-11-26 10:09:17,632 MainThread 140735817298880True2019-11-26 10:09:17,635 worker 123145507561472 inworker thread2019-11-26 10:09:19,638 MainThread 140735817298880False
{'_cond': , 0)>, '_value': 1, '_initial_value': 3}2019-11-26 10:09:22,643 MainThread 140735817298880False2019-11-26 10:09:22,644 MainThread 140735817298880release one
Traceback (most recent call last):
File"/yinzhengjie/python/devops/python基礎/09.線程/04.信號量.py", line 43, in
2019-11-26 10:09:22,644 worker 123145507561472True2019-11-26 10:09:22,644 worker 123145507561472worker thread over
s.release()
File"/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 483, inreleaseraise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times
以上代碼執行結果戳這里
4>.應用舉例(一個簡單的連接池)
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importrandom7 importthreading8 importlogging9 importtime10
11 FORMAT = '%(asctime)s %(threadName)s %(thread)-8d %(message)s'
12 logging.basicConfig(level=logging.INFO, format=FORMAT)13
14 classConn:15 def __init__(self, name):16 self.name =name17
18 """
19 連接池20 因為資源有限,且開啟一個連接成本高,所以,使用連接池。21
22 一個簡單的連接池23 連接池應該有容量(總數),有一個工廠方法可以獲取連接,能夠把不用的連接返回,供其他調用者使用。24
25 使用信號量解決資源有限的問題。26 如果池中有資源,請求者獲取資源時信號量減1,拿走資源。當請求超過資源數,請求者只能等待。當使用者用完歸還資源后信號量加1,等待線程就可以被喚醒拿走資源。27 注意:這個連接池的例子不能用到生成環境,只是為了說明信號量使用的例子,連接池還有很多未完成功能。28 """
29 classPool:30 def __init__(self, count:int):31 self.count =count32 #池中提前放著連接備用
33 self.pool = [self._connect('conn-{}'.format(i)) for i inrange(self.count)]34 self.semaphore =threading.Semaphore(self.count)35
36 def_connect(self, conn_name):37 #創建連接的方法,返回一個連接對象
38 returnConn(conn_name)39
40 defget_conn(self):41 #從池中拿走一個連接
42 logging.info('get~~~~~~~~~~~~~')43 self.semaphore.acquire()44 logging.info('-------------------------')45 returnself.pool.pop()46
47 defreturn_conn(self, conn: Conn):48 #向池中返回一個連接對象
49 logging.info('return~~~~~~~~~~~~~')50 self.pool.append(conn)51 self.semaphore.release()52
53 defworker(pool:Pool):54 conn =pool.get_conn()55 logging.info(conn)56 #模擬使用了一段時間
57 time.sleep(random.randint(1, 5))58 pool.return_conn(conn)59
60 #初始化連接池
61 pool = Pool(3)62
63 for i in range(6):64 threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
2019-11-26 11:27:58,148 worker-0 123145324670976 get~~~~~~~~~~~~~
2019-11-26 11:27:58,148 worker-0 123145324670976 -------------------------
2019-11-26 11:27:58,148 worker-0 123145324670976 <__main__.Conn object at 0x102db0438>
2019-11-26 11:27:58,149 worker-1 123145329926144 get~~~~~~~~~~~~~
2019-11-26 11:27:58,149 worker-1 123145329926144 -------------------------
2019-11-26 11:27:58,149 worker-1 123145329926144 <__main__.Conn object at 0x102db03c8>
2019-11-26 11:27:58,149 worker-2 123145335181312 get~~~~~~~~~~~~~
2019-11-26 11:27:58,149 worker-2 123145335181312 -------------------------
2019-11-26 11:27:58,150 worker-2 123145335181312 <__main__.Conn object at 0x102db0240>
2019-11-26 11:27:58,150 worker-3 123145340436480 get~~~~~~~~~~~~~
2019-11-26 11:27:58,150 worker-4 123145345691648 get~~~~~~~~~~~~~
2019-11-26 11:27:58,151 worker-5 123145350946816 get~~~~~~~~~~~~~
2019-11-26 11:28:02,153 worker-0 123145324670976 return~~~~~~~~~~~~~
2019-11-26 11:28:02,153 worker-3 123145340436480 -------------------------
2019-11-26 11:28:02,154 worker-3 123145340436480 <__main__.Conn object at 0x102db0438>
2019-11-26 11:28:02,154 worker-1 123145329926144 return~~~~~~~~~~~~~
2019-11-26 11:28:02,154 worker-4 123145345691648 -------------------------
2019-11-26 11:28:02,154 worker-4 123145345691648 <__main__.Conn object at 0x102db03c8>
2019-11-26 11:28:03,154 worker-2 123145335181312 return~~~~~~~~~~~~~
2019-11-26 11:28:03,155 worker-5 123145350946816 -------------------------
2019-11-26 11:28:03,155 worker-5 123145350946816 <__main__.Conn object at 0x102db0240>
2019-11-26 11:28:05,155 worker-4 123145345691648 return~~~~~~~~~~~~~
2019-11-26 11:28:07,154 worker-3 123145340436480 return~~~~~~~~~~~~~
2019-11-26 11:28:08,159 worker-5 123145350946816 return~~~~~~~~~~~~~
以上代碼執行結果戳這里
5>.信號量和鎖
信號量:
可以多個線程訪問共享資源,但這個共享資源數量有限。
鎖:
可以看做特殊的信號量,即信號量計數器初值為1。只允許同一個時間一個線程獨占資源。
六.Queue的線程安全
1 #!/usr/bin/env python
2 #_*_conding:utf-8_*_
3 #@author :yinzhengjie
4 #blog:http://www.cnblogs.com/yinzhengjie
5
6 importqueue7
8
9 """
10 標準庫queue模塊,提供FIFO的Queue、LIFO的隊列、優先隊列。11
12 Queue類是線程安全的,適用于多線程間安全的交換數據。內部使用了Lock和Condition。13
14 為什么講魔術方法時,說實現容器的大小,不準確?15 1>.如果不加鎖,是不可能獲得準確的大小的,因為你剛讀取到了一個大小,還沒有取走數據,就有可能被其他線程改了。16 2>.Queue類的size雖然加了鎖,但是,依然不能保證立即get、put就能成功,因為讀取大小和get、put方法是分開的。17 """
18
19 q = queue.Queue(8)20
21 if q.qsize() == 7:22 q.put("abc") #上下兩句可能被打斷
23
24 if q.qsize() == 1:25 q.get() #未必會成功
總結
以上是生活随笔為你收集整理的python 线程同步_Python并发编程-线程同步(线程安全)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 战神zx8怎么进bios 战神zx8进入
- 下一篇: python做自动化测试的优点_乐搏讲自