[Python 多线程] Semaphore、BounedeSemaphore (十二)
?
?Semaphore
信號量,信號量對象內部維護一個倒計數器,每一次acquire都會減1,當acquire方法發現計數為0就阻塞請求的線程,直到其它線程對信號量release后,計數大于0,恢復阻塞的線程。
?
方法:
Semaphore(value=1)? ? ? ? ? ? ? ? ? ? ? ? ? ? 構造方法。value小于0,拋ValueError異常。默認為1。
acquire(blocking=True,timeout=None)? 獲取信號量,計數器減1,獲取成功返回True。
release()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?釋放信號量,計數器加1。
?
計數器永遠不會低于0,因為acquire的時候,發現是0,都會被阻塞。
?
?舉例:
圖書館有三本書,三本都被借走(acquire)之后,其他人想看,就得等別人還回來(阻塞),有人還回來(release)一本后,就有一個人可以拿到這本書,其他人仍然得等歸還。
#Semaphore 信號量,借還 import threading,logging,time DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)def work(s:threading.Semaphore):logging.info('in sub thread')logging.info(s.acquire())logging.info('sub thread oevr')s = threading.Semaphore(3) logging.info(s.acquire()) logging.info(s.acquire()) logging.info(s.acquire())threading.Thread(target=work,args=(s,)).start() time.sleep(2)logging.info(s.acquire(False)) #不阻塞 logging.info((s.acquire(timeout=3))) #3秒超時會阻塞logging.info('release') s.release()運行結果: [08:48:43] [MainThread,8840] True [08:48:43] [MainThread,8840] True [08:48:43] [MainThread,8840] True [08:48:43] [Thread-1,6212] in sub thread [08:48:45] [MainThread,8840] False [08:48:48] [MainThread,8840] False [08:48:48] [MainThread,8840] release [08:48:48] [Thread-1,6212] True [08:48:48] [Thread-1,6212] sub thread oevr這個例子只起了一個線程,如果多起幾個,當release還回來的數小于阻塞的線程數時,程序就會一直處于阻塞狀態,直到全部relase。
?
?應用舉例:
因為資源有限,且開啟一個連接成本高,所以,使用連接池。
?
一個簡單的連接池(例子):
連接池應該有容量(value總數),也應該工廠方法可以獲取連接,能夠把不用的連接歸還,供其他使用者使用。
#一個簡單的連接池 import threading,logging,time DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)class Conn:def __init__(self,name):self.name = nameclass Pool:def __init__(self,count=3):self.count = count#連接池容器self.pool = [self._connect('conn-{}'.format(x)) for x in range(self.count)]def _connect(self,conn_name):return Conn(conn_name)def get_conn(self):# if len(self.pool) > 0:return self.pool.pop() #從尾部彈出一個def return_conn(self,conn:Conn):self.pool.append(conn)pool = Pool(3) print(pool.pool) pool.get_conn() pool.get_conn() pool.get_conn() pool.get_conn() #第4個print('End Main')運行結果: [<__main__.Conn object at 0x00000211BBEBC160>, <__main__.Conn object at 0x00000211BBEBC1D0>, <__main__.Conn object at 0x00000211BBEBC240>] Traceback (most recent call last):File "C:/python/test.py", line 34, in <module>pool.get_conn()File "C:/python/test.py", line 24, in get_connreturn self.pool.pop() #從尾部彈出一個 IndexError: pop from empty list當連接池中已經沒有可用連接時,再獲取就會拋異常 IndexError:pop from empty list。
那就加個判斷,只在池中連接數量大于0的時候才可以獲取連接:
#修改get_conn函數def get_conn(self):if len(self.pool) > 0:return self.pool.pop() #從尾部彈出一個這樣在連接池為空時,就不會拋異常了。
?
這個連接池的例子如果使用多線程,這個get_conn()方法是線程不安全的,有可能其它線程看到池中還有一個連接,正準備獲取,其它線程也看到了,也準備獲取連接,就會拋異常。再或者,都在向池中加連接的時候,也可能會多加。
這個問題可以用鎖Lock來解決, 在獲取連接和加連接時,加鎖解鎖;也可以使用semaphore信號量來解決。
?
使用信號量對上例進行修改:
#使用semaphore信號量修改連接池 import threading,logging,time,random DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)class Conn:def __init__(self,name):self.name = namedef __repr__(self):return self.nameclass Pool:def __init__(self,count=3):self.count = count#連接池容器self.pool = [self._connect('conn-{}'.format(x)) for x in range(self.count)]self.semaphore = threading.Semaphore(self.count)def _connect(self,conn_name):#返回一個連接名return Conn(conn_name)def get_conn(self):#從池中拿走一個連接# if len(self.pool) > 0:self.semaphore.acquire(timeout=5) #-1,獲取連接,最大5秒超時時間,與后面隨機秒數相對應data = self.pool.pop() #從尾部彈出一個return datadef return_conn(self,conn:Conn):#向池中添加一個連接self.pool.append(conn)self.semaphore.release() # 先加入池中再信號量+1return len(self.pool)pool = Pool(3)def worker(pool:Pool):conn = pool.get_conn()logging.info(conn)#模擬使用了資源一段時間(隨機1-4秒),然后歸還threading.Event().wait(timeout=random.randint(1,4))pool.return_conn(conn)for i in range(6):threading.Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()print('End Main')運行結果: [10:34:12] [worker-0,5264] conn-2 [10:34:12] [worker-1,7420] conn-1 [10:34:12] [worker-2,2612] conn-0 End Main [10:34:13] [worker-3,3972] conn-1 #歸還以后又可以獲取連接 [10:34:14] [worker-4,8172] conn-2 [10:34:15] [worker-5,11192] conn-1上例中模擬獲取連接以后使用了1-4秒鐘,沒有拿到資源的最多阻塞5秒鐘,當連接使用結束歸還后,阻塞的線程就又重新獲取到連接。
?
?
問題:
1) 沒有使用信號量就release的情況:
import threadings = threading.Semaphore(3) print(s.__dict__)def work(s:threading.Semaphore):s.release()for i in range(3):threading.Thread(target=work,args=(s,)).start()print(s.__dict__)運行結果: {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 3} {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 2} {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 3} {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 4} {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 5}沒有acquire信號量時,就release的情況,結果導致了信號量的內置倒計數器的值增加,這樣就超出了最大值。
?
解決辦法:
使用BoundedSemaphore類:
BoundedSemaphore,繼承自Semaphore類。邊界綁定,有界的信號量,不允許使用release超過初始值的范圍,否則,拋ValueError異常。
#BoundedSemaphore邊界綁定 import threadings = threading.BoundedSemaphore(3) print(s.__dict__)s.acquire() print(s.__dict__)def work(s:threading.BoundedSemaphore):s.release()for i in range(3):threading.Thread(target=work,args=(s,)).start()print(s.__dict__)運行結果: {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3} {'_value': 2, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3} {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3} {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3} {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3} Exception in thread Thread-2: Traceback (most recent call last):File "C:/python/test.py", line 11, in works.release() ValueError: Semaphore released too many times使用BoundedSemaphore就可以控制不會多歸還。
?
轉載于:https://www.cnblogs.com/i-honey/p/8078518.html
總結
以上是生活随笔為你收集整理的[Python 多线程] Semaphore、BounedeSemaphore (十二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: selenium webdrive 默认
- 下一篇: C# ManualResetEvent