rocketmq 消费方式_RocketMQ事务消费和顺序消费详解
一、RocketMq有3中消息類型
1.普通消費(fèi)
2. 順序消費(fèi)
3.事務(wù)消費(fèi)
順序消費(fèi)場景
在網(wǎng)購的時(shí)候,我們需要下單,那么下單需要假如有三個(gè)順序,第一、創(chuàng)建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個(gè)三個(gè)環(huán)節(jié)要有順序,這個(gè)訂單才有意義。RocketMQ可以保證順序消費(fèi)。
rocketMq實(shí)現(xiàn)順序消費(fèi)的原理
produce在發(fā)送消息的時(shí)候,把消息發(fā)到同一個(gè)隊(duì)列(queue)中,消費(fèi)者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費(fèi)端只有一個(gè)線程去消費(fèi)消息
注意:是把把消息發(fā)到同一個(gè)隊(duì)列(queue),不是同一個(gè)topic,默認(rèn)情況下一個(gè)topic包括4個(gè)queue
單個(gè)節(jié)點(diǎn)(Producer端1個(gè)、Consumer端1個(gè))
1、Producer.java
packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,發(fā)送順序消息*/
public classProducer {public static voidmain(String[] args) {try{
DefaultMQProducer producer= new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };
for (int i = 1; i <= 5; i++) {
Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());
SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {
Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);
}
},0);
System.out.println(sendResult);
}
producer.shutdown();
}catch(MQClientException e) {
e.printStackTrace();
}catch(RemotingException e) {
e.printStackTrace();
}catch(MQBrokerException e) {
e.printStackTrace();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
2、Consumer.java
packageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 順序消息消費(fèi),帶事務(wù)方式(應(yīng)用可控制Offset什么時(shí)候提交)*/
public classConsumer1 {public static void main(String[] args) throwsMQClientException {
DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
consumer.registerMessageListener(newMessageListenerOrderly() {
AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設(shè)置自動提交
context.setAutoCommit(true);for(MessageExt msg : msgs) {
System.out.println(msg+ ",內(nèi)容:" + newString(msg.getBody()));
}try{
TimeUnit.SECONDS.sleep(5L);
}catch(InterruptedException e) {
e.printStackTrace();
}
;returnConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
結(jié)果如下圖所示:
這個(gè)五條數(shù)據(jù)被順序消費(fèi)了
多個(gè)節(jié)點(diǎn)(Producer端1個(gè)、Consumer端2個(gè))
Producer.java
packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,發(fā)送順序消息*/
public classProducer {public static voidmain(String[] args) {try{
DefaultMQProducer producer= new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };
for (int i = 1; i <= 5; i++) {
Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());
SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {
Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);
}
},0);
System.out.println(sendResult);
}for (int i = 1; i <= 5; i++) {
Message msg= new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " +i).getBytes());
SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {
Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);
}
},1);
System.out.println(sendResult);
}for (int i = 1; i <= 5; i++) {
Message msg= new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " +i).getBytes());
SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {
Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);
}
},2);
System.out.println(sendResult);
}
producer.shutdown();
}catch(MQClientException e) {
e.printStackTrace();
}catch(RemotingException e) {
e.printStackTrace();
}catch(MQBrokerException e) {
e.printStackTrace();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer1.java
/*** 順序消息消費(fèi),帶事務(wù)方式(應(yīng)用可控制Offset什么時(shí)候提交)*/
public classConsumer1 {public static void main(String[] args) throwsMQClientException {
DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");/*** 實(shí)現(xiàn)了MessageListenerOrderly表示一個(gè)隊(duì)列只會被一個(gè)線程取到
*,第二個(gè)線程無法訪問這個(gè)隊(duì)列*/consumer.registerMessageListener(newMessageListenerOrderly() {
AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設(shè)置自動提交
context.setAutoCommit(true);for(MessageExt msg : msgs) {
System.out.println(msg+ ",內(nèi)容:" + newString(msg.getBody()));
}try{
TimeUnit.SECONDS.sleep(5L);
}catch(InterruptedException e) {
e.printStackTrace();
}
;returnConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
Consumer2.java
/*** 順序消息消費(fèi),帶事務(wù)方式(應(yīng)用可控制Offset什么時(shí)候提交)*/
public classConsumer2 {public static void main(String[] args) throwsMQClientException {
DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");/*** 實(shí)現(xiàn)了MessageListenerOrderly表示一個(gè)隊(duì)列只會被一個(gè)線程取到
*,第二個(gè)線程無法訪問這個(gè)隊(duì)列*/consumer.registerMessageListener(newMessageListenerOrderly() {
AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設(shè)置自動提交
context.setAutoCommit(true);for(MessageExt msg : msgs) {
System.out.println(msg+ ",內(nèi)容:" + newString(msg.getBody()));
}try{
TimeUnit.SECONDS.sleep(5L);
}catch(InterruptedException e) {
e.printStackTrace();
}
;returnConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer2 Started.");
}
}
先啟動Consumer1和Consumer2,然后啟動Producer,Producer會發(fā)送15條消息
Consumer1消費(fèi)情況如圖,都按照順序執(zhí)行了
Consumer2消費(fèi)情況如圖,都按照順序執(zhí)行了
二、事務(wù)消費(fèi)
這里說的主要是分布式事物。下面的例子的數(shù)據(jù)庫分別安裝在不同的節(jié)點(diǎn)上。
事物消費(fèi)需要先說說什么是事務(wù)。比如說:我們跨行轉(zhuǎn)賬,從工商銀行轉(zhuǎn)到建設(shè)銀行,也就是我從工商銀行扣除1000元之后,我的建設(shè)銀行也必須加1000元。這樣才能保證數(shù)據(jù)的一致性。假如工商銀行轉(zhuǎn)1000元之后,建設(shè)銀行的服務(wù)器突然宕機(jī),那么我扣除了1000,但是并沒有在建設(shè)銀行給我加1000,就出現(xiàn)了數(shù)據(jù)的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。
再比如,我們進(jìn)行網(wǎng)購的時(shí)候,我們下單之后,訂單提交成功,倉庫商品的數(shù)量必須減一。但是訂單可能是一個(gè)數(shù)據(jù)庫,倉庫數(shù)量可能又是在另個(gè)數(shù)據(jù)庫里面。有可能訂單提交成功之后,倉庫數(shù)量服務(wù)器突然宕機(jī)。這樣也出現(xiàn)了數(shù)據(jù)不一致的問題。
使用消息隊(duì)列來解決分布式事物:
現(xiàn)在我們?nèi)ネ饷骘埖瓿燥?#xff0c;很多時(shí)候都不會直接給了錢之后直接在付款的窗口遞飯菜,而是付款之后他會給你一張小票,你拿著這個(gè)小票去出飯的窗口取飯。這里和我們的系統(tǒng)類似,提高了吞吐量。即使你到第二個(gè)窗口,師傅告訴你已經(jīng)沒飯了,你可以拿著這個(gè)憑證去退款,即使中途由于出了意外你無法到達(dá)窗口進(jìn)行取飯,但是只要憑證還在,可以將錢退給你。這樣就保證了數(shù)據(jù)的一致性。
如何保證憑證(消息)有2種方法:
1、在工商銀行扣款的時(shí)候,余額表扣除1000,同時(shí)記錄日志,而且這2個(gè)表是在同一個(gè)數(shù)據(jù)庫實(shí)例中,可以使用本地事物解決。然后我們通知建設(shè)銀行需要加1000給該用戶,建設(shè)銀行收到之后給我返回已經(jīng)加了1000給用戶的確認(rèn)信息之后,我再標(biāo)記日志表里面的日志為已經(jīng)完成。
2、通過消息中間件
總結(jié)
以上是生活随笔為你收集整理的rocketmq 消费方式_RocketMQ事务消费和顺序消费详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 兰亭字体linux版本,两种方法,让最新
- 下一篇: 卧槽,入职 3 天就“偷”代码,备份 6