來源:公眾號【 java進階架構師】
好文推薦:
字節跳動Java崗4面面經分享:索弓|+rabbitmq+spring+Redis
拼多多面經Java開發3面面經:準備好久沒想到面試題超級簡單
網易嚴選Java開發三面面經:HashMap+JVM+索引+消息隊列
一、服務器崩潰的思考
老板說,他要做個現場營銷活動,線上線下都要參與推廣,這個活動參與人數可能很大哦··· 果然,由于不是我寫的代碼,所以那天服務器就崩了,崩的時候很安靜,寫代碼的那個人一個人走的,走的時候很安詳。
當請求量到達百萬級時候,為啥會崩潰呢?
微服務中是通過接口去向服務提供者發起http請求或者rpc(tcp)請求去獲取數據,事實上大量請求中,服務端能處理的請求數量有限,服務中充斥著大量的線程,以及數據庫等的連接也會被占用完,導致請求響應速度也越來越慢。
- 響應速度和我們的數據層有關系嗎?
- 能不能去添加服務端服務器呢?
- 如果能減少客戶端向服務端的請求就好了?
- 限流嗎?當前場景能限流嗎?
- 每個線程去查詢數據,每次都只查詢某一個結果,是不是太浪費了?
- 我們能不能想辦法,提升我們系統的調用性能?
二、有人想看請求合并,今天她來了
上面的一些思路可以用加緩存,加MQ的方式去解決。但是緩存有限,MQ是個隊列,有限流的效果。那么,如何才能提高系統的調用能力,我們學習一下,請求合并,結果分發。
- 正常的請求都是一個請求一個線程,到后臺觸發相關的業務需求,去調用數據獲取業務。
- 當請求合并后,我們要將多個多個請求合并后統一去批量去調用。
大概的設計思路便是如下圖所示:
常規請求請求合并說下我們的思路- 解決請求調用多,比如調用商品數據,經過的服務多,調用鏈很長,所以查詢數據庫的次數也就非常多,數據庫連接池很快就被用光,導致很多請求被阻塞,也導致應用整體線程數非常高。雖然通過增加數據庫連接池大小可以緩解問題,并且可以通過壓力測試,但這治標不治本。
- 查詢商品信息的時候,如果同一商品同一時刻有100個請求,那么其中的99次查詢是多余的,可以把100個請求合并成一個真實的后臺接口調用,只要控制好線程安全即可。我的想法是使用并發計數器來實現再配合本地緩存,計數器可直接用JDK提供的AtomicInteger,線程安全又提供原子操作。
- 以獲取商品信息為例,每個商品id對應一個計數器,計數器初始值默認是0,當一個請求過來后通過incrementAndGet()使計數器自增1并返回自增后的值。當該值等于1,表明該線程在這個時間點上是第一個到達的線程,然后就去調用真實的業務邏輯,在查詢到結果后放入到本地緩存中。當該值大于1的時候,表明之前已有線程正在調用業務邏輯,則進入等待狀態,并循環的查詢本地緩存中是否已有數據可用。獲取到結果后都調用decrementAndGet()使計數器減1,計數器被減到0的時候就回到了初始狀態,并且當減到0(代表最后一個線程)時清除緩存。
- 那還有在1000次請求中,請求的數據id不同,但是使用的服務接口相同,都是查詢商品庫的商品id從1~1000的數據,都是從表里面查詢,queryDataById(dataId),那我也可以合并這些請求,改為批量查詢,然后將數據分發返回。思路就是設計每個請求攜帶一個請求唯一的traceId,有點像鏈路跟蹤的感覺,簡單點可以使用查詢的id進行最為跟蹤id,將請求放入一個隊中,使用定時任務,比如每隔10ms去掃描隊列,將這些業務合并請求統一去請求數據庫層。
- 此方案有個數據延遲的地方,就是每次循環時的等待狀態的時間。因為一次包含多次查庫的業務調用,耗時基本都在幾十毫秒,甚至是上百毫秒,可以把該等待睡眠設置小一點,比如10毫秒。這樣即不會浪費CPU時間,實時性也比較高,但然也可以通過主動喚醒等待線程的方式,這個操作起來就比較復雜些。在這其中還可以添加一些異常處理、超時控制、最大重試次數,最大并發數(超時最大并發數就快速失敗)等。
三、開始演練
import org.springframework.stereotype.Service;import java.util.*;/** * 模擬遠程調用ShopData接口 * @author Lijing */@Servicepublic class QueryServiceRemoteCall { /** * 調用遠程的商品信息查詢接口 * * @param code 商品編碼 * @return 返回商品信息,map格式 */ public HashMap queryShopDataInfoByCode(String code) { try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } HashMap hashMap = new HashMap<>(); hashMap.put("shopDataId", new Random().nextInt(999999999)); hashMap.put("code", code); hashMap.put("name", "小玩具"); hashMap.put("isOk", "true"); hashMap.put("price","3000"); return hashMap; } /** * 批量查詢 - 調用遠程的商品信息查詢接口 * * @param codes 多個商品編碼 * @return 返回多個商品信息 */ public List> queryShopDataInfoByCodeBatch(List codes) { List> result = new ArrayList<>(); for (String code : codes) { HashMap hashMap = new HashMap<>(); hashMap.put("shopDataId", new Random().nextInt(999999999)); hashMap.put("code", code); hashMap.put("name", "棉花糖"); hashMap.put("isOk", "true"); hashMap.put("price","6000"); result.add(hashMap); } return result; }}
- 使用CountDownLatch模擬并發請求的公共測試類
@RunWith(SpringRunner.class)@SpringBootTest(classes = MyBotApplication.class)public class MergerApplicationTests { long timed = 0L; @Before public void start() { System.out.println("開始測試"); timed = System.currentTimeMillis(); } @After public void end() { System.out.println("結束測試,執行時長:" + (System.currentTimeMillis() - timed)); } // 模擬的請求數量 private static final int THREAD_NUM = 1000; // 倒計數器 juc包中常用工具類 private CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM); @Autowired private ShopDataService shopDataService; @Test public void simulateCall() throws IOException { // 創建 并不是馬上發起請求 for (int i = 0; i < THREAD_NUM; i++) { final String code = "code-" + (i + 1); // 多線程模擬用戶查詢請求 Thread thread = new Thread(() -> { try { // 代碼在這里等待,等待countDownLatch為0,代表所有線程都start,再運行后續的代碼 countDownLatch.await(); // 模擬 http請求,實際上就是多線程調用這個方法 Map result = shopDataService.queryData(code); System.out.println(Thread.currentThread().getName() + " 查詢結束,結果是:" + result); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + " 線程執行出現異常:" + e.getMessage()); } }); thread.setName("price-thread-" + code); thread.start(); // 啟動后,倒計時器倒計數 減一,代表又有一個線程就緒了 countDownLatch.countDown(); } System.in.read(); }}
/** * 商品數據服務類 * @author lijing */@Servicepublic class ShopDataService { @Autowired QueryServiceRemoteCall queryServiceRemoteCall; // 1000 用戶請求,1000個線程 public Map queryData(String shopDataId) throws ExecutionException, InterruptedException { return queryServiceRemoteCall.queryShopDataInfoByCode(shopDataId); }}
開始測試price-thread-code-3 查詢結束,結果是:{code=code-3, shopDataId=165800794, price=3000, isOk=true, name=小玩具}price-thread-code-994 查詢結束,結果是:{code=code-994, shopDataId=735455508, price=3000, isOk=true, name=小玩具}price-thread-code-36 查詢結束,結果是:{code=code-36, shopDataId=781610507, price=3000, isOk=true, name=小玩具}price-thread-code-993 查詢結束,結果是:{code=code-993, shopDataId=231087525, price=3000, isOk=true, name=小玩具}....... 省略代碼中。。。。price-thread-code-25 查詢結束,結果是:{code=code-25, shopDataId=149193873, price=3000, isOk=true, name=小玩具}price-thread-code-2 查詢結束,結果是:{code=code-2, shopDataId=324877405, price=3000, isOk=true, name=小玩具}.......共計1000次的查詢結果結束測試,執行時長:150
- 那么我們發現我們可以用code作為一個追蹤traceId,然后使用ScheduledExecutorService,CompletableFuture,LinkedBlockingQueue等一些多線程技術,就可以實現這個請求合并,請求分發的簡單實現demo.
import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.*;/** * 商品數據服務類 * * @author lijing */@Servicepublic class ShopDataService { class Request { String shopDataId; CompletableFuture> completableFuture; } // 集合,積攢請求,每N毫秒處理 LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); @PostConstruct public void init() { ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(5); scheduledExecutorPool.scheduleAtFixedRate(() -> { // TODO 取出所有queue的請求,生成一次批量查詢 int size = queue.size(); if (size == 0) { return; } System.out.println("此次合并了多少請求:" + size); // 1、 取出 ArrayList requests = new ArrayList<>(); ArrayList shopDataIds = new ArrayList<>(); for (int i = 0; i < size; i++) { Request request = queue.poll(); requests.add(request); shopDataIds.add(request.shopDataId); } // 2、 組裝一個批量查詢 (不會比單次查詢慢很多) List> mapList = queryServiceRemoteCall.queryShopDataInfoByCodeBatch(shopDataIds); // 3、 分發響應結果,給每一個request用戶請求 (多線程 之間的通信) HashMap> resultMap = new HashMap<>(); // 1000---- 007 for (Map map : mapList) { String code = map.get("code").toString(); resultMap.put(code, map); } // 1000個請求 for (Request req : requests) { Map result = resultMap.get(req.shopDataId); // 怎么通知對應的1000多個線程,取結果呢? req.completableFuture.complete(result); } }, 0, 10, TimeUnit.MILLISECONDS); } @Autowired QueryServiceRemoteCall queryServiceRemoteCall; /** * 1000 用戶請求,1000個線程 * * @param shopDataId * @return * @throws ExecutionException * @throws InterruptedException */ public Map queryData(String shopDataId) throws ExecutionException, InterruptedException { Request request = new Request(); request.shopDataId = shopDataId; CompletableFuture> future = new CompletableFuture<>(); request.completableFuture = future; queue.add(request); // 等待其他線程通知拿結果 return future.get(); }}
開始測試結束測試,執行時長:164此次合并了多少請求:63此次合并了多少請求:227此次合并了多少請求:32此次合并了多少請求:298此次合并了多少請求:68此次合并了多少請求:261此次合并了多少請求:51price-thread-code-747 查詢結束,結果是:{code=code-747, shopDataId=113980125, price=6000, isOk=true, name=棉花糖}price-thread-code-821 查詢結束,結果是:{code=code-821, shopDataId=568038265, price=6000, isOk=true, name=棉花糖}price-thread-code-745 查詢結束,結果是:{code=code-745, shopDataId=998247608, price=6000, isOk=true, name=棉花糖}....... 省略代碼中。。。。price-thread-code-809 查詢結束,結果是:{code=code-809, shopDataId=479029433, price=6000, isOk=true, name=棉花糖}price-thread-code-806 查詢結束,結果是:{code=code-806, shopDataId=929748878, price=6000, isOk=true, name=棉花糖}
可以看到我們將1000次請求進行了合并,數據也是正常的模擬到了。
四、總結
弊端:
- 啟用請求的成本是執行實際邏輯之前增加的延遲。
- 如果平均僅需要5毫秒的執行時間,放在一個10毫秒的做一次批處理的合并場景下,則在最壞的情況下,執行時間可能會變為15毫秒。(一定不適合低延遲的RPC場景、一定不適合低并發場景)
場景:
- 如果很少有超過1或2個請求會并發在一起,則沒有必要用。
- 一個特定的查詢同時被大量使用,并且可以將幾+個甚至數百個批處理在一起,那么如果能接受處理時間變長一點點,用來減少網絡連接欲,這是值得的。(典型如:數據庫、Http接口)
擴展:
- 我們不重復造輪子,在SpringCloud的組件spring-cloud-starter-netflix-hystrix中已經有封裝好的輪子Hystrix的HystrixCollapser來實現請求的合并,以減少通信消耗和線程數的占用。
- 當然他的組件比較復雜,也更全面,支持異步,同步,超時,異常等的處理機制。
- 但是,從底層思路來說,無非是線程之間的通信,線程的切換,隊列等一些并發編程相關的技術,只要我們高度封裝和抽象,那也可以手擼一個合并請求的框架處理。
總結
以上是生活随笔為你收集整理的dueros模拟测试没有请求后台_实战 | 用手写一个骚气的请求合并,演绎底层的真实...的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。