rocketmq发送顺序消息(四)
生活随笔
收集整理的這篇文章主要介紹了
rocketmq发送顺序消息(四)
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
rocketmq怎么發(fā)送消息可參考我的上一篇博客:rocketmq發(fā)送第一條消息。此處我們講解如何發(fā)送rocketmq順序消息
producer
public class ProducerOrder {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("testGrp");// 設(shè)置nameserver地址 nameserver具備路由功能(發(fā)現(xiàn)服務(wù),有點(diǎn)注冊中心的意思),讓其分配合理的broker來進(jìn)行消息發(fā)送producer.setNamesrvAddr("192.168.52.11:9876");try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 20; i++) {Message message = new Message("monkeyOrderMsgTopic", ("這是順序消息:" + i).getBytes());producer.send(message,// 自定義選擇Queuenew MessageQueueSelector() {/**** @param list 當(dāng)前topic里所有的queue* @param message 要發(fā)送的消息* @param o 對(duì)應(yīng)到 send() 里的 args參數(shù)* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 根據(jù)傳入的參數(shù)決定QueueMessageQueue messageQueue = list.get((Integer)o);return messageQueue;}}, 0, 3000);}System.out.println("發(fā)送完成");} }?consumer
public class ConsumerOrder {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumerOrder");consumer.setNamesrvAddr("192.168.52.11:9876");consumer.subscribe("monkeyOrderMsgTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody()) + "current Thread:" + Thread.currentThread().getName());}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("consumer start .....");}}測試結(jié)果:?
?總結(jié):
你們應(yīng)該如何保證消息的順序?
-
同一topic
-
同一個(gè)QUEUE
-
發(fā)消息的時(shí)候一個(gè)線程去發(fā)送消息
-
消費(fèi)的時(shí)候 一個(gè)線程 消費(fèi)一個(gè)queue里的消息或者使用MessageListenerOrderly
-
多個(gè)queue 只能保證單個(gè)queue里的順序
總結(jié)
以上是生活随笔為你收集整理的rocketmq发送顺序消息(四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rocketmq发送第一条消息(三)
- 下一篇: Linux安装最新Redis