消息确认机制---confirm异步
生活随笔
收集整理的這篇文章主要介紹了
消息确认机制---confirm异步
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一:介紹
1.異步模式介紹
Channel對象提供ConfirmListener()回調方法只包含deliverTag(當前Channel發出的序列號),我們需要自己為每一個Channel維護一個unconfirm的消息序列集合,沒publish一條數據,集合就加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或者多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好采用有序集合SortedSet存儲結構。
?
二:程序
1.生產者
1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.ConfirmListener; 6 import com.rabbitmq.client.Connection; 7 8 import java.io.IOException; 9 import java.util.Collections; 10 import java.util.SortedSet; 11 import java.util.TreeSet; 12 13 public class Send { 14 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 15 public static void main(String[] args)throws Exception{ 16 Connection connection= ConnectionUtil.getConnection(); 17 Channel channel=connection.createChannel(); 18 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 19 //生產者調用confirmSelect將channel設置為nconfirm模式 20 channel.confirmSelect(); 21 final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>()); 22 channel.addConfirmListener(new ConfirmListener() { 23 //沒有問題 24 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 25 if (multiple){ 26 System.out.println("handleAck multiple"); 27 confirmSet.headSet(deliveryTag+1).clear(); 28 }else{ 29 System.out.println("handleAck false"); 30 confirmSet.remove(deliveryTag); 31 } 32 } 33 //有問題 34 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 35 if (multiple){ 36 System.out.println("handleNack multiple"); 37 confirmSet.headSet(deliveryTag+1).clear(); 38 }else{ 39 System.out.println("handleNack false"); 40 confirmSet.remove(deliveryTag); 41 } 42 } 43 }); 44 String msg="success"; 45 while (true){ 46 long seqNo=channel.getNextPublishSeqNo(); 47 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 48 confirmSet.add(seqNo); 49 } 50 51 } 52 }?
2.消費者
1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Receive { 9 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 10 public static void main(String[] args)throws Exception { 11 Connection connection = ConnectionUtil.getConnection(); 12 Channel channel = connection.createChannel(); 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 17 System.out.println(new String(body,"utf-8")); 18 } 19 }); 20 } 21 }?
3.現象
Send:
?
轉載于:https://www.cnblogs.com/juncaoit/p/8635633.html
總結
以上是生活随笔為你收集整理的消息确认机制---confirm异步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Swoole练习 Web
- 下一篇: PTA实验作业-01