Redis-13Redis发布订阅
文章目錄
- 概述
- 消息多播
- PubSub發(fā)布者訂閱者模型
- 客戶端操作
- Spring配置發(fā)布訂閱模式
- pubsub不足之處
- 代碼
概述
當(dāng)使用銀行卡消費(fèi)的時(shí)候,銀行往往會(huì)通過(guò)微信、短信或郵件通知用戶這筆交易的信
息,這便是一種發(fā)布訂閱模式, 1這里的發(fā)布是交易信息的發(fā)布,訂閱則是各個(gè)渠道。這在實(shí)際工作中十分常用, Redis 支持這樣的一個(gè)模式。
Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。觀察者模式就是這個(gè)模式的典型應(yīng)用。
Redis 客戶端可以訂閱任意數(shù)量的頻道。
下圖展示了頻道 channel1 , 以及訂閱這個(gè)頻道的三個(gè)客戶端 —— client2 、 client5 和 client1 之間的關(guān)系:
當(dāng)有新消息通過(guò) PUBLISH 命令發(fā)送給頻道 channel1 時(shí), 這個(gè)消息就會(huì)被發(fā)送給訂閱它的三個(gè)客戶端:
消息多播
消息多播允許生產(chǎn)者生產(chǎn)一次消息,中間件負(fù)責(zé)將消息復(fù)制到多個(gè)消息隊(duì)列,每個(gè)消息隊(duì)列由相應(yīng)的消費(fèi)組進(jìn)行消費(fèi)。
它是分布式系統(tǒng)常用的一種解耦方式,用于將多個(gè)消費(fèi)組的邏輯進(jìn)行拆分。
支持了消息多播,多個(gè)消費(fèi)組的邏輯就可以放到不同的子系統(tǒng)中。
如果是普通的消息隊(duì)列,就得將多個(gè)不同的消費(fèi)組邏輯串接起來(lái)放在一個(gè)子系統(tǒng)中,進(jìn)行連續(xù)消費(fèi)。
PubSub發(fā)布者訂閱者模型
為了支持消息多播,Redis單獨(dú)使用了一個(gè)模塊來(lái)支持消息多播,這個(gè)模塊的名字叫著 PubSub,也就是 PublisherSubscriber,發(fā)布者訂閱者模型。
客戶端操作
首先來(lái)注冊(cè)一個(gè)訂閱的客戶端 , 這個(gè)時(shí)候使用 SUBSCRIBE命令 。
比如監(jiān)昕一個(gè)叫作 talk 的渠道 , 這個(gè)時(shí)候我們需要先打開(kāi)一個(gè)客戶端 ,這里記為客戶
端1 ,然后輸入命令
這個(gè)時(shí)候客戶端 1 就會(huì)訂閱了一個(gè)叫作 talk渠道的消息了
打開(kāi)另外一個(gè)客戶端 ,記為客戶端 2訂閱 talk渠道的消息
127.0.0.1:6379> SUBSCRIBE talk Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "talk" 3) (integer) 1最后打開(kāi)另外一個(gè)客戶端,發(fā)布消息給這兩個(gè)訂閱者
127.0.0.1:6379> PUBLISH talk "redis world !!!" (integer) 2 127.0.0.1:6379>觀察客戶端 1 和客戶端2 ,就可以發(fā)現(xiàn)已經(jīng)收到了消息 , 井有對(duì)應(yīng)的信息打印出來(lái)。
Spring配置發(fā)布訂閱模式
首先提供接收消息的類(lèi) , 它將實(shí)現(xiàn) org.springframework.data.redis.connection.MessageListener 接口, 并實(shí)現(xiàn)接口定義的方法 public void onMessage(Message message, byte[] pattern)
package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉(zhuǎn)換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener:" + channelStr);// 渠道名稱(chēng)轉(zhuǎn)換String bytesStr = new String(bytes);System.out.println("RedisMessageListener:" + bytesStr);}} package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener2 implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉(zhuǎn)換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener2:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener2:" + channelStr);// 渠道名稱(chēng)轉(zhuǎn)換String bytesStr = new String(bytes);System.out.println("RedisMessageListener2:" + bytesStr);}}為了在 Spring 中使用這兩個(gè)監(jiān)聽(tīng)類(lèi),需要對(duì)其進(jìn)行配置。這樣就在 Spring 上下文中定義了監(jiān)昕類(lèi)。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:property-placeholder location="classpath:redis/redis.properties" /><!--2,注意新版本2.3以后,JedisPoolConfig的property name,不是maxActive而是maxTotal,而且沒(méi)有maxWait屬性,建議看一下Jedis源碼或百度。 --><!-- redis連接池配置 --><bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"><!--最大空閑數(shù) --><property name="maxIdle" value="${redis.maxIdle}" /><!--連接池的最大數(shù)據(jù)庫(kù)連接數(shù) --><property name="maxTotal" value="${redis.maxTotal}" /><!--最大建立連接等待時(shí)間 --><property name="maxWaitMillis" value="${redis.maxWaitMillis}" /><!--逐出連接的最小空閑時(shí)間 默認(rèn)1800000毫秒(30分鐘) --><property name="minEvictableIdleTimeMillis" value="${redis.minEvictableIdleTimeMillis}" /><!--每次逐出檢查時(shí) 逐出的最大數(shù)目 如果為負(fù)數(shù)就是 : 1/abs(n), 默認(rèn)3 --><property name="numTestsPerEvictionRun" value="${redis.numTestsPerEvictionRun}" /><!--逐出掃描的時(shí)間間隔(毫秒) 如果為負(fù)數(shù),則不運(yùn)行逐出線程, 默認(rèn)-1 --><property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}" /><property name="testOnBorrow" value="true"></property><property name="testOnReturn" value="true"></property><property name="testWhileIdle" value="true"></property></bean><!--redis連接工廠 --><bean id="jedisConnectionFactory"class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"destroy-method="destroy"><property name="poolConfig" ref="jedisPoolConfig"></property><!--IP地址 --><property name="hostName" value="${redis.host.ip}"></property><!--端口號(hào) --><property name="port" value="${redis.port}"></property><!--如果Redis設(shè)置有密碼 --><property name="password" value="${redis.password}" /> <!--客戶端超時(shí)時(shí)間單位是毫秒 --><property name="timeout" value="${redis.timeout}"></property><property name="usePool" value="true" /><!--<property name="database" value="0" /> --></bean><!-- 鍵值序列化器設(shè)置為String 類(lèi)型 --><bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/><!-- redis template definition --><bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"p:connection-factory-ref="jedisConnectionFactory"p:keySerializer-ref="stringRedisSerializer"p:valueSerializer-ref="stringRedisSerializer"></bean><!-- 自定義 發(fā)布訂閱監(jiān)聽(tīng)類(lèi) --><bean id="redisMessageListener" class="com.artisan.redis.publish.RedisMessageListener"p:redisTemplate-ref="redisTemplate"/><bean id="redisMessageListener2" class="com.artisan.redis.publish.RedisMessageListener2"p:redisTemplate-ref="redisTemplate"/> <!-- 監(jiān)聽(tīng)容器 --> <bean id="topicContainer"class="org.springframework.data.redis.listener.RedisMessageListenerContainer"destroy-method="destroy"><!--Redis 連接工廠 --><property name="connectionFactory" ref="jedisConnectionFactory"></property><!-- 連接池,這里只要線程池生存 , 才能繼續(xù)監(jiān)昕 --><property name="taskExecutor"><beanclass="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="3"></property></bean></property><!-- 消息監(jiān)聽(tīng) Map --><property name="messageListeners"><map><!--一配置監(jiān)聽(tīng)者, key-ref 和 bean id 定義一致 --><entry key-ref="redisMessageListener"><!--監(jiān)聽(tīng)類(lèi) --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry><entry key-ref="redisMessageListener2"><!--監(jiān)聽(tīng)類(lèi) --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry></map></property></bean> </beans>有了監(jiān)聽(tīng)類(lèi)還不能進(jìn)行測(cè)試。為了進(jìn)行測(cè)試 , 要給一個(gè)監(jiān)昕容器 , 在 Spring 中己有類(lèi)org.springframework.data . redi s. li stener.RedisMessageListenerContainer。它可 以用于監(jiān)聽(tīng) Redis的發(fā)布訂閱消息,上面配置的topicContainer就是為了實(shí)現(xiàn)這個(gè)功能。
這里配置了線程池,這個(gè)線程池將會(huì)持續(xù)的生存 以等待消息傳入 , 而這里配置了容器用id 為 redisMessageListener 和 redisMessageListener2的 Bean 進(jìn)行對(duì)渠道 talk的監(jiān)聽(tīng) 。當(dāng)消息通過(guò)渠道 talk發(fā)送的時(shí)候,就會(huì)使用 id 為 redisMessageListener和redisMessageListener2 的 Bean 進(jìn)行處理消息。
測(cè)試類(lèi)
package com.artisan.redis.publish;import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.data.redis.core.RedisTemplate;public class PublishSubscribeTest {public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/spring-redis-publish.xml");RedisTemplate redisTemplate = ctx.getBean(RedisTemplate.class);String channel = "talk";redisTemplate.convertAndSend(channel, "artisan-talk");} }convertAndSend 方法就是向渠道 talk發(fā)送消息的, 當(dāng)發(fā)送后對(duì)應(yīng)的監(jiān)聽(tīng)者就能監(jiān)聽(tīng)到消息了。運(yùn)行它,后臺(tái)就會(huì)打出對(duì)應(yīng)的消息:
INFO : org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@73a8dfcc: startup date [Thu Sep 27 23:55:12 CST 2018]; root of context hierarchy INFO : org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/spring-redis-publish.xml] INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 RedisMessageListener:artisan-talk RedisMessageListener2:artisan-talk RedisMessageListener2:talk RedisMessageListener:talk RedisMessageListener:talk RedisMessageListener2:talk客戶端中肯定也有對(duì)應(yīng)的輸出,如果打開(kāi)了客戶端的話
pubsub不足之處
PubSub 的生產(chǎn)者傳遞過(guò)來(lái)一個(gè)消息,Redis 會(huì)直接找到相應(yīng)的消費(fèi)者傳遞過(guò)去。如果一個(gè)消費(fèi)者都沒(méi)有,那么消息直接丟棄。
如果開(kāi)始有三個(gè)消費(fèi)者,一個(gè)消費(fèi)者突然掛掉了,生產(chǎn)者會(huì)繼續(xù)發(fā)送消息,另外兩個(gè)消費(fèi)者可以持續(xù)收到消息。但是掛掉的消費(fèi)者重新連上的時(shí)候,這斷連期間生產(chǎn)者發(fā)送的消息,對(duì)于這個(gè)消費(fèi)者來(lái)說(shuō)就是徹底丟失了。
如果 Redis 停機(jī)重啟,PubSub 的消息是不會(huì)持久化的,畢竟 Redis 宕機(jī)就相當(dāng)于一個(gè)消費(fèi)者都沒(méi)有,所有的消息直接被丟棄。
正是因?yàn)?PubSub 有這些缺點(diǎn),它幾乎找不到合適的應(yīng)用場(chǎng)景。Redis5.0 新增了 Stream 數(shù)據(jù)結(jié)構(gòu),這個(gè)功能給 Redis 帶來(lái)了持久化消息隊(duì)列,從此 PubSub 可以消失了。
代碼
代碼托管到了 https://github.com/yangshangwei/redis_learn
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專(zhuān)家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Redis-13Redis发布订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Redis-12Redis 流水线( p
- 下一篇: Redis-15Redis基础配置文件