C# Redis分布式锁(RedLock)
Redis單節點的分布式鎖只需要注意三點就可以了:
1.加鎖并設置鎖的過期時間必須是原子操作;
2.鎖的value值必須要有唯一性;
3.釋放鎖的時候要驗證其value值,不是自己加的鎖不能釋放.
但是單節點分布式鎖最大的缺點就是,它只作用在一個Redis節點上,如果該節點掛了,那就掛了.
那可不可以通過哨兵機制來保證高可用呢?
答案是不行.
因為Redis在進行主從復制的時候是異步的.
假設 clientA 拿到鎖后,在 master 還沒同步到 slave 時,master 發生了故障,這時候 salve 升級為 master,導致鎖丟失.
RedLock 的思想是:假設有5個Redis節點.這些節點完全相互獨立,不存在主從或者集群機制,都是 master.并且這5個Redis實例運行在5臺機器上,這樣保證他們不會同時宕掉.
客戶端應該按照以下操作來獲取鎖:
1.獲取當前時間戳,假設是T1.
2.依次嘗試從這5個Redis實例獲取鎖.當客戶端向Redis請求獲取鎖時,客戶端應該設置超時時間,并且這個超時時間應該小于鎖的失效時間.比如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間.這樣可以避免Redis已經掛掉的情況下,客戶端還在等待響應結果.如果Redis沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖.
3.請求完所有的Redis節點后,只有滿足如下兩點,才算真正的獲取到鎖:
1)當前時間 - T1 的時間差小于鎖的過期時間.比如T1=00:00:00,然后從5個Redis節點都拿到了鎖,當前時間是 00:00:05,也就是說獲取鎖一共用了5秒鐘.假設鎖的過期時間是3秒,那么這次獲取鎖的操作就算失敗了.
2)從(N/2+1)個Redis節點都獲取到鎖.這個很好理解,5個節點,你拿2個,我拿2個,到底算誰的?
總結一句話就是:從開始獲取鎖計時,只要在鎖的過期時間內成功獲取到一半以上的鎖便算成功,否則算失敗.
4.當客戶端獲取到了鎖,鎖的真正有效時間 = 鎖的過期時間 - 獲取鎖所使用的時間(也就是第3步計算出來的時間).
5.如果客戶端由于某些原因(比如獲取鎖的實例個數小于N/2+1,或者已經超過了有效時間),沒有獲取到鎖,客戶端便會在所有的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功),因為可能已經獲取了小于 N/2+1個鎖,必須釋放掉,否則會影響其他客戶端獲取鎖.
關于是否啟動AOF永久存儲,需要有所取舍.
1.永久啟動,由于Redis的過期機制是按照unix時間戳走的,所以當我們重啟Redis后,依然會按照規定的時間過期.但是永久啟動對性能有一定影響;
2.采用默認的1秒1次.如果在1秒內斷電,會導致數據丟失,這時候如果立刻重啟會導致鎖的互斥性實效.
所以有效的解決方案是,采用AOF,1秒1次,不管什么原因宕機后,等待一定時間再重啟.這個時間就是鎖的過期時間.
Demo:
安裝官方提供的 RedLock.net
Startup:
public class Startup
{
private RedLockFactory _redLockFactory;
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
var endPoints = new List<RedLockEndPoint>
{
new DnsEndPoint("127.0.0.1", 6379),
new DnsEndPoint("127.0.0.1", 6380),
new DnsEndPoint("127.0.0.1", 6381)
};
_redLockFactory = RedLockFactory.Create(endPoints);
services.AddSingleton(typeof(IDistributedLockFactory), _redLockFactory);
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime applicationLifetime)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
//應用程序結束時釋放,因為不是容器創建的對象
applicationLifetime.ApplicationStopping.Register(() =>
{
_redLockFactory.Dispose();
});
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
測試api:
[ApiController]
public class ValuesController : ControllerBase
{
private static int _stock = 10;
private readonly IDistributedLockFactory _distributedLockFactory;
public ValuesController(IDistributedLockFactory distributedLockFactory)
{
_distributedLockFactory = distributedLockFactory;
}
[Route("lockTest")]
[HttpGet]
public async Task<int> DistributedLockTest()
{
// resource 鎖定的資源
var resource = "the-thing-we-are-locking-on";
// expiryTime 鎖的過期時間
var expiry = TimeSpan.FromSeconds(5);
// waitTime 等待時間
var wait = TimeSpan.FromSeconds(1);
// retryTime 等待時間內,多久重試一次
var retry = TimeSpan.FromMilliseconds(250);
using (var redLock = await _distributedLockFactory.CreateLockAsync(resource, expiry, wait, retry))
{
if (redLock.IsAcquired)
{
// 模擬執行業務邏輯
await Task.Delay(new Random().Next(100, 500));
if (stock > 0)
{
stock--;
return stock;
}
return stock;
}
Console.WriteLine($"{DateTime.Now} : 獲取鎖失敗");
}
return -99;
}
}
測試控制臺:
static void Main(string[] args)
{
HttpClient client = new HttpClient();
var result = Parallel.For(0, 20, (i) =>
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var response = client.GetAsync($"http://localhost:5000/locktest").Result;
stopwatch.Stop();
var data = response.Content.ReadAsStringAsync().Result;
Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
});
client.Dispose();
Console.ReadKey();
}
測試結果:
總結
以上是生活随笔為你收集整理的C# Redis分布式锁(RedLock)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 孤独的英文网名72个
- 下一篇: 图像处理之生成ColorBar