activemq java 异步_异步消息处理机制之activeMQ应用实例
上篇說了KAFKA應用實例,本篇承接上篇,著重描述activeMQ消息機制的應用。
KAFKA和MQ同為數據異步處理中間件,本質都是對消息的異步處理,用於異步通信、削峰填谷,是高并發情況下的數據處理機制。它們的不同之處在于處理數據量的大小。
MQ和KAFKA相比較,KAFKA處理的數據量更大。
下圖為activeMQ應用目錄:
ActiveMQ客戶端,對連接和會話的管理:
/**
*
*/
package com.ustcinfo.kanms.alarmcollector.activemq;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;
/**
 * =================================================
 * Project: GessAlarmCollector
 * Class:   ActiveMQClient
 * Author:  dlzhang
 * Date:    2014-8-28 17:37:21
 * Version: 1.0
 * Description: ActiveMQ client that owns one JMS connection and one session
 * and creates them lazily on first use. Everything that shares the same
 * {@code ActiveMQClient} instance reuses that connection/session; a separate
 * instance opens its own.
 *
 * <p>Connection state is guarded by this object's monitor. When the broker
 * cannot be reached, {@link #getConn()} and {@link #getSession()} return
 * {@code null}, so callers must check the result.
 * =================================================
 */
public class ActiveMQClient {
    private static final Logger logger = Logger.getLogger(ActiveMQClient.class);

    private String url;
    private String user;
    private String passwd;
    private ActiveMQConnectionFactory connFactory;
    private Connection conn;
    private Session session;
    private boolean isConn;

    /**
     * Creates a client configured from {@link GlobleConfiguration}. No
     * connection is opened until one of the lazy getters (or
     * {@link #buildConnect()}) is called.
     */
    public ActiveMQClient() {
        GlobleConfiguration config = GlobleConfiguration.getInstance();
        this.url = config.getActiveMqUrl();
        this.user = config.getActiveMqUser();
        this.passwd = config.getActiveMqPasswd();
    }

    /**
     * Opens the connection and a non-transacted, auto-acknowledge session.
     * Does nothing when already connected. On failure the error is logged,
     * any half-open connection is released and {@link #isConn()} stays
     * {@code false}.
     */
    protected synchronized void buildConnect() {
        if (isConn) {
            return;
        }
        try {
            // The password is deliberately left out of the log line.
            logger.debug("建立連接,user=" + user + ", url=" + url);
            connFactory = new ActiveMQConnectionFactory(user, passwd, url);
            conn = connFactory.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            isConn = true;
            logger.info("建立連接成功");
        } catch (JMSException e) {
            logger.error("建立連接失敗:" + e.getMessage(), e);
            // createConnection() may have succeeded before start() or
            // createSession() failed; close what was opened so the next
            // attempt does not leak a broker connection.
            releaseSessionAndConnection();
            isConn = false;
        }
    }

    /**
     * Closes the session and the connection and resets this client to the
     * disconnected state. Failures while closing are logged, not thrown.
     */
    public synchronized void close() {
        try {
            releaseSessionAndConnection();
        } finally {
            connFactory = null;
            isConn = false;
        }
    }

    /** Closes session then connection; the connection is closed even if the session fails. */
    private void releaseSessionAndConnection() {
        try {
            closeSession();
        } finally {
            closeConnection();
        }
    }

    /** Closes and forgets the current session, logging any JMS failure. */
    private void closeSession() {
        try {
            if (null != session) {
                session.close();
            }
        } catch (JMSException e) {
            logger.error("關閉連接失敗:" + e.getMessage(), e);
        } finally {
            session = null;
        }
    }

    /** Closes and forgets the current connection, logging any JMS failure. */
    private void closeConnection() {
        try {
            if (null != conn) {
                conn.close();
            }
        } catch (JMSException e) {
            logger.error("關閉連接失敗:" + e.getMessage(), e);
        } finally {
            conn = null;
        }
    }

    /**
     * @return the broker url
     */
    public String getUrl() {
        return url;
    }

    /**
     * @param url the broker url to use for the next connection attempt
     */
    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * @return the user
     */
    public String getUser() {
        return user;
    }

    /**
     * @param user the user to use for the next connection attempt
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * @return the passwd
     */
    public String getPasswd() {
        return passwd;
    }

    /**
     * @param passwd the password to use for the next connection attempt
     */
    public void setPasswd(String passwd) {
        this.passwd = passwd;
    }

