JMS学习四(ActiveMQ消息过滤)
一、消息的選擇器
不管是在消息發(fā)送端設(shè)置消息過期時(shí)間還是在接收端設(shè)置等待時(shí)間,都是對不滿足的消息有過濾的作用,那消息選擇器就是為過濾消息而生的下面來看看消息選擇器:
ActiveMQ提供了一種機(jī)制,使用它,消息服務(wù)可根據(jù)消息選擇器中的標(biāo)準(zhǔn)來執(zhí)行消息過濾。生產(chǎn)者可在消息中放入應(yīng)用程序特有的屬性,而消費(fèi)者可使用基于這些屬性的選擇標(biāo)準(zhǔn)來表明對消息是否感興趣。這就簡化了客戶端的工作,并避免了向不需要這些消息的消費(fèi)者傳送消息的開銷。然而,它也使得處理選擇標(biāo)準(zhǔn)的消息服務(wù)增加了一些額外開銷。 消息選擇器是用于MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),并確定是否將實(shí)際消費(fèi)該消息。消息選擇器是一些字符串,它們基于某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創(chuàng)建的一部分。
?
?消息選擇器的用法
? ? ? MessageConsumer是一個(gè)Session創(chuàng)建的對象,用來從Destination接收消息
? ? ? 關(guān)于消息選擇器
? ? ? MessageConsumer createConsumer( Destination destination, String messageSelector )
? ? ? MessageConsumer createConsumer( Destination destination, String messageSelector, boolean noLocal )
? ? ? 其中,messageSelector為消息選擇器;?
? ? ? noLocal標(biāo)志默認(rèn)為false,當(dāng)設(shè)置為true時(shí),限制消費(fèi)者只能接收和自己相同的連接(Connection)所發(fā)布的消息,此標(biāo)志只適用于主題,不適用于隊(duì)列。
? ? ? public final String SELECTOR="JMS_TYPE='MY_TAG1'" ;?
? ? ? 選擇器檢查傳入消息的JMS_TYPE的屬性,并確定這個(gè)屬性的值是否等于MY_TAG1;
? ? ? 如果相等,消息報(bào)消費(fèi);如果不相等,那么消息就會(huì)被忽略;
?
?
1、消息生產(chǎn)者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產(chǎn)者 private MessageProducer messageProducer; public Producer() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設(shè)置自動(dòng)簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.messageProducer = this.session.createProducer(null); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } public void send1(/* String QueueName, Message message */) { try { Destination destination = this.session.createQueue("first"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "張三"); msg1.setInt("age", 20); // 設(shè)置用于消息過濾器的條件 msg1.setStringProperty("name", "張三"); msg1.setIntProperty("age", 20); msg1.setStringProperty("color", "bule"); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "李四"); msg2.setInt("age", 25); // 設(shè)置用于消息過濾器的條件 msg2.setStringProperty("name", "李四"); msg2.setIntProperty("age", 25); msg2.setStringProperty("color", "white"); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "趙六"); msg3.setInt("age", 30); // 設(shè)置用于消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black"); // 發(fā)送消息 this.messageProducer.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public void send2() { try { Destination destination = this.session.createQueue("first"); TextMessage message = this.session.createTextMessage("我是一個(gè)字符串"); message.setIntProperty("age", 25); // 發(fā)送消息 this.messageProducer.send(destination, message, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Producer producer = new Producer(); producer.send1(); // producer.send2(); } }?
2、消息消費(fèi)者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Conmuser { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產(chǎn)者 private MessageConsumer messageConsumer; // 5、目的地址 private Destination destination; // 消息選擇器 public final String SELECTOR_1 = "age > 25"; public final String SELECTOR_2 = " age > 20 and color='black'"; public Conmuser() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設(shè)置自動(dòng)簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); // 在構(gòu)造消費(fèi)者的時(shí)候,指定了 消息選擇器 // 有選擇性的消費(fèi)消息 this.messageConsumer = this.session.createConsumer(destination, SELECTOR_1); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } // 用于監(jiān)聽消息隊(duì)列的消息 class MyLister implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage ret = (TextMessage) message; System.out.println("results;" + ret.getText()); } if (message instanceof MapMessage) { MapMessage ret = (MapMessage) message; System.out.println(ret.toString()); System.out.println(ret.getString("name")); System.out.println(ret.getInt("age")); } } catch (JMSException e) { throw new RuntimeException(e); } } } // 用于異步監(jiān)聽消息 public void receiver() { try { this.messageConsumer.setMessageListener(new MyLister()); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Conmuser conmuser = new Conmuser(); conmuser.receiver(); } }上面的demo是對MapMessage和TextMessage兩種消息的過濾條件的設(shè)置和消費(fèi),過濾條件的設(shè)置使在消息的屬性中設(shè)置,而消費(fèi)消息的時(shí)候直接是在session創(chuàng)建MessageConsumer時(shí)傳入的參數(shù)即過濾條件(過濾條件的寫法和SQL的寫法是很像的)
?
在寫過濾條件的時(shí)候要注意設(shè)置的是什么類型的條件即: int 、string 如果是int 則加引號(hào)而如果是String則要加哦!!!
?
需要注意的地方
? ? ?注意消息過濾器的過濾條件的設(shè)置
// 設(shè)置用于消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black");?
消息過濾器的寫法(類似于SQL語句的寫法)
// 消息選擇器 public final String SELECTOR_1 = "age > 20"; public final String SELECTOR_2 = " age > 20 and color='bule'";總結(jié)
以上是生活随笔為你收集整理的JMS学习四(ActiveMQ消息过滤)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MPU6050开发 -- 初识
- 下一篇: 互联网晚报 | 3月28日 星期一 |