Apollo 8 — ConfigService 异步轮询接口的实现
源碼
Apollo 長輪詢的實(shí)現(xiàn),是通過客戶端輪詢 /notifications/v2 接口實(shí)現(xiàn)的。具體代碼在 com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2.java。
這個(gè)類也是實(shí)現(xiàn)了 ReleaseMessageListener 監(jiān)控,表明他是一個(gè)消息監(jiān)聽器,當(dāng)有新的消息時(shí),就會(huì)調(diào)用他的 hanlderMessage 方法。這個(gè)具體我們后面再說。
該類只有一個(gè) rest 接口: pollNotification 方法。返回值是 DeferredResult,這是 Spring 支持 Servlet 3 的一個(gè)類,關(guān)于異步同步的不同,可以看筆者的另一篇文章 異步 Servlet 和同步 Servlet 的性能測(cè)試。
該接口提供了幾個(gè)參數(shù):
大家有么有覺得少了什么? namespace 。
當(dāng)然,沒有 namespace 這個(gè)重要的參數(shù)是不存在的。
參數(shù)在 notificationsAsString 中。客戶端會(huì)將自己所有的 namespace 傳遞到服務(wù)端進(jìn)行查詢。
是時(shí)候上源碼了。
@RequestMapping(method = RequestMethod.GET)public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(@RequestParam(value = "appId") String appId,// appId@RequestParam(value = "cluster") String cluster,// default@RequestParam(value = "notifications") String notificationsAsString,// json 對(duì)象 List<ApolloConfigNotification>@RequestParam(value = "dataCenter", required = false) String dataCenter,// 基本用不上, idc 屬性@RequestParam(value = "ip", required = false) String clientIp) {List<ApolloConfigNotification> notifications =// 轉(zhuǎn)換成對(duì)象gson.fromJson(notificationsAsString, notificationsTypeReference);// Spring 的異步對(duì)象: timeout 60s, 返回304DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();Set<String> namespaces = Sets.newHashSet();Map<String, Long> clientSideNotifications = Maps.newHashMap();Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);// 過濾一下名字// 循環(huán)for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {// 拿出 keyString normalizedNamespace = notificationEntry.getKey();// 拿出 valueApolloConfigNotification notification = notificationEntry.getValue();/* 添加到 namespaces Set */namespaces.add(normalizedNamespace);// 添加到 client 端的通知, key 是 namespace, values 是 messageIdclientSideNotifications.put(normalizedNamespace, notification.getNotificationId());// 如果不相等, 記錄客戶端名字if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {// 記錄 key = 標(biāo)準(zhǔn)名字, value = 客戶端名字deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);}}// 記在 namespaces 集合, clientSideNotifications 也put (namespace, notificationId)// 組裝得到需要觀察的 key,包括公共的.Multimap<String, String> watchedKeysMap =watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);// namespaces 是集合// 得到 value; 這個(gè) value 也就是 appId + cluster + namespaceSet<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());// 從緩存得到最新的發(fā)布消息List<ReleaseMessage> latestReleaseMessages =// 根據(jù) key 從緩存得到最新發(fā)布的消息.releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);/* 如果不關(guān)閉, 這個(gè)請(qǐng)求將會(huì)一直持有一個(gè)數(shù)據(jù)庫連接. 影響并發(fā)能力. 這是一個(gè) hack 操作*/entityManagerUtil.closeEntityManager();// 計(jì)算出新的通知List<ApolloConfigNotification> newNotifications =getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,latestReleaseMessages);// 不是空, 理解返回結(jié)果, 不等待if (!CollectionUtils.isEmpty(newNotifications)) {deferredResultWrapper.setResult(newNotifications);} else {// 設(shè)置 timeout 回調(diào):打印日志deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));// 設(shè)置完成回調(diào):刪除 keydeferredResultWrapper.onCompletion(() -> {//取消注冊(cè)for (String key : watchedKeys) {deferredResults.remove(key, deferredResultWrapper);}});//register all keys 注冊(cè)for (String key : watchedKeys) {this.deferredResults.put(key, deferredResultWrapper);}}// 立即返回return deferredResultWrapper.getResult();/** @see DeferredResultHandler 是關(guān)鍵 */}注釋寫了很多了,再簡單說說邏輯:
Apollo 的 DeferredResultWrapper 保證了 Spring 的 DeferredResult 對(duì)象,泛型內(nèi)容是 List, 構(gòu)造這個(gè)對(duì)象,默認(rèn)的 timeout 是 60 秒,即掛起 60 秒。同時(shí),對(duì) setResult 方法進(jìn)行包裝,加入了對(duì)客戶端 key 和服務(wù)端 key 的一個(gè)映射(大小寫不一致) 。
我們剛剛說,Apollo 會(huì)將這些 key 注冊(cè)起來。那么什么時(shí)候使用呢,異步對(duì)象被掛起,又是上面時(shí)候被喚醒呢?
答案就在 handleMessage 方法里。我們剛剛說他是一個(gè)監(jiān)聽器,當(dāng)消息掃描器掃描到新的消息時(shí),會(huì)通知所有的監(jiān)聽器,也就是執(zhí)行 handlerMessage 方法。方法內(nèi)容如下:
@Override public void handleMessage(ReleaseMessage message, String channel) {String content = message.getMessage();if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {return;}String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);//create a new list to avoid ConcurrentModificationException 構(gòu)造一個(gè)新 list ,防止并發(fā)失敗List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));// 創(chuàng)建通知對(duì)象ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());configNotification.addMessage(content, message.getId());//do async notification if too many clients 如果有大量的客戶端(100)在等待,使用線程池異步處理if (results.size() > bizConfig.releaseMessageNotificationBatch()) {// 大量通知批量處理largeNotificationBatchExecutorService.submit(() -> {for (int i = 0; i < results.size(); i++) { // 循環(huán)/** 假設(shè)一個(gè)公共 Namespace 有10W 臺(tái)機(jī)器使用,如果該公共 Namespace 發(fā)布時(shí)直接下發(fā)配置更新消息的話,* 就會(huì)導(dǎo)致這 10W 臺(tái)機(jī)器一下子都來請(qǐng)求配置,這動(dòng)靜就有點(diǎn)大了,而且對(duì) Config Service 的壓力也會(huì)比較大。* 即"驚群效應(yīng)"*/if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {// 如果處理了一批客戶端,休息一下(100ms)TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());}results.get(i).setResult(configNotification);// 通知每個(gè)等待的 HTTP 請(qǐng)求}});return;}// 否則,同步處理for (DeferredResultWrapper result : results) {result.setResult(configNotification);} }筆者去除了一些日志和一些數(shù)據(jù)判斷。大致的邏輯如下:
具體的流程圖如下:
其中,灰色區(qū)域是掃描器的異步線程,黃色區(qū)域是接口的同步線程。他們共享 deferredResults 這個(gè)線程安全的 Map,實(shí)現(xiàn)異步解耦和實(shí)時(shí)通知客戶端。
總結(jié)
好了,這就是 Apollo 的長輪詢接口,客戶端會(huì)不斷的輪詢服務(wù)器,服務(wù)器會(huì) Hold住 60 秒,這是通過 Servlet 3 的異步 + NIO 來實(shí)現(xiàn)的,能夠保持萬級(jí)連接(Tomcat 默認(rèn) 10000)。
通過一個(gè)線程安全的 Map + 監(jiān)聽器,讓掃描器線程和 HTTP 線程共享 Spring 異步對(duì)象,即實(shí)現(xiàn)了消息實(shí)時(shí)通知,也讓應(yīng)用程序?qū)崿F(xiàn)異步解耦。
轉(zhuǎn)載于:https://www.cnblogs.com/stateis0/p/9393871.html
總結(jié)
以上是生活随笔為你收集整理的Apollo 8 — ConfigService 异步轮询接口的实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: XtraReport交叉表自适应行高及最
- 下一篇: vue自定义指令截取图片中心显示