RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
文章目錄
- 廣播消息
- 廣播消息概述
- 演示步驟
- 延時消息
- 概述
- 使用場景
- 延時機制
- 實現原理
- 示例
- 批量消息
- 批量消息概述
- 示例
- 代碼
廣播消息
廣播消息概述
廣播消息就是向所有用戶發送消息。 如果我們希望所有訂閱者都能收到有關某個主題的消息,可以使用廣播消息。
舉個例子 生產者發送10條消息,有2個訂閱者,則這兩個訂閱者會分別收到10條消息, 而與廣播模式相對應的集群模式這是 2個訂閱者一共收到10條消息。
Rocketmq 消費者默認是集群的方式消費的,使用廣播模式進行消費需要顯示設置
核心:消費端設置消息模型 consumer.setMessageModel(MessageModel.BROADCASTING);
演示步驟
- 啟動2個或者2個以上的消費者
- 啟動生產者發送消息
- 觀察2個消費者的消息接收情況 :兩個Consumer收到了同樣的消息,OK.
生產者:
package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-10 19:22* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 4; i++){Message msg = new Message("TopicTest","TagA","OrderID188",("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();} }消費者:
package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 19:27* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//廣播,全量消費consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt ext : msgs){System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");} }測試結果:
生產者:
消費者1:
消費者2:
延時消息
概述
定時消息是指消息發到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能被消費。
使用場景
舉個例子: 電商系統,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
延時機制
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel當前支持的延遲時間
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h分別對應級別
1 2 3....................設置消息時延
Message message = new Message; message.setDelayTimeLevel(3)現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18 消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關。
實現原理
延遲隊列的核心思路: 【利用中間隊列臨時存儲】—>所有的延遲消息由producer消息發憷之后,都會存放在一個topic下 (SHCEDULE_TOPIC_XXXX), 不同的延遲級別對應不同的隊列序號,當延遲時間到了之后,由定時線程讀取轉換為普通的消息存到真實指定的topic下,此時對于consumer端此消息才可見,從而被consumer消費。
示例
生產者:
package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.util.Date;/*** @author 小工匠* @version v1.0* @create 2019-11-10 17:23* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();int totalMessagesToSend = 3;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());//延時消費 6-->2分鐘message.setDelayTimeLevel(6);// Send the messageproducer.send(message);}System.out.printf("message send is completed .%n" + new Date());producer.shutdown();} }消費者:
package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.util.Date; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 17:23* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("TestTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println(new Date() + "Receive message[msgId=" + message.getMsgId() + "] "+ "message content is :" + new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//System.out.printf("Consumer Started.%n");} }設置的延遲level為6 ,對應的時間間隔是兩分鐘,OK。
批量消息
批量消息概述
批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息
此外,這一批消息的總大小不應超過4MB。rocketmq建議每次批量消息大小大概在1MB。當消息大小超過4MB時,需要將消息進行分割
示例
生產者
package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.util.ArrayList; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:27* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class BatchProducer {public static void main(String[] args) throws Exception {/*** rocketMq 支持消息批量發送* 同一批次的消息應具有:相同的主題,相同的waitStoreMsgOK,并且不支持定時任務。* <strong> 同一批次消息建議大小不超過~1M </strong>,消息最大不能超過4M,需要* 對msg進行拆分*/DefaultMQProducer producer = new DefaultMQProducer("batch_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));ListSplitter splitter = new ListSplitter(messages);/*** 對批量消息進行拆分*/while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();}}producer.shutdown();}}消息拆分
package com.artisan.rocketmq.batch;import org.apache.rocketmq.common.message.Message;import java.util.Iterator; import java.util.List; import java.util.Map;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:35* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MBprivate final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;//遍歷消息準備拆分for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {if (nextIndex - currIndex == 0) {nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;} }消費者
package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("BatchTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");} }代碼
請移步: https://github.com/yangshangwei/rocketmqMaster
總結
以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ-初体验RocketMQ
- 下一篇: RocketMQ-初体验RocketMQ