redis 队列_Redis系列5实现简单消息队列
任務異步化
打開瀏覽器,輸入地址,按下回車,打開了頁面。于是一個HTTP請求(request)就由客戶端發送到服務器,服務器處理請求,返回響應(response)內容。
我們每天都在瀏覽網頁,發送大大小小的請求給服務器。有時候,服務器接到了請求,會發現他也需要給另外的服務器發送請求,或者服務器也需要做另外一些事情,于是最初們發送的請求就被阻塞了,也就是要等待服務器完成其他的事情。
更多的時候,服務器做的額外事情,并不需要客戶端等待,這時候就可以把這些額外的事情異步去做。從事異步任務的工具有很多。主要原理還是處理通知消息,針對通知消息通常采取是隊列結構。生產和消費消息進行通信和業務實現。
? ? ? 基于內存的單線程數據庫,使Redis的線程安全性與性能極高。而Redis的雙向鏈表數據類型(List)天生就可作為消息隊列存儲消息.
在這里就不說消息隊列的等等一些優點。但是補充一下Redis的List類型的幾個命令,你可以指定將一個元素投送到列表的頭部(左邊)或者尾部(右邊),當然也可以指定從列表的頭部或尾部取出數據.
在項目中用到了redis作為緩存,再學習了ActiveMq之后想著用redis實現簡單的消息隊列,下面做記錄。
? Redis的列表類型鍵可以用來實現隊列,并且支持阻塞式讀取,可以很容易的實現一個高性能的優先隊列。同時在更高層面上,Redis還支持"發布/訂閱"的消息模式,可以基于此構建一個聊天系統。
一、redis的列表類型天生支持用作消息隊列。(類似于MQ的隊列模型--任何時候都可以消費,一條消息只能消費一次)
? list操作參考:https://www.cnblogs.com/qlqwjy/p/7789125.html
  ?在Redis中,List類型是按照插入順序排序的字符串鏈表。和數據結構中的普通鏈表一樣,我們可以在其頭部(left)和尾部(right)添加新的元素。在插入時,如果該鍵并不存在,Redis將為該鍵創建一個新的鏈表。與此相反,如果鏈表中所有的元素均被移除,那么該鍵也將會被從數據庫中刪除。List中可以包含的最大元素數量是4294967295。
