JMS学习七(ActiveMQ之Topic的持久订阅)
非持久化訂閱持續(xù)到它們訂閱對(duì)象的生命周期。這意味著,客戶端只能在訂閱者活動(dòng)時(shí)看到相關(guān)主題發(fā)布的消息。如果訂閱者不活動(dòng),它會(huì)錯(cuò)過(guò)相關(guān)主題的消息。如果花費(fèi)較大的開銷,訂閱者可以被定義為durable(持久化的)。持久化的訂閱者注冊(cè)一個(gè)帶有JMS保持的唯一標(biāo)識(shí)的持久化訂閱(subscription)。帶有相同標(biāo)識(shí)的后續(xù)訂閱者會(huì)再續(xù)前一個(gè)訂閱者的訂閱狀態(tài)。如果持久化訂閱沒(méi)有活動(dòng)的訂閱者,JMS會(huì)保持訂閱消息,直到消息被訂閱接收或者過(guò)期。
生產(chǎn)者:
package cn.slimsmart.activemq.demo.topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException { // 連接到ActiveMQ服務(wù)器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建主題 Topic topic = session.createTopic("slimsmart.topic.test"); MessageProducer producer = session.createProducer(topic); // NON_PERSISTENT 非持久化 PERSISTENT 持久化,發(fā)送消息時(shí)用使用持久模式 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage(); message.setText("topic 消息。"); message.setStringProperty("property", "消息Property"); // 發(fā)布主題消息 producer.send(message); System.out.println("Sent message: " + message.getText()); session.commit(); session.close(); connection.close(); } }?
消費(fèi)者:
package cn.slimsmart.activemq.demo.topic; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 持久訂閱設(shè)置唯一的客戶端ID和訂閱者ID。 */ public class ConsumerPersistent { public static void main(String[] args) throws JMSException { String clientId = "client_id"; // 連接到ActiveMQ服務(wù)器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616"); Connection connection = factory.createConnection(); //客戶端ID,持久訂閱需要設(shè)置 connection.setClientID(clientId); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建主題 Topic topic = session.createTopic("slimsmart.topic.test"); // 創(chuàng)建持久訂閱,指定客戶端ID。 MessageConsumer consumer = session.createDurableSubscriber(topic,clientId); consumer.setMessageListener(new MessageListener() { // 訂閱接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); } }?
注:
?
1.activemq區(qū)分消費(fèi)者,是通過(guò)clientID和訂閱客戶名稱來(lái)區(qū)分的。
2.消息的生產(chǎn)者,發(fā)送消息時(shí)用使用持久模式。
3.使用相同的“clientID”,則認(rèn)為是同一個(gè)消費(fèi)者。兩個(gè)程序使用相同的“clientID”,則同時(shí)只能有一個(gè)連接到activemq,第二個(gè)連接的會(huì)報(bào)錯(cuò)。
4.activemq的設(shè)置在conf/activemq.xml中,默認(rèn)消息是保存在data/kahadb中,重啟activemq消息不會(huì)丟。
轉(zhuǎn)載于:https://www.cnblogs.com/alter888/p/8976221.html
總結(jié)
以上是生活随笔為你收集整理的JMS学习七(ActiveMQ之Topic的持久订阅)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Dungeon Master(三维bfs
- 下一篇: Elasticsearch DSL