    /**
     * Returns the connection factory, connecting first if necessary. If the
     * client is flagged as connected but the factory is missing, the
     * connection is rebuilt from scratch.
     *
     * @return the connFactory; {@code null} only if a rebuild was needed and
     *         closing succeeded but reconnecting did not create a factory
     */
    public synchronized ActiveMQConnectionFactory getConnFactory() {
        if (!isConn) {
            buildConnect();
        }
        if (null == connFactory && isConn) {
            this.close();
            this.buildConnect();
        }
        return connFactory;
    }

    /**
     * @param connFactory the connFactory to set
     */
    public void setConnFactory(ActiveMQConnectionFactory connFactory) {
        this.connFactory = connFactory;
    }

    /**
     * Returns the connection, connecting first if necessary. If the client
     * is flagged as connected but the connection is missing, everything is
     * closed and rebuilt.
     *
     * @return the conn, or {@code null} when the broker could not be reached
     */
    public synchronized Connection getConn() {
        if (!isConn) {
            buildConnect();
        }
        if (null == conn && isConn) {
            this.close();
            this.buildConnect();
        }
        return conn;
    }

    /**
     * @param conn the conn to set
     */
    public void setConn(Connection conn) {
        this.conn = conn;
    }

    /**
     * Returns the session, connecting first if necessary. If the client is
     * flagged as connected but the session is missing, everything is closed
     * and rebuilt.
     *
     * @return the session, or {@code null} when the broker could not be reached
     */
    public synchronized Session getSession() {
        if (!isConn) {
            buildConnect();
        }
        if (null == session && isConn) {
            this.close();
            this.buildConnect();
        }
        return session;
    }

