使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
生活随笔
收集整理的這篇文章主要介紹了
使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
前言:
本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar
其中jedis連接池需要依賴commons-pool2包,json包用于對象實例和json字符串的相互轉(zhuǎn)換
1、jedis的消息隊列方法簡述
1.1、發(fā)布消息方法
(其中,channel是對應(yīng)消息通道,message是對應(yīng)消息體)
jedis.publish(channel, message);
1.2、監(jiān)聽消息方法
(其中,jedisPubSub用于處理監(jiān)聽到的消息,channels是對應(yīng)的通道)
jedis.subscribe(jedisPubSub, channels);
2、發(fā)布消息
/*** 從jedis連接池獲取jedis操作實例* @return*/public static Jedis getJedis() {return RedisPoolManager.getJedis();}/*** 推入消息到redis消息通道* * @param String* channel* @param String* message*/public static void publish(String channel, String message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}/*** 推入消息到redis消息通道* * @param byte[]* channel* @param byte[]* message*/public void publish(byte[] channel, byte[] message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}
3、監(jiān)聽消息
3.1、監(jiān)聽消息主體方法
/*** 監(jiān)聽消息通道* @param jedisPubSub - 監(jiān)聽任務(wù)* @param channels - 要監(jiān)聽的消息通道*/public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}/*** 監(jiān)聽消息通道* @param jedisPubSub - 監(jiān)聽任務(wù)* @param channels - 要監(jiān)聽的消息通道*/public static void subscribe(JedisPubSub jedisPubSub, String... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}
3.2、處理監(jiān)聽到的消息任務(wù)
class Tasker implements Runnable {private String[] channel = null;//監(jiān)聽的消息通道private JedisPubSub jedisPubSub = null;//消息處理任務(wù)public Tasker(JedisPubSub jedisPubSub, String ...channel) {this.jedisPubSub = jedisPubSub;this.channel = channel;}@Overridepublic void run() {// 監(jiān)聽channel通道的消息RedisMQ.subscribe(jedisPubSub, channel);}}
3.3、處理監(jiān)聽到的消息主體類實現(xiàn)
package cn.eguid.livePushServer.redisManager;import java.util.Map;import org.json.JSONObject;import cc.eguid.livepush.PushManager; import redis.clients.jedis.JedisPubSub;public class RedisMQHandler extends JedisPubSub{PushManager pushManager = null;public RedisMQHandler(PushManager pushManager) {super();this.pushManager = pushManager;}@Override// 接收到消息后進行分發(fā)執(zhí)行public void onMessage(String channel, String message) {JSONObject jsonObj = new JSONObject(message);System.out.println(channel+","+message);if ("push".equals(channel)) {Map<String,Object> map=jsonObj.toMap();System.out.println("接收到一條推流消息,準(zhǔn)備推流:"+map); // String appName=pushManager.push(map);//推流完成后還需要發(fā)布一個成功消息到返回隊列} else if ("close".equals(channel)) {String appName=jsonObj.getString("appName");System.out.println("接收到一條關(guān)閉消息,準(zhǔn)備關(guān)閉應(yīng)用:"+appName); // pushManager.closePush(appName);}} }4、測試消息隊列發(fā)布和監(jiān)聽
public static void main(String[] args) throws InterruptedException {PushManager pushManager= new PushManagerImpl();Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));t1.start();t2.start();LivePushEntity livePushInfo=new LivePushEntity();livePushInfo.setAppName("test1");JSONObject json=new JSONObject(livePushInfo);publish("push",json.toString());publish("close", json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());}
轉(zhuǎn)載于:https://www.cnblogs.com/eguid/p/6821593.html
總結(jié)
以上是生活随笔為你收集整理的使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [迷宫中的算法实践]迷宫生成算法——Pr
- 下一篇: MyBatis关联查询,表字段相同,re