从RocketMQ看长轮询(Long Polling)
前言
消息隊列一般在消費端都會提供push和pull兩種模式,RocketMQ同樣實現(xiàn)了這兩種模式,分別提供了兩個實現(xiàn)類:DefaultMQPushConsumer和DefaultMQPullConsumer;兩種方式各有優(yōu)勢:
push模式:推送模式,即服務(wù)端有數(shù)據(jù)之后立馬推送消息給客戶端,需要客戶端和服務(wù)器建立長連接,實時性很高,對客戶端來說也簡單,接收處理消息即可;缺點就是服務(wù)端不知道客戶端處理消息的能力,可能會導(dǎo)致數(shù)據(jù)積壓,同時也增加了服務(wù)端的工作量,影響服務(wù)端的性能;
pull模式:拉取模式,即客戶端主動去服務(wù)端拉取數(shù)據(jù),主動權(quán)在客戶端,拉取數(shù)據(jù),然后處理數(shù)據(jù),再拉取數(shù)據(jù),一直循環(huán)下去,具體拉取數(shù)據(jù)的時間間隔不好設(shè)定,太短可能會導(dǎo)致大量的連接拉取不到數(shù)據(jù),太長導(dǎo)致數(shù)據(jù)接收不及時;
RocketMQ使用了長輪詢的方式,兼顧了push和pull兩種模式的優(yōu)點,下面首先對長輪詢做簡單介紹,進(jìn)而分析RocketMQ內(nèi)置的長輪詢模式。
長輪詢
長輪詢通過客戶端和服務(wù)端的配合,達(dá)到主動權(quán)在客戶端,同時也能保證數(shù)據(jù)的實時性;長輪詢本質(zhì)上也是輪詢,只不過對普通的輪詢做了優(yōu)化處理,服務(wù)端在沒有數(shù)據(jù)的時候并不是馬上返回數(shù)據(jù),會hold住請求,等待服務(wù)端有數(shù)據(jù),或者一直沒有數(shù)據(jù)超時處理,然后一直循環(huán)下去;下面看一下如何簡單實現(xiàn)一個長輪詢;
1.實現(xiàn)步驟
1.1客戶端輪詢發(fā)送請求
客戶端應(yīng)該存在一個一直循環(huán)的程序,不停的向服務(wù)端發(fā)送獲取消息請求;
1.2服務(wù)端處理數(shù)據(jù)
服務(wù)器接收到客戶端請求之后,首先查看是否有數(shù)據(jù),如果有數(shù)據(jù)則直接返回,如果沒有則保持連接,等待獲取數(shù)據(jù),服務(wù)端獲取數(shù)據(jù)之后,會通知之前的請求連接來獲取數(shù)據(jù),然后返回給客戶端;
1.3客戶端接收數(shù)據(jù)
正常情況下,客戶端會馬上接收到服務(wù)端的數(shù)據(jù),或者等待一段時間獲取到數(shù)據(jù);如果一直獲取不到數(shù)據(jù),會有超時處理;在獲取數(shù)據(jù)或者超時處理之后會關(guān)閉連接,然后再次發(fā)起長輪詢請求;
2.實現(xiàn)實例
以下使用netty模擬一個http服務(wù)器,使用HttpURLConnection模擬客戶端發(fā)送請求,使用BlockingQueue存放數(shù)據(jù);
服務(wù)端代碼
public class Server {public static void start(final int port) throws Exception {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup woker = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker).childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("http-decoder", new HttpServerCodec());ch.pipeline().addLast(new HttpServerHandler());}});ChannelFuture future = serverBootstrap.bind(port).sync();System.out.println("server start ok port is " + port);DataCenter.start();future.channel().closeFuture().sync();} finally {boss.shutdownGracefully();woker.shutdownGracefully();}}public static void main(String[] args) throws Exception {start(8080);} }netty默認(rèn)支持http協(xié)議,直接使用即可,啟動端口為8080;同時啟動數(shù)據(jù)中心服務(wù),相關(guān)代碼如下:
public class DataCenter {private static Random random = new Random();private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();private static AtomicInteger num = new AtomicInteger();public static void start() {while (true) {try {Thread.sleep(random.nextInt(5) * 1000);String data = "hello world" + num.incrementAndGet();queue.put(data);System.out.println("store data:" + data);} catch (InterruptedException e) {e.printStackTrace();}}}public static String getData() throws InterruptedException {return queue.take();}}為了模擬服務(wù)端沒有數(shù)據(jù),需要等待的情況,這里使用BlockingQueue來模擬,不定期的往隊列里面插入數(shù)據(jù),同時對外提供獲取數(shù)據(jù)的方法,使用的是take方法,沒有數(shù)據(jù)會阻塞知道有數(shù)據(jù)為止;getData在類HttpServerHandler中使用,此類也很簡單,如下:
public class HttpServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpRequest) {FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);httpResponse.content().writeBytes(DataCenter.getData().getBytes());httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());ctx.writeAndFlush(httpResponse);}} }獲取到客戶端的請求之后,從數(shù)據(jù)中心獲取一條消息,如果沒有數(shù)據(jù),會進(jìn)行等待,直到有數(shù)據(jù)為止;然后使用FullHttpResponse返回給客戶端;客戶端使用HttpURLConnection來和服務(wù)端建立連接,不停的拉取數(shù)據(jù),代碼如下:
public class Client {public static void main(String[] args) {while (true) {HttpURLConnection connection = null;try {URL url = new URL("http://localhost:8080");connection = (HttpURLConnection) url.openConnection();connection.setReadTimeout(10000);connection.setConnectTimeout(3000);connection.setRequestMethod("GET");connection.connect();if (200 == connection.getResponseCode()) {BufferedReader reader = null;try {reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));StringBuffer result = new StringBuffer();String line = null;while ((line = reader.readLine()) != null) {result.append(line);}System.out.println("時間:" + new Date().toString() + "result = " + result);} finally {if (reader != null) {reader.close();}}}} catch (IOException e) {e.printStackTrace();} finally {if (connection != null) {connection.disconnect();}}}} }以上只是簡單的模擬了長輪詢的方式,下面重點來看看RocketMQ是如何實現(xiàn)長輪詢的;
RocketMQ長輪詢
RocketMQ的消費端提供了兩種消費模式分別是:DefaultMQPushConsumer和DefaultMQPullConsumer,其中DefaultMQPushConsumer就是使用的長輪詢,所以下面重點分析此類;
1.PullMessage服務(wù)
從名字可以看出來就是客戶端從服務(wù)端拉取數(shù)據(jù)的服務(wù),看里面的一個核心方法:
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}服務(wù)啟動之后,會一直不停的循環(huán)調(diào)用拉取數(shù)據(jù),PullRequest可以看作是拉取數(shù)據(jù)需要的參數(shù),部分代碼如下:
public class PullRequest {private String consumerGroup;private MessageQueue messageQueue;private ProcessQueue processQueue;private long nextOffset;private boolean lockedFirst = false;...省略... }每個MessageQueue 對應(yīng)了封裝成了一個PullRequest,因為拉取數(shù)據(jù)是以每個Broker下面的Queue為單位,同時里面還一個ProcessQueue,每個MessageQueue也同樣對應(yīng)一個ProcessQueue,保存了這個MessageQueue消息處理狀態(tài)的快照;還有nextOffset用來標(biāo)識讀取的位置;繼續(xù)看一段pullMessage中的內(nèi)容,給服務(wù)端發(fā)送請求的頭內(nèi)容:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); }PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;其中有一個參數(shù)是SuspendTimeoutMillis,作用是設(shè)置Broker的最長阻塞時間,默認(rèn)為15秒,前提是沒有消息的情況下,有消息會立刻返回;
2.PullMessageProcessor服務(wù)
從名字可以看出,服務(wù)端用來處理pullMessage的服務(wù),下面重點看一下processRequest方法,其中包括對獲取不同結(jié)果做的處理:
switch (response.getCode()) {case ResponseCode.SUCCESS:...省略...break;case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:...省略...break;default:assert false;一共處理了四個類型,我們關(guān)心的是在沒有獲取到數(shù)據(jù)的情況下是如何處理的,可以重點看一下ResponseCode.PULL_NOT_FOUND,表示沒有拉取到數(shù)據(jù),此時會調(diào)用PullRequestHoldService服務(wù),從名字可以看出此服務(wù)用來hold住請求,不會立馬返回,response被至為了null,不給客戶端響應(yīng);下面重點看一下PullRequestHoldService:
@Overridepublic void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}此方法主要就是通過不停的檢查被hold住的請求,檢查是否已經(jīng)有數(shù)據(jù)了,具體檢查哪些就是在ResponseCode.PULL_NOT_FOUND中調(diào)用的suspendPullRequest方法:
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}將需要hold處理的PullRequest放入到一個ConcurrentHashMap中,等待被檢查;具體的檢查代碼在checkHoldRequest中:
private void checkHoldRequest() {for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);try {this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}此方法用來獲取指定messageQueue下最大的offset,然后用來和當(dāng)前的offset來比較,來確定是否有新的消息到來;往下看notifyMessageArriving方法:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);}if (newestOffset > request.getPullFromThisOffset()) {if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}方法中兩個重要的判定就是:比較當(dāng)前的offset和maxoffset,看是否有新的消息到來,有新的消息返回客戶端;另外一個就是比較當(dāng)前的時間和阻塞的時間,看是否超過了最大的阻塞時間,超過也同樣返回;
此方法不光在PullRequestHoldService服務(wù)類中循環(huán)調(diào)用檢查,同時在DefaultMessageStore中消息被存儲的時候調(diào)用;其實就是主動檢查和被動通知兩種方式。
3.PullCallback回調(diào)
服務(wù)端處理完之后,給客戶端響應(yīng),回調(diào)其中的PullCallback,其中在處理完消息之后,重要的一步就是再次把pullRequest放到PullMessageService服務(wù)中,等待下一次的輪詢;
總結(jié)
本文首先介紹了兩種消費消息的模式,介紹了其中的優(yōu)缺點,然后引出了長輪詢,并且在本地簡單模擬了長輪詢,最后重點介紹了RocketMQ中是如何實現(xiàn)的長輪詢。
示例代碼地址
Github
Gitee
總結(jié)
以上是生活随笔為你收集整理的从RocketMQ看长轮询(Long Polling)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iptables的增删改查
- 下一篇: (十六)spring cloud微服务分