    /**
     * @param session the session to set
     */
    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * @return {@code true} once {@link #buildConnect()} has succeeded and
     *         {@link #close()} has not been called since
     */
    public synchronized boolean isConn() {
        return isConn;
    }

    /**
     * @param isConn the connected flag to set
     */
    public void setConn(boolean isConn) {
        this.isConn = isConn;
    }
}
activemq消息主體的定義和聲明
/**
*
*/
package com.ustcinfo.kanms.alarmcollector.activemq;
import java.io.Serializable;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;
/**
 * =================================================
 * Project: GessAlarmCollector
 * Class:   ActiveMQSender
 * Author:  dlzhang
 * Date:    2014-8-29 13:02:40
 * Version: 1.0
 * Description: Sends text, bytes and object messages to the queue named by
 * {@code GlobleConfiguration.getActiveMqSendQueueName()}, using a session
 * obtained from its own {@link ActiveMQClient}.
 *
 * <p>All send methods are best-effort: failures are logged and the message
 * is dropped; nothing is thrown to the caller.
 * =================================================
 */
public class ActiveMQSender {
    private static final Logger logger = Logger.getLogger(ActiveMQSender.class);

    private ActiveMQClient mqClient;
    private Session session;
    private String sendQueueName;
    private Destination dest;
    private MessageProducer producer;

    /**
     * Connects through a new {@link ActiveMQClient} and prepares a producer
     * for the configured send queue. If the broker cannot be reached the
     * sender is still constructed, but every send is logged and dropped.
     */
    public ActiveMQSender() {
        this.mqClient = new ActiveMQClient();
        this.session = this.mqClient.getSession();
        this.sendQueueName = GlobleConfiguration.getInstance().getActiveMqSendQueueName();
        if (session == null) {
            // ActiveMQClient returns a null session when connecting failed.
            logger.error("ActiveMQ session unavailable; no producer created for queue [" + sendQueueName + "]");
            return;
        }
        try {
            dest = session.createQueue(sendQueueName);
            producer = session.createProducer(dest);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Sends an already-built message through the producer.
     *
     * @param msg the message to send; {@code null} is logged and ignored
     */
    public void sendMessage(Message msg) {
        if (msg == null) {
            logger.error("Nothing to send: message is null");
            return;
        }
        if (producer == null) {
            logger.error("Message dropped: no producer for queue [" + sendQueueName + "]");
            return;
        }
        try {
            producer.send(msg);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Wraps the text in a {@link TextMessage} and sends it.
     *
     * @param text the message body
     */
    public void sendTextMessage(String text) {
        if (!hasSession()) {
            return;
        }
        TextMessage tMsg;
        try {
            tMsg = session.createTextMessage(text);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return; // do not hand a null message to the producer
        }
        this.sendMessage(tMsg);
    }

    /**
     * Wraps the bytes in a {@link BytesMessage} and sends it.
     *
     * @param bytes the message body
     */
    public void sendbytesMessage(byte[] bytes) {
        if (!hasSession()) {
            return;
        }
        BytesMessage bMsg;
        try {
            bMsg = session.createBytesMessage();
            bMsg.writeBytes(bytes);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return; // a partially written message must not be sent
        }
        this.sendMessage(bMsg);
    }

    /**
     * Wraps the object in an {@link ObjectMessage} and sends it.
     *
     * @param s the serializable payload
     */
    public void sendObjectMessage(Serializable s) {
        if (!hasSession()) {
            return;
        }
        ObjectMessage oMsg;
        try {
            oMsg = session.createObjectMessage(s);
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
            return;
        }
        this.sendMessage(oMsg);
    }

    /** Reports whether a session is available, logging when it is not. */
    private boolean hasSession() {
        if (session == null) {
            logger.error("Message dropped: ActiveMQ session is not available");
            return false;
        }
        return true;
    }

    /**
     * @return the mqClient
     */
    public ActiveMQClient getMqClient() {
        return mqClient;
    }

    /**
     * @param mqClient the mqClient to set
     */
    public void setMqClient(ActiveMQClient mqClient) {
        this.mqClient = mqClient;
    }

    /**
     * @return the session, or {@code null} if connecting failed
     */
    public Session getSession() {
        return session;
    }

    /**
     * @param session the session to set
     */
    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * @return the sendQueueName
     */
    public String getSendQueueName() {
        return sendQueueName;
    }

    /**
     * @param sendQueueName the sendQueueName to set; does not re-create the
     *        destination or producer
     */
    public void setSendQueueName(String sendQueueName) {
        this.sendQueueName = sendQueueName;
    }

    /**
     * @return the dest
     */
    public Destination getDest() {
        return dest;
    }

    /**
     * @param dest the dest to set
     */
    public void setDest(Destination dest) {
        this.dest = dest;
    }

    /**
     * @return the producer, or {@code null} if it could not be created
     */
    public MessageProducer getProducer() {
        return producer;
    }

    /**
     * @param producer the producer to set
     */
    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }
}
消息接受器,使用監聽的方式接受消息
/**
*
*/
package com.ustcinfo.kanms.alarmcollector.activemq;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import com.ustcinfo.kanms.alarmcollector.framework.BaseThread;
import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;
import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;
import com.ustcinfo.kanms.alarmcollector.kafka.ProducerHandler;
import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;
import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;
import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;
/**
* =================================================
* 工程:AlarmAutoReceiver
* 類名:ActiveMQReceiver
* 作者:lt
* 時間:2019-9-23下午08:49:38
* 版本:Version 1.0
* 描述:告警消息接受器,使用監聽的方式接受消息
* 實現ExceptionListener接口,對服務器進行監聽,實現自動重連功能
* =================================================
*/
public class AlarmAutoReceiverThread{
private static final Logger logger = Logger.getLogger(AlarmAutoReceiverThread.class);
private final FileWriteThread filewrite = new FileWriteThread();//建立一個新的線程寫文件
public AlarmAutoReceiverThread(int n) {
for(int i=0;i
new AlarmAutoReceiver().start();
logger.info(">>>>>成功啟動監聽線程:線程"+i);
}
}
class AlarmAutoReceiver extends BaseThread implements MessageListener, ExceptionListener {
private final Logger log = Logger.getLogger(AlarmAutoReceiver.class);
private ActiveMQClient mqClient;
private GesAlarmContainer gesAlarmContainer;
private Session session;
private String recvQueueName;
private Destination dest;
private MessageConsumer consumer;
public AlarmAutoReceiver() {
// 初始化參數
this.mqClient = new ActiveMQClient();
this.gesAlarmContainer = GesAlarmContainer.getInstance();
try {
mqClient.getConn().setExceptionListener(this); // 設置監聽
} catch (JMSException e) {
log.error(e.getMessage(), e);
}
this.session = this.mqClient.getSession();
this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();
}
public AlarmAutoReceiver(ActiveMQClient mqClient) {
// 初始化參數
this.mqClient = mqClient;
this.gesAlarmContainer = GesAlarmContainer.getInstance();
try {
mqClient.getConn().setExceptionListener(this); // 設置監聽
} catch (JMSException e) {
log.error(e.getMessage(), e);
}
this.session = this.mqClient.getSession();
this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();
}
/* (non-Javadoc)
* @see java.lang.Thread#run()
*/
@Override
public void run() {
log.debug("啟動線程, Thread.currentThread().getName()=" + Thread.currentThread().getName());
try {
dest = session.createQueue(recvQueueName);
consumer = session.createConsumer(dest);
consumer.setMessageListener(this);
// 阻塞線程,直到收到消息為“quit”時被喚醒
synchronized (this) {
wait();
}
log.debug("結束線程, Thread.currentThread().getName()=" + Thread.currentThread().getName());
mqClient.close();
} catch (JMSException e) {
log.error("創建消費者失敗:" + e.getMessage(), e);
} catch (InterruptedException e) {
log.error("線程被中斷", e);
}
}
/* (non-Javadoc)
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
@Override
public void onException(JMSException e) {
// 捕獲異常,連接重連
log.error("服務端異常", e);
// 結束線程
if(this.isAlive()) {
synchronized (this) {
this.notifyAll();
}
}
// 每隔一分鐘循環獲取,直到獲取連接成功
long sleepTimeSec = 10;
long sleepTimeMillis = sleepTimeSec * 1000;
int reConnCnt = 0;
while (true) {
try {
reConnCnt++;
log.info("休眠" + sleepTimeSec + "秒 -_-~zZ");
Thread.sleep(sleepTimeMillis);
log.debug("開始重新獲取連接");
// 先關閉連接,再重新連接
mqClient.close();
mqClient.buildConnect();
if(mqClient.isConn()) {
log.info("重新獲取連接成功,耗時:[" + reConnCnt * sleepTimeSec + "]秒 ^_^");
// 重新創建監聽
new AlarmAutoReceiver(mqClient).start();
break;
}
log.error("第" + reConnCnt + "次重新獲取連接失敗 T_T");
} catch (InterruptedException e1) {
log.error(e1.getMessage(), e1);
}
}
}
// /**
// * 測試函數
// * @param args
// */
// public static void main(String[] args) {
// // 初始化系統配置文件
// GlobleConfiguration.getInstance().initSysConfig();
//
// ActiveMQReceiver recv = new ActiveMQReceiver();
// recv.start();
// logger.debug("消費者啟動成功");
// }
/* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message message) {
log.info("-------------- 收到一條告警信息 -----------");
if (message instanceof TextMessage){
TextMessage tm = (TextMessage)message;
try {
String xml = tm.getText();
if(xml!=null){
try{
filewrite.handle(xml.substring(xml.indexOf(""),xml.lastIndexOf("")+9)+"\n--"+dateFormat(new Date()));
}catch(Exception e){
log.error("告警日志寫入出錯!",e);
filewrite.handle(xml+"\n--"+dateFormat(new Date()));
}
}
// 當消息為“quit”時,喚醒線程
if(xml != null && xml.equalsIgnoreCase("quit")) {
logger.info(Thread.currentThread().getName() + "接收到的消息為:" + xml + ",開始退出線程");
synchronized (this) {
notifyAll();
}
return;
}
/*String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim());
List gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr);
if(gesAlarmList!=null&&gesAlarmList.size()>0){
logger.debug("gesAlarmList.size()=" + gesAlarmList.size());
for(int i=0;i
GesAlarm gesAlarm = new GesAlarm();
gesAlarm = gesAlarmList.get(i);
gesAlarmContainer.putGesAlarm(gesAlarm);
}
}*/
//向kafka寫入數據
new ProducerHandler("znwgAlarm",xml);
} catch (JMSException e) {
} catch(Exception e){
logger.error("接受告警信息出錯!", e);
}finally {
}
}
}
private String dateFormat(Date date){
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date);
}
// private long num;
/**
* @return the mqClient
*/
public ActiveMQClient getMqClient() {
return mqClient;
}
/**
* @param mqClient the mqClient to set
*/
public void setMqClient(ActiveMQClient mqClient) {
this.mqClient = mqClient;
}
/**
* @return the session
*/
public Session getSession() {
return session;
}
/**
* @param session the session to set
*/
public void setSession(Session session) {
this.session = session;
}
/**
* @return the recvQueueName
*/
public String getRecvQueueName() {
return recvQueueName;
}
/**
* @param recvQueueName the recvQueueName to set
*/
public void setRecvQueueName(String recvQueueName) {
this.recvQueueName = recvQueueName;
}
/**
* @return the dest
*/
public Destination getDest() {
return dest;
}
/**
* @param dest the dest to set
*/
public void setDest(Destination dest) {
this.dest = dest;
}
/**
* @return the consumer
*/
public MessageConsumer getConsumer() {
return consumer;
}
/**
* @param consumer the consumer to set
*/
public void setConsumer(MessageConsumer consumer) {
this.consumer = consumer;
}
}
}
總結
以上是生活随笔為你收集整理的activemq java 异步_异步消息处理机制之activeMQ应用实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 全民进化怎么退化 退化方法新手详解指南
- 下一篇: 原始传奇铭文功能怎么样