RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 過(guò)濾消息概述
- 基本語(yǔ)法
- 使用限制
- 啟用配置 (重要 )
- 常見(jiàn)錯(cuò)誤:The broker does not support consumer to filter message by SQL92
- 示例
- 生產(chǎn)者
- 消費(fèi)者
過(guò)濾消息概述
大部分情況下 ,我們都可以通過(guò)TAG來(lái)選擇我們想要獲取的消息,如下
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");消費(fèi)者將接收包含TAGA或TAGB或TAGC的消息。但是限制是一個(gè)消息只能有一個(gè)標(biāo)簽,這對(duì)于復(fù)雜的場(chǎng)景可能不起作用。
在這種情況下,可以使用SQL表達(dá)式篩選消息。SQL特性可以通過(guò)發(fā)送消息時(shí)的屬性來(lái)進(jìn)行計(jì)算。
在RocketMQ定義的語(yǔ)法下,可以實(shí)現(xiàn)一些簡(jiǎn)單的邏輯。
舉個(gè)例子
基本語(yǔ)法
使用限制
只有使用push模式的消費(fèi)者才能用使用SQL92標(biāo)準(zhǔn)的sql語(yǔ)句 ,
接口如下
public void subscribe(final String topic, final MessageSelector messageSelector)啟用配置 (重要 )
使用Filter功能,需要在啟動(dòng)配置文件當(dāng)中配置以下選項(xiàng)
enablePropertyFilter=true常見(jiàn)錯(cuò)誤:The broker does not support consumer to filter message by SQL92
配置文件中增加如下配置
enablePropertyFilter=true示例
生產(chǎn)者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:30* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterProducer {/**** TAG-FILTER-1000 ---> 布隆過(guò)濾器* 過(guò)濾掉的那些消息。直接就跳過(guò)了么。下次就不會(huì)繼續(xù)過(guò)濾這些了。是么。* @param args* @throws Exception*/public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 3; i++) {Message msg = new Message("TopicFilter","TAG-FILTER",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. 生產(chǎn)者設(shè)置屬性,消費(fèi)者端通過(guò)Tag+該屬性定制消息msg.putUserProperty("a", String.valueOf(i));if (i % 2 == 0) {msg.putUserProperty("b", "artisan");} else {msg.putUserProperty("b", "smart artisan");}producer.send(msg);}producer.shutdown();}}消費(fèi)者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");/*** 注冊(cè)中心*/consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 訂閱主題* 一種資源去換取另外一種資源*/consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));/*** 注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)主題消息*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", content:"+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Filter Consumer Started.%n");} }日志:
Filter Consumer Started. consumeThread=ConsumeMessageThread_1, queueId=0, content:Hello RocketMQ 0 consumeThread=ConsumeMessageThread_2, queueId=2, content:Hello RocketMQ 2可以看到,我們雖然發(fā)了 3條消息 ,但是只獲取了我們期望的2條消息,可見(jiàn)過(guò)濾起了作用。
總結(jié)
以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RocketMQ-初体验RocketMQ
- 下一篇: RocketMQ-初体验RocketMQ