activeMQ 本地测试
生活随笔
收集整理的這篇文章主要介紹了
activeMQ 本地测试
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
參考博主 搭建~?https://www.cnblogs.com/jaycekon/p/6225058.html
ActiveMQ官網下載地址:http://activemq.apache.org/download.html
我下的是windows版本的
下載解壓之后進入D:\config\apache-activemq-5.15.7\bin\win64
雙擊運行activemq.bat,啟動本地MQ服務,
?
?
?started說明啟動成功。
接下來是代碼部分:
生產者:Producer
package com.mqtest;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger;/*** @author Maggie.Hao* @date 2018/11/5 14:31*/ public class Producer{private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);//ActiveMq 的默認用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//ActiveMq 的默認登錄密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//ActiveMQ 的鏈接地址private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;AtomicInteger count = new AtomicInteger(0);//鏈接工廠 ConnectionFactory connectionFactory;//鏈接對象 Connection connection;//事務管理 Session session;ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();public void init(){LOGGER.info("Product init");try{//創建一個鏈接工廠// connectionFactory = new ActiveMQConnectionFactory("admin","demo","tcp://127.0.0.1:61616");connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);//從工廠中創建一個鏈接connection = connectionFactory.createConnection();//開啟鏈接 connection.start();//創建一個事務(這里通過參數可以設置事務的級別)session = connection.createSession(true, Session.SESSION_TRANSACTED);}catch (JMSException e){LOGGER.error("", e);}}public void sendMessage(String disname){try{//創建一個消息隊列Queue queue = session.createQueue(disname);//消息生產者MessageProducer messageProducer = null;if (threadLocal.get() != null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while (true){Thread.sleep(1000);int num = count.getAndIncrement();//創建一條消息 TextMessage msg;msg = session.createTextMessage(Thread.currentThread().getName() + "==Productor:我現在正在生產東西!,count:" + num);LOGGER.info("msg:{} + {}", msg, num);//發送消息 messageProducer.send(msg);//提交事務 session.commit();}}catch (JMSException e){LOGGER.error("", e);}catch (InterruptedException e){LOGGER.error("", e);}} }
消費者:Consumer
package com.mqtest;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger;/*** @author Maggie.Hao* @date 2018/11/5 14:34*/ public class Consumer{private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;ConnectionFactory connectionFactory;Connection connection;Session session;ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();AtomicInteger count = new AtomicInteger();public void init(){try{connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);}catch (JMSException e){LOGGER.error("", e);}}public void getMessage(String disname){try{Queue queue = session.createQueue(disname);MessageConsumer consumer = null;if (threadLocal.get() != null){consumer = threadLocal.get();}else{consumer = session.createConsumer(queue);threadLocal.set(consumer);}while (true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if (msg != null){msg.acknowledge();LOGGER.info("{}:Consumer:我是消費者,我正在消費Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());}else{break;}}}catch (JMSException e){LOGGER.error("", e);}catch (InterruptedException e){LOGGER.error("", e);}} }
啟動生產者:
package com.mqtest;import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** @author Maggie.Hao* @date 2018/11/5 14:34*/ public class TestProducer{private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);public static void main(String[] args){Producer producer = new Producer();producer.init();TestProducer testMq = new TestProducer();try{Thread.sleep(1000);}catch (InterruptedException e){LOGGER.error("", e);}//Thread 1new Thread(testMq.new ProductorMq(producer)).start();//Thread 2new Thread(testMq.new ProductorMq(producer)).start();//Thread 3new Thread(testMq.new ProductorMq(producer)).start();//Thread 4new Thread(testMq.new ProductorMq(producer)).start();//Thread 5new Thread(testMq.new ProductorMq(producer)).start();}private class ProductorMq implements Runnable{Producer producter;public ProductorMq(Producer producter){this.producter = producter;}@Overridepublic void run(){while (true){try{producter.sendMessage("Jaycekon-MQ");Thread.sleep(10000);}catch (InterruptedException e){LOGGER.error("{}", e);}}}} }
啟動消費者:
package com.mqtest;import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** @author Maggie.Hao* @date 2018/11/5 15:39*/ public class TestConsumer{private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);public static void main(String[] args){Consumer comsumer = new Consumer();comsumer.init();TestConsumer testConsumer = new TestConsumer();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();new Thread(testConsumer.new ConsumerMq(comsumer)).start();}private class ConsumerMq implements Runnable{Consumer consumer;public ConsumerMq(Consumer consumer){this.consumer = consumer;}@Overridepublic void run(){while (true){try{consumer.getMessage("Jaycekon-MQ");Thread.sleep(10000);}catch (InterruptedException e){LOGGER.error("", e);}}}} }
控制臺輸出結果:
可以在? ?http://127.0.0.1:8161/admin/queues.jsp 查看結果
用戶名和密碼默認都為:admin?
點擊Queues可以看到我們的消息隊列信息
?
轉載于:https://www.cnblogs.com/mengjie1001/p/9916115.html
總結
以上是生活随笔為你收集整理的activeMQ 本地测试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信个性签名关于爱情
- 下一篇: 厦门到鼓浪屿的船票多少钱?