java连接imserver_java后端IM消息推送服务开发——协议
最近在一家saas企業使用Mqtt開發IM消息推送服務,把開發中的一些問題記錄下來,項目仍在商用中,完整的消息服務包括4個模塊---協議protocol,信令Signal,規則Rule,狀態Status,這個主題主要是協議protocol部分。
主要技術涉及到MongoDB,webservice,httpclient,Mqtt等
protocol分為四個模塊類來實現,當然這是為了以后的擴展性比較好
首先看一下我們的主類,主要是mqtt基礎方法的一個框架
public class MqttProtocol
{
private static Logger logger = Logger.getLogger(MqttProtocol.class);
public static final String HOST = "tcp://xx.xx.xx.xx:1883";
private static final String CLIENTID = "yyyy";
private MqttClient client;
private MqttConnectOptions options = new MqttConnectOptions();
//private String userName = "admin";
//private String passWord = "public";
public MqttMessage message;
private PushCallback callback;
/**
* 用于初始化mqttclient客戶端,設置回調函數,同時連接mqtt服務器
* @throws MqttException
*/
public MqttProtocol() throws MqttException
{
//MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
callback = new PushCallback();
client.setCallback(callback);
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(60);
connect();
}
/**
* 連接mqtt消息服務器,同時設置了斷開重連的功能,主要是為了高可用性考慮,在斷網服務器崩潰時候我們的程序仍然不會終止
*/
private void connect()
{
SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
System.out.println(sdf.format(System.currentTimeMillis()));
boolean tryConnecting = true;
while (tryConnecting) {
try {
client.connect(options);
} catch (Exception e1) {
System.out.println("Connection attempt failed with '"+e1.getCause()+
"'. Retrying.");
}
if (client.isConnected()) {
System.out.println("Connected.");
tryConnecting = false;
} else {
pause();
}
}
}
private void pause() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Error handling goes here...
}
}
/**
*
* @param topic
* @param qos
* @throws MqttPersistenceException
* @throws MqttException
* 訂閱相關主題
*/
public void subscribe(String topic , int qos) throws MqttPersistenceException,
MqttException
{
client.subscribe(topic, qos);
}
/**
*
* @throws MqttPersistenceException
* @throws MqttException
* 斷開連接服務器
*/
public void disconnect() throws MqttPersistenceException,
MqttException
{
client.disconnect();
}
/**
*
* @author binshi
*實現mqttcallback接口,主要用于接收消息后的處理方法
*/
private class PushCallback implements MqttCallback {
/**
* 斷開后 系統會自動調用這個函數,同時在這個函數里進行重連操作
*/
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
connect();
try {
subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 消息成功傳送后,系統會自動調用此函數,表明成功向topic發送消息
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
System.out.println("deliveryComplete---------" + arg0.isComplete());
}
/**
* 連接mongo數據庫,返回關于具體collection的Mongocollection
* @param collectionname
* @return
*/
public void messageArrived(String topic, MqttMessage message) throws Exception
{
System.out.println(topic);
SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
System.out.println(sdf.format(System.currentTimeMillis()));
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + new String(message.getPayload()));
//1 抽取事件信令消息
String messagejudge=new String(message.getPayload());
System.out.println("忽略所有robot消息以及offline離線消息");
JSONObject jo=new JSONObject();
try {
jo=JSONObject.fromObject(messagejudge);
} catch (Exception e) {
e.printStackTrace();
}
String from=jo.getString("from");
System.out.println("獲得from"+from);
System.out.println("確定消息是否包含offline,如果包含取得offline,為1就不處理");
String offline=null;
if(messagejudge.contains("offline"))
{
offline=jo.getString("offline");
}
if((offline==null)&&(!from.contains("robot")))
{
System.out.println("處理非系統消息和非離線消息");
String type=jo.getString("type");
System.out.println("獲得type"+type);
if(type.equals("shakehand"))
{
System.out.println("處理shakehand消息");
String admin="doyounkowwhy";
if(jo.toString().contains("admin"))
{
admin=jo.getString("admin");
}
System.out.println("取得admin 如果為1定義為客服,否則為普通用戶 admin為"+admin);
if(admin.equals("1"))
{
System.out.println("處理客服握手消息");
System.out.println("發送握手成功消息");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);
System.out.println("向客戶端發送離線未接收的消息");
String convid=jo.getString("convid");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);
}
else
{
System.out.println("處理普通用戶的握手消息");
String appid=jo.getString("appid");
String pageid=jo.getString("pageid");
String convid=jo.getString("convid");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);
}
}
else if(type.equals("text")||type.equals("image"))
{
System.out.println("處理圖片和文字消息");
String tmpindex=jo.getString("tmpindex");
String convid=jo.getString("convid");
MqttTopic retopic=client.getTopic(topic);
MsgOperation.getTextMsg( tmpindex, from, convid, retopic);
System.out.println("保存圖片文字消息");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);
}
else if(type.equals("ack"))
{
System.out.println("處理ack消息");
String tmpindex=jo.getString("tmpindex");
String convid=jo.getString("convid");
String database="dolina";
String collection="messages";
MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);
}
}
}
}
/**
*
* @param args
* @throws MqttException
* 整個工程從這里開始執行,生成可執行jar包,這個設置為主類。
*/
public static void main(String[] args) throws MqttException
{
MqttProtocol signal = new MqttProtocol();
signal.message = new MqttMessage();
/**
server.message.setQos(2);
server.message.setRetained(false);
server.message.setPayload("給客戶端124推送的信息".getBytes());
server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);
*/
signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
System.out.println(signal.message.isRetained() + "------ratained狀態");
}
}
接下來使我們的遠程連接模塊,主要是通過給定的url調用遠程接口
public class RemoteOperation
{
private static Logger logger = Logger.getLogger(MqttProtocol.class);
public static JSONObject remoteCall(String url) throws HttpException, IOException
{
HttpClient httpClient = new HttpClient();
GetMethod method =null ;
method=new GetMethod(url);
int retcode = httpClient.executeMethod(method);
if (retcode != HttpStatus.SC_OK)
{// 發送不成功
logger.info("遠程調用出錯");
return null;
}
else
{
String body = method.getResponseBodyAsString();
logger.info(body+"遠程調用php成功");
JSONObject jsonObject=new JSONObject();
try {
jsonObject=JSONObject.fromObject(body);
} catch (Exception e) {
e.printStackTrace();
}
if (method != null)
{
method.releaseConnection();
}
return jsonObject;
}
}
}
下面是Mongo數據庫的相關操作的一個封裝,設計為單例模式,相當于每次都使用同一個client打開連接,類似于連接池的概念,當然業務邏輯部分可以更換
public class MongoDBDao
{
private static Logger logger = Logger.getLogger(MongoDBDao.class);
/**
* MongoClient的實例代表數據庫連接池,是線程安全的,可以被多線程共享,客戶端在多線程條件下僅維持一個實例即可
* Mongo是非線程安全的,目前mongodb API中已經建議用MongoClient替代Mongo
*/
private MongoClient mongoClient = null;
/**
*
* 私有的構造函數
* 作者:shibin
*/
private MongoDBDao(){
if(mongoClient == null){
String url = Constant.MONGO_MQTT_URL;
String user = Constant.MONGO_MQTT_USER;
String password = Constant.MONGO_MQTT_PASSWORD;
String database = Constant.MONGO_MQTT_DATABASE;
int port = 27017;
ServerAddress serverAddress = new ServerAddress(url, port);
ListserverAddresses = new ArrayList();
serverAddresses.add(serverAddress);
MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());
Listcredentials = new ArrayList();
credentials.add(credential);
mongoClient = new MongoClient(serverAddresses, credentials);
System.out.println(mongoClient);
System.out.println("初始化client完成");
}
}
/********單例模式聲明開始,采用餓漢式方式生成,保證線程安全********************/
//類初始化時,自行實例化,餓漢式單例模式
private static final MongoDBDao mongoDBDao = new MongoDBDao();
/**
*
* 方法名:getMongoDBDaoImplInstance
* 作者:shibin
*
* 描述:單例的靜態工廠方法
* @return
*/
public static MongoDBDao getMongoDBDaoInstance(){
return mongoDBDao;
}
public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException
{
System.out.println("獲得message的連接");
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
System.out.println("取得convid所對應的msg列表");
BasicDBObject query = new BasicDBObject();
query.put("_id", convid);
FindIterableiterable=null;
iterable = mongoCollection.find(query);
if(iterable.first()!=null)
{
System.out.println(iterable.first());
String res= iterable.first().toJson();
JSONObject jo=new JSONObject();
try {
jo=JSONObject.fromObject(res);
} catch (Exception e) {
e.printStackTrace();
}
JSONArray jsonArray=jo.getJSONArray("msg");
for(int i=0;i
doc.put("read", read);
Document tdoc = new Document();
tdoc.put("msg", doc);
UpdateOptions updateOptions=new UpdateOptions();
updateOptions.upsert(true);
mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);
iterable = mongoCollection.find(query);
System.out.println("更新message之后的值"+iterable.first());
}
public void getAck(String tmpindex,String convid,String from,String database,String collection)
{
System.out.println("接收到ack消息后更新message中的read字段");
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
BasicDBObject query = new BasicDBObject();
query.put("_id", convid);
query.put("msg.tmpindex", tmpindex);
BasicDBObject query1 = new BasicDBObject();
query1.put("_id", convid);
FindIterable iterable;
FindIterable iterable2;
iterable = mongoCollection.find(query1);
iterable2 = mongoCollection.find(query);
System.out.println("更新message滿足id過濾條件之前的值"+iterable.first());
System.out.println("更新message滿足id和tmpindex過濾條件之前的值"+iterable2.first());
if(iterable2.first()!=null)
{
Document doc = new Document();
doc.put("msg.$.read", from);
UpdateOptions updateOptions=new UpdateOptions();
updateOptions.upsert(true);
mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);
}
iterable = mongoCollection.find(query1);
System.out.println("更新messages之后的值"+iterable.first());
}
}
剩下的關于業務邏輯方面的就不多說了,主要是關于mqtt高可用性斷開重連的功能以及mongo相關的操作
總結
以上是生活随笔為你收集整理的java连接imserver_java后端IM消息推送服务开发——协议的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 全球最多人用浏览器Chrome官宣好消息
- 下一篇: 一粒相当一颗原子弹?B站申请“金坷垃”商