服务接口API限流 Rate Limit
轉載:https://www.cnblogs.com/exceptioneye/p/4783904.html
? ? ? ? ? ?https://blog.csdn.net/zrg523/article/details/82185088?utm_source=blogkpcl2(實施方案)
?
一、場景描述 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ?很多做服務接口的人或多或少的遇到這樣的場景,由于業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。
? ? 也就是面對大流量時,如何進行流量控制?
? ? 服務接口的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然降低了服務接口的訪問頻率和并發量,卻換取服務接口和業務應用系統的高可用。
? ? ?實際場景中常用的限流策略:
- Nginx前端限流
? ? ? ? ?按照一定的規則如帳號、IP、系統調用邏輯等在Nginx層面做限流
- 業務應用系統限流
? ? ? ? 1、客戶端限流
? ? ? ? 2、服務端限流
- 數據庫限流
? ? ? ? 紅線區,力保數據庫
二、常用的限流算法 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
? ? ?常用的限流算法由:樓桶算法和令牌桶算法。本文不具體的詳細說明兩種算法的原理,原理會在接下來的文章中做說明。
? ? ?1、漏桶算法
? ? ? ? ?漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:
? ? ? ? ?可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。
? ? ? ? ?因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對于存在突發特性的流量來說缺乏效率.
? ? ?2、令牌桶算法
? ? ? ? ?令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恒定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.
?
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.
三、基于Redis功能的實現 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ?簡陋的設計思路:假設一個用戶(用IP判斷)每分鐘訪問某一個服務接口的次數不能超過10次,那么我們可以在Redis中創建一個鍵,并此時我們就設置鍵的過期時間為60秒,每一個用戶對此服務接口的訪問就把鍵值加1,在60秒內當鍵值增加到10的時候,就禁止訪問服務接口。在某種場景中添加訪問時間間隔還是很有必要的。
? ? ? 1)使用Redis的incr命令,將計數器作為Lua腳本? ? ? ? ?
1 local current 2 current = redis.call("incr",KEYS[1]) 3 if tonumber(current) == 1 then 4 redis.call("expire",KEYS[1],1) 5 end? ? ? ? Lua腳本在Redis中運行,保證了incr和expire兩個操作的原子性。
? ? ? ?2)使用Reids的列表結構代替incr命令
1 FUNCTION LIMIT_API_CALL(ip)2 current = LLEN(ip)3 IF current > 10 THEN4 ERROR "too many requests per second"5 ELSE6 IF EXISTS(ip) == FALSE7 MULTI8 RPUSH(ip,ip)9 EXPIRE(ip,1) 10 EXEC 11 ELSE 12 RPUSHX(ip,ip) 13 END 14 PERFORM_API_CALL() 15 END? ? ? ? ?Rate Limit使用Redis的列表作為容器,LLEN用于對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用于在第一次執行計數是創建列表并設置過期時間,
? ? RPUSHX在后續的計數操作中進行增加操作。
四、基于令牌桶算法的實現 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ?令牌桶算法可以很好的支撐突然額流量的變化即滿令牌桶數的峰值。
? ? ??
1 import java.io.BufferedWriter;2 import java.io.FileOutputStream;3 import java.io.IOException;4 import java.io.OutputStreamWriter;5 import java.util.Random;6 import java.util.concurrent.ArrayBlockingQueue;7 import java.util.concurrent.Executors;8 import java.util.concurrent.ScheduledExecutorService;9 import java.util.concurrent.TimeUnit;10 import java.util.concurrent.locks.ReentrantLock;11 12 import com.google.common.base.Preconditions;13 import com.netease.datastream.util.framework.LifeCycle;14 1520 public class TokenBucket implements LifeCycle {21 22 // 默認桶大小個數 即最大瞬間流量是64M23 private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;24 25 // 一個桶的單位是1字節26 private int everyTokenSize = 1;27 28 // 瞬間最大流量29 private int maxFlowRate;30 31 // 平均流量32 private int avgFlowRate;33 34 // 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 6435 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);36 37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();38 39 private volatile boolean isStart = false;40 41 private ReentrantLock lock = new ReentrantLock(true);42 43 private static final byte A_CHAR = 'a';44 45 public TokenBucket() {46 }47 48 public TokenBucket(int maxFlowRate, int avgFlowRate) {49 this.maxFlowRate = maxFlowRate;50 this.avgFlowRate = avgFlowRate;51 }52 53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {54 this.everyTokenSize = everyTokenSize;55 this.maxFlowRate = maxFlowRate;56 this.avgFlowRate = avgFlowRate;57 }58 59 public void addTokens(Integer tokenNum) {60 61 // 若是桶已經滿了,就不再家如新的令牌62 for (int i = 0; i < tokenNum; i++) {63 tokenQueue.offer(Byte.valueOf(A_CHAR));64 }65 }66 67 public TokenBucket build() {68 69 start();70 return this;71 }72 73 /**74 * 獲取足夠的令牌個數75 *76 * @return77 */78 public boolean getTokens(byte[] dataSize) {79 80 Preconditions.checkNotNull(dataSize);81 Preconditions.checkArgument(isStart, "please invoke start method first !");82 83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數84 85 final ReentrantLock lock = this.lock;86 lock.lock();87 try {88 boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量89 if (!result) {90 return false;91 }92 93 int tokenCount = 0;94 for (int i = 0; i < needTokenNum; i++) {95 Byte poll = tokenQueue.poll();96 if (poll != null) {97 tokenCount++;98 }99 } 100 101 return tokenCount == needTokenNum; 102 } finally { 103 lock.unlock(); 104 } 105 } 106 107 @Override 108 public void start() { 109 110 // 初始化桶隊列大小 111 if (maxFlowRate != 0) { 112 tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate); 113 } 114 115 // 初始化令牌生產者 116 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this); 117 scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS); 118 isStart = true; 119 120 } 121 122 @Override 123 public void stop() { 124 isStart = false; 125 scheduledExecutorService.shutdown(); 126 } 127 128 @Override 129 public boolean isStarted() { 130 return isStart; 131 } 132 133 class TokenProducer implements Runnable { 134 135 private int avgFlowRate; 136 private TokenBucket tokenBucket; 137 138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) { 139 this.avgFlowRate = avgFlowRate; 140 this.tokenBucket = tokenBucket; 141 } 142 143 @Override 144 public void run() { 145 tokenBucket.addTokens(avgFlowRate); 146 } 147 } 148 149 public static TokenBucket newBuilder() { 150 return new TokenBucket(); 151 } 152 153 public TokenBucket everyTokenSize(int everyTokenSize) { 154 this.everyTokenSize = everyTokenSize; 155 return this; 156 } 157 158 public TokenBucket maxFlowRate(int maxFlowRate) { 159 this.maxFlowRate = maxFlowRate; 160 return this; 161 } 162 163 public TokenBucket avgFlowRate(int avgFlowRate) { 164 this.avgFlowRate = avgFlowRate; 165 return this; 166 } 167 168 private String stringCopy(String data, int copyNum) { 169 170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); 171 172 for (int i = 0; i < copyNum; i++) { 173 sbuilder.append(data); 174 } 175 176 return sbuilder.toString(); 177 178 } 179 180 public static void main(String[] args) throws IOException, InterruptedException { 181 182 tokenTest(); 183 } 184 185 private static void arrayTest() { 186 ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10); 187 tokenQueue.offer(1); 188 tokenQueue.offer(1); 189 tokenQueue.offer(1); 190 System.out.println(tokenQueue.size()); 191 System.out.println(tokenQueue.remainingCapacity()); 192 } 193 194 private static void tokenTest() throws InterruptedException, IOException { 195 TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build(); 196 197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test"))); 198 String data = "xxxx";// 四個字節 199 for (int i = 1; i <= 1000; i++) { 200 201 Random random = new Random(); 202 int i1 = random.nextInt(100); 203 boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes()); 204 TimeUnit.MILLISECONDS.sleep(100); 205 if (tokens) { 206 bufferedWriter.write("token pass --- index:" + i1); 207 System.out.println("token pass --- index:" + i1); 208 } else { 209 bufferedWriter.write("token rejuect --- index" + i1); 210 System.out.println("token rejuect --- index" + i1); 211 } 212 213 bufferedWriter.newLine(); 214 bufferedWriter.flush(); 215 } 216 217 bufferedWriter.close(); 218 } 219 220 }?
?
?
參考:
http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/
http://redisdoc.com/string/incr.html
http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html
總結
以上是生活随笔為你收集整理的服务接口API限流 Rate Limit的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【气象科普】降水的等级划分
- 下一篇: 【win10系统IIS服务器配置】