Java实现redis消息队列发布/订阅模式
生活随笔
收集整理的這篇文章主要介紹了
Java实现redis消息队列发布/订阅模式
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
最近在一個老項目中需要用消息隊列,本來想著用卡夫卡,但是試了幾個版本之后發現jdk和卡夫卡版本一直對不上,最后選擇用redis來實現消息隊列的發布/訂閱模式。感謝這位大佬的博客給了我很多的幫助,https://www.cnblogs.com/qlqwjy/p/9763754.html再次感謝這位大佬。下面我們就看看我是怎么來實現的。
直接上代碼
redis.properties
注意:url此處為了隱私改為了localhost,大家可以修改為自己的ip,如果你的redis加密了要記得在配置文件中加上redis.auth=password password是你自己設定的密碼。
RedisUtil.java
import java.io.IOException; import java.io.InputStream; import java.util.Properties;import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig;public class RedisUtil {private static JedisPool jedisPool = null;private static Properties pro = new Properties();//加載配置文件static{InputStream in = RedisUtil.class.getClassLoader().getResourceAsStream("redis.properties");try {pro.load(in);} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}//獲得池子對象JedisPoolConfig poolConfig = new JedisPoolConfig();//最大閑置個數poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大連接數 // poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString())); 2.0.0版本JedisPoolConfig沒有setMaxTotal方法poolConfig.setMaxActive(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大等待時間poolConfig.setMaxWait(Integer.parseInt(pro.get("redis.maxWait").toString()));//最小閑置個數poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));jedisPool = new JedisPool(poolConfig,pro.getProperty("redis.url"),Integer.parseInt(pro.get("redis.port").toString()));}public static Jedis getJedis(){Jedis jedis = jedisPool.getResource();jedis.auth(pro.getProperty("redis.auth"));return jedis;}//釋放資源池public static void returnSource(final Jedis jedis){if(jedis != null){jedisPool.returnResource(jedis);}}public static void main(String[] args) {Jedis jedis = getJedis();System.out.println(jedis);}生產者代碼 MessageProducer.java
import redis.clients.jedis.Jedis;public class MessageProducer implements Runnable {private static final String CHANNEL_KEY = "channel:1";private volatile int count;public void putMessage(String message){Jedis jedis = RedisUtil.getJedis();Long publish = jedis.publish(CHANNEL_KEY, message);//返回訂閱者數量System.out.println("------------------------"+Runnable.class.getName() + ",put message:"+message+",count=" + count +",subscriberNum=" + publish);count++;RedisUtil.returnSource(jedis);//釋放redis資源池}@Overridepublic synchronized void run() {for(int i = 0; i<2;i++){putMessage("message" + count);}}public static void main(String[] args) {MessageProducer mp = new MessageProducer();Thread t1 = new Thread(mp,"thread1");t1.start();}消費者代碼 MessageConsumer.java
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub;public class MessageConsumer implements Runnable {public static final String CHANNEL_KEY = "channel:1";//頻道public static final String EXIT_COMMAND = "exit";//結束程序的消息private MyJedisPubSub jedisPubSub = new MyJedisPubSub();public void consumerMessage(){Jedis jedis = RedisUtil.getJedis();jedis.psubscribe(jedisPubSub, CHANNEL_KEY);}@Overridepublic void run() {while(true){consumerMessage();}}public static void main(String[] args) {MessageConsumer mc = new MessageConsumer();Thread t1 = new Thread(mc,"thread5");Thread t2 = new Thread(mc, "thread6");t1.start();t2.start();}class MyJedisPubSub extends JedisPubSub{@Overridepublic void onMessage(String channel, String message) {// TODO Auto-generated method stub}@Overridepublic void onPMessage(String pattern, String channel, String message) {System.out.println(Thread.currentThread().getName() + "-接受到的消息:pattern"+pattern+",channel:"+channel+",message:"+message);String[] s = message.split(",");for(String s1:s){System.out.println(s1);}if(MessageConsumer.EXIT_COMMAND.equals(message)){System.exit(0);}}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPUnsubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPSubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}}補充:訂閱的時候subscribe()和psubscribe()的第二個參數支持可變參數,也就是可以實現訂閱多個頻道。
總結
以上是生活随笔為你收集整理的Java实现redis消息队列发布/订阅模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Vue组件之间数据通信12种方式
- 下一篇: 快速提升pv秘籍,如何提升网站pv