python redis事务_python实现redis三种cas事务操作
cas全稱是compare and set,是一種典型的事務操作。
簡單的說,事務就是為了存取數據庫中同一數據時不破壞操作的隔離性和原子性,從而保證數據的一致性。
一般數據庫,比如MySql是如何保證數據一致性的呢,主要是加鎖,悲觀鎖。比如在訪問數據庫某條數據的時候,會用SELECT FOR UPDATE ,這MySql就會對這條數據進行加鎖,直到事務被提交(COMMIT),或者回滾(ROLLBACK)。如果此時,有其他事務對被加鎖的數據進行寫入,那么該事務將會被阻塞,直到第一個事務完成為止。它的缺點在于:持有鎖的事務運行越慢,等待解鎖的事務阻塞時間就越長。并且容易產生死鎖(前面有篇文章有講解死鎖)!
本文會介紹三種redis實現cas事務的方法,并會解決下面的虛擬問題:
維護一個值,如果這個值小于當前時間,則設置為當前時間;如果這個值大于當前時間,則設置為當前時間+30。簡單的單線程環境下代碼如下:
# 初始化
r = redis.Redis()
if not r.exists("key_test"):
r.set("key_test", 0)
def inc():
count = int(r.get('key_test')) + 30 #1
# 如果值比當前時間小,則設置為當前時間
count = max(count, int(time.time())) #2
r.set('key_test', count) #3
return count
很簡單的一段代碼,在單線程環境下可以跑的很歡,但顯然,是無法移植到多線程或者是多進程環境的(進程A和B同時運行到#1,獲取了相同的count值,然后運行#2#3,會導致count值總共只增加了30)。而為了能在多進程環境下運行,我們需要引入一些其他的東西。
py-redis本身自帶的事務操作
redis有這么幾個和事務相關的命令,multi,exec,watch。通過這幾個命令,可以實現‘將多個命令打包,然后一次性、按順序執行,且不會被終端'。事務會從MULTI開始,執行EXEC后觸發事件。另外,我們還需要WATCH,watch可以監視任意數量的鍵,當在調用EXEC執行事務時,如果任意一個鍵被修改了,整個事務不會執行。
下邊是使用redis本身的事務解決cas問題的代碼。
class CasNormal(object):
def __init__(self, host, key):
self.r = redis.Redis(host)
self.key = key
if not self.r.exists(self.key):
self.r.set(self.key, 0)
def inc(self):
with self.r.pipeline() as pipe:
while True:
try:
#監視一個key,如果在執行期間被修改了,會拋出WatchError
pipe.watch(self.key)
next_count = 30 + int(pipe.get(self.key))
pipe.multi()
if next_count < int(time.time()):
next_count = int(time.time())
pipe.set(self.key, next_count)
pipe.execute()
return next_count
except WatchError:
continue
finally:
pipe.reset()
代碼也不復雜,引入了之前說到的multi,exec,watch,如果對事務操作比較熟悉的同學,可以很容易看出來,這是一個樂觀鎖的操作(咱們假設沒人競爭來著,每次去拿數據的時候都不會上鎖,真有人來改了再說。)樂觀鎖在高并發的情況下會顯得很無力,文末的性能對比會顯示這個問題。
使用基于redis的悲觀鎖
悲觀鎖,就是很悲觀的鎖,每次拿數據都會假設別人也要拿,先給鎖起來,用完再把鎖釋放掉。redis本身沒有實現悲觀鎖,但我們可以先用redis實現一個悲觀鎖。
ok,咱們現在有悲觀鎖了,做起事來也有底氣了,根據上邊的代碼,咱們只要加上@ synchronized注釋就能保證同一時間只有一個進程在執行。下邊是基于悲觀鎖的解決方案。
lock_conn = redis.Redis("localhost")
class CasLock(object):
def __init__(self, host, key):
self.r = redis.Redis(host)
self.key = key
if not self.r.exists(self.key):
self.r.set(self.key, 0)
@synchronized(lock_conn, "lock", 10)
def inc(self):
next_count = 30 + int(self.r.get(self.key))
if next_count < int(time.time()):
next_count = int(time.time())
self.r.set(self.key, next_count)
return next_count
代碼看上去少多了(因為引入了synchronized...)
基于lua腳本實現
上邊兩種方法都是用鎖來實現的,鎖的實現總會出現競爭的問題,區別無非是出現競爭了咋辦的問題。使用redis lua腳本的實現,可以直接把這個cas操作當成一個原子操作。
我們知道,redis本身的一系列操作,都是原子操作,且redis會按順序執行所有收到的命令。先看代碼
class CasLua(object):
def __init__(self, host, key):
self.r = redis.Redis(host)
self.key = key
if not self.r.exists(self.key):
self.r.set(self.key, 0)
self._lua = self.r.register_script("""
local next_count = redis.call('get',KEYS[1]) + ARGV[1]
ARGV[2] = tonumber(ARGV[2])
if next_count < ARGV[2] then
next_count = ARGV[2]
end
redis.call('set',KEYS[1],next_count)
return tostring(next_count)
""")
def inc(self):
return int(self._lua([self.key], [30, int(time.time())]))
這里先注冊了這個腳本,后邊可以直接去使用他。關于redis lua腳本的文章有不少,感興趣的可以去搜搜看,這邊就不贅述了。
性能對比
這邊的測試只是一個非常簡單的測試(不過還是能看出效果來的),測試換機就是自己的開發機,數字看個大小就行了。
分別測了三種操作在單線程,五個線程,十個線程,五十個線程情況下,進行1000次操作各自的表現,時間如下
optimistic Lock pessimistic lock lua
1thread 0.43 0.71 0.35
5thread 5.80 3.10 0.62
10thread 17.80 5.60 1.30
50thread 245.00 29.60 6.50
依次是redis本身事務實現的樂觀鎖,基于redis實現的悲觀鎖以及lua實現。
在比較悲觀鎖和樂觀鎖之前,需要先說明一點,這邊的測試對樂觀鎖不是很公平,樂觀鎖本身就是假設不會有很多的并發的。在單線程情況下,悲觀鎖要差一些。單線程下,不存在競爭關系,悲觀鎖耗時長僅因為是多了一次redis的網絡交互。隨著線程的增加,悲觀鎖的性能逐漸變好,畢竟悲觀鎖本身就是為了解決這種高并發高競爭的環境而誕生的。在50線程的時候,樂觀鎖的實現單次操作的時間要0.245秒,非常恐怖,如果是生產環境,幾乎都不能用了。
至于lua的性能,快的不可思議,幾乎就是線性增加。(50線程的情況下,平均的1000次完成時間是6.5s,換言之,6.5秒內執行了50 * 1000次cas操作)。
以上測試都是本地redis,本地測試,如果redis是遠端的,網絡交互時間會增加,lua優勢會更加明顯。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
總結
以上是生活随笔為你收集整理的python redis事务_python实现redis三种cas事务操作的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 运行iDT算法代码及后续特征编码
- 下一篇: STM32----ADC多通道采集
