基于.net的分布式系统限流组件
在互聯網應用中,流量洪峰是常有的事情。在應對流量洪峰時,通用的處理模式一般有排隊、限流,這樣可以非常直接有效的保護系統,防止系統被打爆。另外,通過限流技術手段,可以讓整個系統的運行更加平穩。今天要與大家分享一下限流算法和C#版本的組件。
一、令牌桶算法:
??? 令牌桶算法的基本過程如下:
???? 工作過程包括3個階段:產生令牌、消耗令牌和判斷數據包是否通過。其中涉及到2個參數:令牌產生的速率和令牌桶的大小,這個過程的具體工作如下。
???? 下面是C#的一個實現方式
class TokenBucketLimitingService: ILimitingService
??? {
??????? private LimitedQueue<object> limitedQueue = null;
??????? private CancellationTokenSource cancelToken;
??????? private Task task = null;
??????? private int maxTPS;
??????? private int limitSize;
??????? private object lckObj = new object();
??????? public TokenBucketLimitingService(int maxTPS, int limitSize)
??????? {
??????????? this.limitSize = limitSize;
??????????? this.maxTPS = maxTPS;
?????????? if (this.limitSize <= 0)
??????????????? this.limitSize = 100;
??????????? if(this.maxTPS <=0)
??????????????? this.maxTPS = 1;
?????????? limitedQueue = new LimitedQueue<object>(limitSize);
??????????? for (int i = 0; i < limitSize; i++)
??????????? {
??????????????? limitedQueue.Enqueue(new object());
??????????? }
??????????? cancelToken = new CancellationTokenSource();
??????????? task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
??????? }
?????? /// <summary>
??????? /// 定時消息令牌
??????? /// </summary>
??????? private void TokenProcess()
??????? {
??????????? int sleep = 1000 / maxTPS;
??????????? if (sleep == 0)
??????????????? sleep = 1;
?????????? DateTime start = DateTime.Now;
??????????? while (cancelToken.Token.IsCancellationRequested ==false)
??????????? {
??????????????? try
??????????????? {
??????????????????? lock (lckObj)
??????????????????? {
??????????????????????? limitedQueue.Enqueue(new object());
??????????????????? }
??????????????? }
??????????????? catch
??????????????? {
??????????????? }
??????????????? finally
??????????????? {
??????????????????? if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
??????????????????? {
??????????????????????? int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
??????????????????????? if (newSleep > 1)
??????????????????????????? Thread.Sleep(newSleep - 1); //做一下時間上的補償
??????????????????? }
??????????????????? start = DateTime.Now;
??????????????? }
??????????? }
??????? }
?????? public void Dispose()
??????? {
??????????? cancelToken.Cancel();
??????? }
?????? /// <summary>
??????? /// 請求令牌
??????? /// </summary>
??????? /// <returns>true:獲取成功,false:獲取失敗</returns>
??????? public bool Request()
??????? {
??????????? if (limitedQueue.Count <= 0)
??????????????? return false;
??????????? lock (lckObj)
??????????? {
??????????????? if (limitedQueue.Count <= 0)
??????????????????? return false;
?????????????? object data = limitedQueue.Dequeue();
??????????????? if (data == null)
??????????????????? return false;
??????????? }
?????????? return true;
??????? }
??? }
public interface ILimitingService:IDisposable
???? {
???????? /// <summary>
???????? /// 申請流量處理
???????? /// </summary>
???????? /// <returns>true:獲取成功,false:獲取失敗</returns>
???????? bool Request();
???? }
public class LimitingFactory
???? {
???????? /// <summary>
???????? /// 創建限流服務對象
???????? /// </summary>
???????? /// <param name="limitingType">限流模型</param>
???????? /// <param name="maxQPS">最大QPS</param>
???????? /// <param name="limitSize">最大可用票據數</param>
???????? public static ILimitingService Build(LimitingType limitingType = LimitingType.TokenBucket, int maxQPS = 100, int limitSize = 100)
???????? {
???????????? switch (limitingType)
???????????? {
???????????????? case LimitingType.TokenBucket:
???????????????? default:
???????????????????? return new TokenBucketLimitingService(maxQPS, limitSize);
???????????????? case LimitingType.LeakageBucket:
???????????????????? return new LeakageBucketLimitingService(maxQPS, limitSize);
???????????? }
???????? }
???? }
?? /// <summary>
???? /// 限流模式
???? /// </summary>
???? public enum LimitingType
???? {
???????? TokenBucket,//令牌桶模式
???????? LeakageBucket//漏桶模式
???? }
public class LimitedQueue<T> : Queue<T>
???? {
???????? private int limit = 0;
???????? public const string QueueFulled = "TTP-StreamLimiting-1001";
??????? public int Limit
???????? {
???????????? get { return limit; }
???????????? set { limit = value; }
???????? }
??????? public LimitedQueue()
???????????? : this(0)
???????? { }
??????? public LimitedQueue(int limit)
???????????? : base(limit)
???????? {
???????????? this.Limit = limit;
???????? }
??????? public new bool Enqueue(T item)
???????? {
???????????? if (limit > 0 && this.Count >= this.Limit)
???????????? {
???????????????? return false;
???????????? }
???????????? base.Enqueue(item);
???????????? return true;
???????? }
???? }
???? 調用方法:
var service = LimitingFactory.Build(LimitingType.TokenBucket, 500, 200);
while (true)
{
????? var result = service.Request();
?????? //如果返回true,說明可以進行業務處理,否則需要繼續等待
?????? if (result)
?????? {
???????????? //業務處理......
?????? }
?????? else
???????????? Thread.Sleep(1);
}
二、漏桶算法
????? 聲明一個固定容量的桶,每接受到一個請求向桶中添加一個令牌,當令牌桶達到上線后請求丟棄或等待,具體算法如下:
???? 工作過程也包括3個階段:產生令牌、消耗令牌和判斷數據包是否通過。其中涉及到2個參數:令牌自動消費的速率和令牌桶的大小,個過程的具體工作如下。
??? C#的一個實現方式:
class LeakageBucketLimitingService: ILimitingService
???? {
???????? private LimitedQueue<object> limitedQueue = null;
???????? private CancellationTokenSource cancelToken;
???????? private Task task = null;
???????? private int maxTPS;
???????? private int limitSize;
???????? private object lckObj = new object();
???????? public LeakageBucketLimitingService(int maxTPS, int limitSize)
???????? {
???????????? this.limitSize = limitSize;
???????????? this.maxTPS = maxTPS;
??????????? if (this.limitSize <= 0)
???????????????? this.limitSize = 100;
???????????? if (this.maxTPS <= 0)
???????????????? this.maxTPS = 1;
??????????? limitedQueue = new LimitedQueue<object>(limitSize);
???????????? cancelToken = new CancellationTokenSource();
???????????? task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
???????? }
??????? private void TokenProcess()
???????? {
???????????? int sleep = 1000 / maxTPS;
???????????? if (sleep == 0)
???????????????? sleep = 1;
??????????? DateTime start = DateTime.Now;
???????????? while (cancelToken.Token.IsCancellationRequested == false)
???????????? {
???????????????? try
???????????????? {
??????????????????? if (limitedQueue.Count > 0)
???????????????????? {
???????????????????????? lock (lckObj)
???????????????????????? {
???????????????????????????? if (limitedQueue.Count > 0)
???????????????????????????????? limitedQueue.Dequeue();
???????????????????????? }
???????????????????? }
???????????????? }
???????????????? catch
???????????????? {
???????????????? }
???????????????? finally
???????????????? {
???????????????????? if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
???????????????????? {
???????????????????????? int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
???????????????????????? if (newSleep > 1)
???????????????????????????? Thread.Sleep(newSleep - 1); //做一下時間上的補償
???????????????????? }
???????????????????? start = DateTime.Now;
???????????????? }
???????????? }
???????? }
??????? public void Dispose()
???????? {
???????????? cancelToken.Cancel();
???????? }
??????? public bool Request()
???????? {
???????????? if (limitedQueue.Count >= limitSize)
???????????????? return false;
???????????? lock (lckObj)
???????????? {
???????????????? if (limitedQueue.Count >= limitSize)
???????????????????? return false;
??????????????? return limitedQueue.Enqueue(new object());
???????????? }
???????? }
???? }
??? 調用方法:
var service = LimitingFactory.Build(LimitingType.LeakageBucket, 500, 200);
while (true)
{
????? var result = service.Request();
?????? //如果返回true,說明可以進行業務處理,否則需要繼續等待
????? if (result)
?????? {
???????????? //業務處理......
?????? }
?????? else
??????????? Thread.Sleep(1);
}
兩類限流算法雖然非常相似,但是還是有些區別的,供大家參考!
總結
以上是生活随笔為你收集整理的基于.net的分布式系统限流组件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 凸透镜发散还是会聚(凹透镜是发散还是会聚
- 下一篇: ElasticSearch入门 :Win