????? 從元素插入和刪除的效率視角來看,如果我們是在鏈表的兩頭插入或刪除元素,這將會是非常高效的操作,即使鏈表中已經存儲了百萬條記錄,該操作也可以在常量時間內完成。然而需要說明的是,如果元素插入或刪除操作是作用于鏈表中間,那將會是非常低效的。相信對于有良好數據結構基礎的開發者而言,這一點并不難理解。(類似于java的ArrayList)
redis對list的操作命令中。L表示從左邊(頭部)開始插與彈出,R表示從右邊(尾部)開始插與彈出。
1.redis中簡單的操作list,簡單的在命令行操作實現隊列
(1)從左向右插入,從右向左彈出:
127.0.0.1:6379> lpush mylist a b c d(integer) 4127.0.0.1:6379> lrange mylist 0 -11) "d"2) "c"3) "b"4) "a"127.0.0.1:6379> rpop mylist"a"127.0.0.1:6379> rpop mylist"b"
執行完? ?lpush mylist a b c d? 之后數據結構如下:(滿足先進先出的隊列模式)
?執行完第一次:rpop mylist之后數據結構如下:
?(2)從右向左插入,從左向右彈出:
127.0.0.1:6379> rpush mylist2 a b c d(integer) 4127.0.0.1:6379> lrange mylist2 0 -11) "a"2) "b"3) "c"4) "d"127.0.0.1:6379> lpop mylist2"a"127.0.0.1:6379> lpop mylist2"b"
?執行完:rpush mylist2 a b c d之后的數據結構如下
第一次執行完? ?lpop mylist2? 之后數據結構如下:(滿足先進先出的隊列模式)
?2.JAVA程序實現消息隊列
redis.properties
redis.url=localhostredis.port=6379redis.maxIdle=30redis.minIdle=10redis.maxTotal=100redis.maxWait=10000獲取連接的工具類:
import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import java.io.IOException;import java.io.InputStream;import java.util.Properties;/** * @Author: cc * @Description * @Date: 21:32 2020/10/9 */public class JedisPoolUtils { private static JedisPool pool = null; static { //加載配置文件 InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties"); Properties pro = new Properties(); try { pro.load(in); } catch (IOException e) { e.printStackTrace(); } //獲得池子對象 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大閑置個數 poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大閑置個數 poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小閑置個數 poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大連接數 pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString())); } //獲得jedis資源的方法 public static Jedis getJedis() { return pool.getResource(); } public static void main(String[] args) { Jedis jedis = getJedis(); System.out.println(jedis); }} (1)消息生產者:(開啟5個線程生產消息)import redis.clients.jedis.Jedis;/** * @Author: cc * @Description?*?@Date:?21:29?2020/10/9 */public class MessageProducer extends Thread { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; public void putMessage(String message) { Jedis jedis = JedisPoolUtils.getJedis(); Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(Thread.currentThread().getName() + " put message,size=" + size + ",count=" + count); count++; } @Override public synchronized void run() { for (int i = 0; i < 5; i++) { putMessage("message" + count); } } public static void main(String[] args) { MessageProducer messageProducer = new MessageProducer(); Thread t1 = new Thread(messageProducer, "thread1"); Thread t2 = new Thread(messageProducer, "thread2"); Thread t3 = new Thread(messageProducer, "thread3"); Thread t4 = new Thread(messageProducer, "thread4"); Thread t5 = new Thread(messageProducer, "thread5"); t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); }}?結果:(證明了redis是單線程操作,只能一個一個操作)
thread1 put message,size=1,count=0thread1 put message,size=2,count=1
thread1 put message,size=3,count=2
thread1 put message,size=4,count=3
thread1 put message,size=5,count=4
thread3 put message,size=6,count=5
thread3 put message,size=7,count=6
thread3 put message,size=8,count=7
thread3 put message,size=9,count=8
thread3 put message,size=10,count=9
thread4 put message,size=11,count=10
thread4 put message,size=12,count=11
thread4 put message,size=13,count=12
thread4 put message,size=14,count=13
thread4 put message,size=15,count=14
thread5 put message,size=16,count=15
?redis后臺查看:
127.0.0.1:6379> lrange message:queue 0 -11) "message24"
2) "message23"
3) "message22"
4) "message21"
5) "message20"
6) "message19"
7) "message18"
8) "message17"
9) "message16"
10) "message15"
11) "message14"
12) "message13"
13) "message12"
14) "message11"
15) "message10"
16) "message9"
17) "message8"
18) "message7"
19) "message6"
20) "message5"
21) "message4"
22) "message3"
23) "message2"
24) "message1"
25) "message0"
??(2)消息消費者:(開啟兩個線程消費消息)
import redis.clients.jedis.Jedis;/** * @Author: cc * @Description * @Date: 22:34 2020/10/9 */public class MessageConsumer implements Runnable { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); String message = jedis.rpop(MESSAGE_KEY); System.out.println(Thread.currentThread().getName() + " consumer message,message=" + message + ",count=" + count); count++; } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread6"); Thread t2 = new Thread(messageConsumer, "thread7"); t1.start(); t2.start(); }}結果:(滿足先進先出的規則)--雖然消息已經消費完了,但是仍然在不停的rpop,所以造成浪費
thread6 consumer message,message=message0,count=0thread6 consumer message,message=message1,count=1
thread6 consumer message,message=message2,count=2
thread6 consumer message,message=message3,count=3
thread7 consumer message,message=message4,count=4
thread6 consumer message,message=message5,count=5
thread7 consumer message,message=message6,count=6
thread6 consumer message,message=message7,count=7
thread7 consumer message,message=message8,count=8
thread6 consumer message,message=message9,count=9
thread7 consumer message,message=message10,count=10
thread6 consumer message,message=message11,count=11
thread7 consumer message,message=message12,count=12
thread6 consumer message,message=message13,count=13
thread7 consumer message,message=message14,count=14
thread6 consumer message,message=message15,count=15
thread7 consumer message,message=message16,count=16
thread6 consumer message,message=message17,count=16
thread7 consumer message,message=message18,count=18
thread6 consumer message,message=message19,count=19
thread7 consumer message,message=message20,count=20
thread6 consumer message,message=message21,count=20
thread7 consumer message,message=message22,count=22
thread6 consumer message,message=message23,count=22
thread7 consumer message,message=message24,count=24
thread6 consumer message,message=null,count=25
thread7 consumer message,message=null,count=26
thread6 consumer message,message=null,count=27
thread7 consumer message,message=null,count=28
thread6 consumer message,message=null,count=28
thread7 consumer message,message=null,count=30
thread6 consumer message,message=null,count=31
...
?但上述例子中消息消費者有一個問題存在,即需要不停的調用rpop方法查看List中是否有待處理消息。每調用一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,但這樣做有兩個問題:
? ? 1)、如果生產者速度大于消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內存空間。
? ? 2)、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連接上造成比較大的開銷。
補充:brpop和blpop實現阻塞讀取(重要)
也就是上面的操作需要一直調用rpop命令或者lpop命令才可以實現不停的監聽且消費消息。為了解決這一問題,redis提供了阻塞命令 brpop和blpop。下面以brpop命名為例進行試驗:
brpop命令可以接收多個鍵,其完整的命令格式為 BRPOP key [key ...] timeout,如:brpop key1 0。意義是同時檢測多個鍵,如果所有鍵都沒有元素則阻塞,如果其中一個有元素則從該鍵中彈出該元素(會按照key的順序進行讀取,可以實現具有優先級的隊列)。例如下面試驗:
開啟兩個客戶端,第一個客戶端中采用brpop阻塞讀取兩個鍵:
127.0.0.1:6379> brpop mylist1 mylist2 0第二個客戶端增加mylist1?:
127.0.0.1:6379> lpush mylist1 1 2(integer) 2
則在第一個客戶端顯示:
127.0.0.1:6379> brpop mylist1 mylist2 01) "mylist1"2) "1"(56.31s)
也就是brpop會阻塞隊列,并且每次也是彈出一個消息,如果沒有消息會阻塞。
如果多個鍵都有元素則按照從左到右讀取第一個鍵中的一個元素,例如我們現在queue1和queue2各自添加一個元素:
127.0.0.1:6379> lpush queue1 1 2(integer) 2127.0.0.1:6379> lpush queue2 3 4
(integer) 2
然后執行brpop命令:(會返回讀取的key和value,第一個是返回的key,第二個是value)
127.0.0.1:6379> brpop queue1 queue2 21) "queue1"2) "1"借此特性可以實現區分優先級的任務隊列。也就是brpop會按照key的順序依次讀取一個數據。
改造上面代碼實現阻塞讀取:
import redis.clients.jedis.Jedis;import java.util.List;/** * @Author: cc * @Description * @Date: 22:34 2020/10/9 */public class MessageConsumer implements Runnable { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; private Jedis jedis = JedisPoolUtils.getJedis(); public void consumerMessage() { List brpop = jedis.brpop(0, MESSAGE_KEY);//0是timeout,返回的是一個集合,第一個是消息的key,第二個是消息的內容 System.out.println(brpop); } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread6"); Thread t2 = new Thread(messageConsumer, "thread7"); t1.start(); t2.start(); }}然后可以運行Customer,清空控制臺,可以看到程序沒有任何輸出,阻塞在了brpop這兒。然后在打開Redis的客戶端,輸入指令client list,可以查看當前的連接個數。
當啟動生產者生產消息之后,消費者會自動消費消息,而且消費者會阻塞直到有消息。
[message:queue, message0][message:queue, message1]
[message:queue, message2]
[message:queue, message3]
[message:queue, message4]
[message:queue, message5]
[message:queue, message6]
[message:queue, message7]
[message:queue, message8]
[message:queue, message9]
[message:queue, message10]
[message:queue, message11]
[message:queue, message12]
[message:queue, message13]
[message:queue, message14]
[message:queue, message15]
[message:queue, message16]
[message:queue, message17]
[message:queue, message18]
[message:queue, message19]
[message:queue, message20]
[message:queue, message21]
[message:queue, message22]
[message:queue, message23]
[message:queue, message24]
二、發布/訂閱模式(類似于MQ的主題模式-只能消費訂閱之后發布的消息,一個消息可以被多個訂閱者消費)
1.客戶端發布/訂閱
1.1? ?普通的發布/訂閱
? 除了實現任務隊列外,redis還提供了一組命令可以讓開發者實現"發布/訂閱"(publish/subscribe)模式。"發布/訂閱"模式同樣可以實現進程間的消息傳遞,其原理如下:
"發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。
(1)發布消息
發布者發布消息的命令是? publish,用法是 publish channel message,如向 channel1.1說一聲hi
127.0.0.1:6379> publish channel:1 hi(integer) 0
這樣消息就發出去了。返回值表示接收這條消息的訂閱者數量。發出去的消息不會被持久化,也就是有客戶端訂閱channel:1后只能接收到后續發布到該頻道的消息,之前的就接收不到了。
(2)訂閱頻道
訂閱頻道的命令是 subscribe,可以同時訂閱多個頻道,用法是 subscribe channel1 [channel2 ...],例如新開一個客戶端訂閱上面頻道:(不會收到消息,因為不會收到訂閱之前就發布到該頻道的消息)
127.0.0.1:6379> subscribe channel:1Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "channel:1"3) (integer) 1
執行上面命令客戶端會進入訂閱狀態,處于此狀態下客戶端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe這四個屬于"發布/訂閱"之外的命令,否則會報錯。
進入訂閱狀態后客戶端可能收到3種類型的回復。每種類型的回復都包含3個值,第一個值是消息的類型,根據消類型的不同,第二個和第三個參數的含義可能不同。
消息類型的取值可能是以下3個:
(1)subscribe。表示訂閱成功的反饋信息。第二個值是訂閱成功的頻道名稱,第三個是當前客戶端訂閱的頻道數量。
(2)message。表示接收到的消息,第二個值表示產生消息的頻道名稱,第三個值是消息的內容。
(3)unsubscribe。表示成功取消訂閱某個頻道。第二個值是對應的頻道名稱,第三個值是當前客戶端訂閱的頻道數量,當此值為0時客戶端會退出訂閱狀態,之后就可以執行其他非"發布/訂閱"模式的命令了。
(3)第一個客戶端重新向channel:1發送一條消息
127.0.0.1:6379> publish channel:1 hi(integer) 1
返回值表示訂閱此頻道的數量
c
上面訂閱的客戶端:
127.0.0.1:6379> subscribe channel:1Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "channel:1"3) (integer) 11) "message"
2) "channel:1"
3) "hi"
紅字部分表示成功的收到消息(依次是消息類型,頻道,消息內容)
?
1.2? ?按照規則發布/訂閱
除了可以使用subscribe命令訂閱指定的頻道外,還可以使用psubscribe命令訂閱指定的規則。規則支持通配符格式。命令格式為? ? ? psubscribe pattern [pattern ...]訂閱多個模式的頻道。
通配符中?表示1個占位符,*表示任意個占位符(包括0),?*表示1個以上占位符。
例如:
(1)訂閱者訂閱三個通配符頻道
127.0.0.1:6379> psubscribe c? b* d?*Reading messages... (press Ctrl-C to quit)1) "psubscribe"2) "c?"3) (integer) 11) "psubscribe"2) "b*"3) (integer) 21) "psubscribe"2) "d?*"3) (integer) 3(2)新開一個客戶端發送到指定頻道
C:\Users\liqiang>redis-cli127.0.0.1:6379> publish c m1(integer) 0127.0.0.1:6379> publish c1 m1(integer) 1127.0.0.1:6379> publish c11 m1(integer) 0127.0.0.1:6379> publish b m1(integer) 1127.0.0.1:6379> publish b1 m1(integer) 1127.0.0.1:6379> publish b11 m1(integer) 1127.0.0.1:6379> publish d m1(integer) 0127.0.0.1:6379> publish d1 m1(integer) 1127.0.0.1:6379> publish d11 m1(integer) 1上面返回值為1表示被訂閱者所接受,可以匹配上面的通配符。
訂閱者客戶端:
127.0.0.1:6379> psubscribe c? b* d?*Reading messages... (press Ctrl-C to quit)1) "psubscribe"2) "c?"3) (integer) 11) "psubscribe"2) "b*"3) (integer) 21) "psubscribe"2) "d?*"3) (integer) 31) "pmessage"2) "c?"3) "c1"4) "m1"1) "pmessage"2) "b*"3) "b"4) "m1"1) "pmessage"2) "b*"3) "b1"4) "m1"1) "pmessage"2) "b*"3) "b11"4) "m1"1) "pmessage"2) "d?*"3) "d1"4) "m1"1) "pmessage"2) "d?*"3) "d11"4) "m1"注意:
(1)使用psubscribe命令可以重復訂閱同一個頻道,如客戶端執行了psubscribe c? c?*。這時向c1發布消息客戶端會接受到兩條消息,而同時publish命令的返回值是2而不是。.同樣的,如果有另一個客戶端執行了subscribe c1 和psubscribe c?*的話,向c1發送一條消息該客戶端也會收到兩條消息(但是是兩種類型:message和pmessage),同時publish命令也返回2.
(2)punsubscribe命令可以退訂指定的規則,用法是: punsubscribe [pattern [pattern ...]],如果沒有參數則會退訂所有規則。
(3)使用punsubscribe只能退訂通過psubscribe命令訂閱的規則,不會影響直接通過subscribe命令訂閱的頻道;同樣unsubscribe命令也不會影響通過psubscribe命令訂閱的規則。另外需要注意punsubscribe命令退訂某個規則時不會將其中的通配符展開,而是進行嚴格的字符串匹配,所以punsubscribe * 無法退訂c*規則,而是必須使用punsubscribe c*才可以退訂。
2.Java程序實現發布者訂閱者模式
1.生產者
import redis.clients.jedis.Jedis;/** * @Author: cc * @Description * @Date: 21:29 2020/10/9 */public class MessageProducer extends Thread { public static final String CHANNEL_KEY = "channel:1"; private volatile int count; public void putMessage(String message) { Jedis jedis = JedisPoolUtils.getJedis(); Long publish = jedis.publish(CHANNEL_KEY, message);//返回訂閱者數量 System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish); count++; } @Override public synchronized void run() { for (int i = 0; i < 5; i++) { putMessage("message" + count); } } public static void main(String[] args) { MessageProducer messageProducer = new MessageProducer(); Thread t1 = new Thread(messageProducer, "thread1"); Thread t2 = new Thread(messageProducer, "thread2"); Thread t3 = new Thread(messageProducer, "thread3"); Thread t4 = new Thread(messageProducer, "thread4"); Thread t5 = new Thread(messageProducer, "thread5"); t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); }}結果:
thread1 put message,count=0,subscriberNum=0
thread1 put message,count=1,subscriberNum=0
thread1 put message,count=2,subscriberNum=0
thread1 put message,count=3,subscriberNum=0
thread1 put message,count=4,subscriberNum=0
thread4 put message,count=5,subscriberNum=0
thread4 put message,count=6,subscriberNum=0
thread4 put message,count=7,subscriberNum=0
thread4 put message,count=8,subscriberNum=0
thread4 put message,count=9,subscriberNum=0
thread5 put message,count=10,subscriberNum=0
thread5 put message,count=11,subscriberNum=0
thread5 put message,count=12,subscriberNum=0
thread5 put message,count=13,subscriberNum=0
thread5 put message,count=14,subscriberNum=0
thread2 put message,count=15,subscriberNum=0
thread2 put message,count=16,subscriberNum=0
thread2 put message,count=17,subscriberNum=0
thread2 put message,count=18,subscriberNum=0
thread2 put message,count=19,subscriberNum=0
thread3 put message,count=20,subscriberNum=0
thread3 put message,count=21,subscriberNum=0
thread3 put message,count=22,subscriberNum=0
thread3 put message,count=23,subscriberNum=0
thread3 put message,count=24,subscriberNum=0
2.消費者
(1)subscribe實現訂閱消費消息(開啟兩個線程訂閱消息)
import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * @Author: cc * @Description * @Date: 22:34 2020/10/9 */public class MessageConsumer implements Runnable { public static final String CHANNEL_KEY = "channel:1";//頻道 public static final String EXIT_COMMAND = "exit";//結束程序的消息 private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息 public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道 } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread5"); Thread t2 = new Thread(messageConsumer, "thread6"); t1.start(); t2.start(); }}/** * 繼承JedisPubSub,重寫接收消息的方法 */class MyJedisPubSub extends JedisPubSub { @Override /** JedisPubSub類是一個沒有抽象方法的抽象類,里面方法都是一些空實現 * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法 * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[] **/ public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message); //接收到exit消息后退出 if (MessageConsumer.EXIT_COMMAND.equals(message)) { System.exit(0); } }}我們再次啟動生產者生產消息,生產者控制臺:
thread5 put message,count=0,subscriberNum=2thread5 put message,count=1,subscriberNum=2
thread5 put message,count=2,subscriberNum=2
thread5 put message,count=3,subscriberNum=2
thread5 put message,count=4,subscriberNum=2
thread3 put message,count=5,subscriberNum=2
thread3 put message,count=6,subscriberNum=2
thread3 put message,count=7,subscriberNum=2
thread3 put message,count=8,subscriberNum=2
thread3 put message,count=9,subscriberNum=2
thread2 put message,count=10,subscriberNum=2
thread2 put message,count=11,subscriberNum=2
thread2 put message,count=12,subscriberNum=2
thread2 put message,count=13,subscriberNum=2
thread2 put message,count=14,subscriberNum=2
thread4 put message,count=15,subscriberNum=2
thread4 put message,count=16,subscriberNum=2
thread4 put message,count=17,subscriberNum=2
thread4 put message,count=18,subscriberNum=2
thread4 put message,count=19,subscriberNum=2
thread1 put message,count=20,subscriberNum=2
thread1 put message,count=21,subscriberNum=2
thread1 put message,count=22,subscriberNum=2
thread1 put message,count=23,subscriberNum=2
thread1 put message,count=24,subscriberNum=2
Process finished with exit code 0
消費者控制臺:
thread6-接收到消息:channel=channel:1,message=message0thread5-接收到消息:channel=channel:1,message=message0
thread5-接收到消息:channel=channel:1,message=message1
thread6-接收到消息:channel=channel:1,message=message1
thread5-接收到消息:channel=channel:1,message=message2
thread6-接收到消息:channel=channel:1,message=message2
thread5-接收到消息:channel=channel:1,message=message3
thread6-接收到消息:channel=channel:1,message=message3
thread5-接收到消息:channel=channel:1,message=message4
thread6-接收到消息:channel=channel:1,message=message4
thread5-接收到消息:channel=channel:1,message=message5
thread6-接收到消息:channel=channel:1,message=message5
thread5-接收到消息:channel=channel:1,message=message6
thread6-接收到消息:channel=channel:1,message=message6
thread5-接收到消息:channel=channel:1,message=message7
thread6-接收到消息:channel=channel:1,message=message7
thread5-接收到消息:channel=channel:1,message=message8
thread6-接收到消息:channel=channel:1,message=message8
thread5-接收到消息:channel=channel:1,message=message9
thread6-接收到消息:channel=channel:1,message=message9
thread5-接收到消息:channel=channel:1,message=message10
thread6-接收到消息:channel=channel:1,message=message10
thread5-接收到消息:channel=channel:1,message=message11
thread6-接收到消息:channel=channel:1,message=message11
thread5-接收到消息:channel=channel:1,message=message12
thread6-接收到消息:channel=channel:1,message=message12
thread5-接收到消息:channel=channel:1,message=message13
thread6-接收到消息:channel=channel:1,message=message13
thread5-接收到消息:channel=channel:1,message=message14
thread6-接收到消息:channel=channel:1,message=message14
thread5-接收到消息:channel=channel:1,message=message15
thread6-接收到消息:channel=channel:1,message=message15
thread5-接收到消息:channel=channel:1,message=message16
thread6-接收到消息:channel=channel:1,message=message16
thread5-接收到消息:channel=channel:1,message=message17
thread6-接收到消息:channel=channel:1,message=message17
thread5-接收到消息:channel=channel:1,message=message18
thread6-接收到消息:channel=channel:1,message=message18
thread5-接收到消息:channel=channel:1,message=message19
thread6-接收到消息:channel=channel:1,message=message19
thread5-接收到消息:channel=channel:1,message=message20
thread6-接收到消息:channel=channel:1,message=message20
thread5-接收到消息:channel=channel:1,message=message21
thread6-接收到消息:channel=channel:1,message=message21
thread5-接收到消息:channel=channel:1,message=message22
thread6-接收到消息:channel=channel:1,message=message22
thread5-接收到消息:channel=channel:1,message=message23
thread6-接收到消息:channel=channel:1,message=message23
thread5-接收到消息:channel=channel:1,message=message24
thread6-接收到消息:channel=channel:1,message=message24
(2)psubscribe實現訂閱消費消息(開啟兩個線程訂閱消息)
import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * @Author: cc * @Description * @Date: 22:34 2020/10/9 */public class MessageConsumer implements Runnable { public static final String CHANNEL_KEY = "channel*";//頻道 public static final String EXIT_COMMAND = "exit";//結束程序的消息 private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息 public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); jedis.psubscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道 } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread5"); Thread t2 = new Thread(messageConsumer, "thread6"); t1.start(); t2.start(); }}/** * 繼承JedisPubSub,重寫接收消息的方法 */class MyJedisPubSub extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message); //接收到exit消息后退出 if (MessageConsumer.EXIT_COMMAND.equals(message)) { System.exit(0); } }}重寫JedisPubSub 的onPMessage方法即可
啟動生產者生產消息之后查看消費者控制臺:
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message0thread5-接收到消息:pattern=channel*,channel=channel:1,message=message0
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message1
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message1
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message2
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message2
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message3
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message3
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message4
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message4
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message5
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message5
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message6
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message6
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message7
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message7
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message8
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message8
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message9
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message9
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message10
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message10
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message11
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message11
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message12
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message12
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message13
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message13
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message14
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message14
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message15
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message15
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message16
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message16
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message17
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message17
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message18
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message18
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message19
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message19
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message20
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message20
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message21
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message21
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message22
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message22
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message23
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message23
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message24
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message24
補充:訂閱的時候subscribe()和psubscribe()的第二個參數支持可變參數,也就是可以實現訂閱多個頻道。
至此實現了兩種方式的消息隊列:
redis自帶的list類型(lpush和rpop或者brpop,rpush和lpop或者blpop)---blpop和brpop是阻塞讀取。
"發布/訂閱"模式(publish channel message 和 subscribe channel [channel ...] 或者 psubscribe pattern [pattern ...] 通配符訂閱多個頻道)
補充:
1.發布訂閱執行訂閱之后該線程處于阻塞狀態,線程不會終止,如果終止線程需要退訂,需要調用JedisPubSub的unsubscribe()方法
例如:
package plainTest;import cn.xm.redisChat.util.JedisPoolUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * @Author: cc * @Description * @Date: 23:36 2020/10/13 */public class Test111 { public static void main(String[] args) { Jedis jedis = JedisPoolUtils.getJedis(); System.out.println("訂閱前"); jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { super.onMessage(channel, message); } }, "c1"); System.out.println("訂閱后"); }}結果只會打印訂閱前,而且線程不會終止。
為了使線程可以停止,必須退訂,而且退訂只能調用??JedisPubSub.unsubscribe()方法,例如:收到quit消息之后會退訂,線程會回到主線程打印訂閱后。
package plainTest;import cn.xm.redisChat.util.JedisPoolUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * @Author: cc * @Description * @Date: 23:36 2020/10/13 */public class Test111 { public static void main(String[] args) { Jedis jedis = JedisPoolUtils.getJedis(); System.out.println("訂閱前"); jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { if("quit".equals(message)){ unsubscribe("c1"); } System.out.println(message); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } }, "c1"); System.out.println("訂閱后"); }}2.BRPOP:當給定列表內沒有任何元素可供彈出的時候,連接將被BRPOP命令阻塞,直到等待超時或發現可彈出元素為止。(每次只彈出一個元素,當沒有元素的時候處于阻塞,當彈出一個元素之后就會解除阻塞)
沒有元素的時候只會打印brpop之前。
package plainTest;import cn.xm.redisChat.util.JedisPoolUtils;import redis.clients.jedis.Jedis;import java.util.List;/** * @Author: cc * @Description * @Date: 23:36 2020/10/13 */public class Test111 { public static void main(String[] args) { Jedis jedis = JedisPoolUtils.getJedis(); System.out.println("brpop之前"); List<String> messages = jedis.brpop(0,"list1"); System.out.println(messages); System.out.println("brpop之后"); }}如果覺得文章不錯,歡迎點個在看
總結
以上是生活随笔為你收集整理的redis 队列_Redis系列5实现简单消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 酷冷至尊 GA241 游戏显示器发布:2
- 下一篇: 特斯拉种族歧视案再审:员工赔偿金从 1.
