ActiveMQ_3Java实现
Java實現
?
添加相應的jar包
<dependency>
? <groupId>org.apache.activemq</groupId>
? <artifactId>activemq-all</artifactId>
? <version>x.xx.x</version>
</dependency>
?
創建生產者類(點對點)
public class ProducerTest {
??? // 異步發送asyn
??? // 死信隊列 DLQ
??? // 文件上傳
??? // header properties使用
??? // jdbc存儲
??? // byteMsg objMsg inputMsg
??? // mq ptp 的使用場景
??? // mq中所有的隊列名以及每個隊列中未被消費的消息數量
??? // mq sub/pub 的使用場景
??? @Test
??? public void testQueueProducer() throws JMSException{
?????? // 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。
?????? //brokerURL服務器的ip及端口號
?????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");
?????? // 第二步:使用ConnectionFactory對象創建一個Connection對象
?????? Connection connection = connectionFactory.createConnection();
?????? // 第三步:開啟連接,調用Connection對象的start方法
?????? connection.start();
?????? // 第四步:使用Connection對象創建一個Session對象。
?????? //第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。
?????? //第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。一般是自動應答
?????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
?????? // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。
?????? //參數:隊列的名稱。
?????? Queue queue = session.createQueue("test-queue");
?????? // 第六步:使用Session對象創建一個Producer對象。
?????? MessageProducer producer = session.createProducer(queue);
?????? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
?????? // 第七步:創建一個Message對象,創建一個TextMessage對象。
?????? /*TextMessage message = new ActiveMQTextMessage();
?????? message.setText("hello activeMq,this is my first test.");*/
?????? TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
?????? producer.send(textMessage);
?????? producer.close();
?????? session.close();
?????? connection.close();
??? }
}
?
創建消費者類(點對點)
public class ConsumerTest {
??? @Test
??? public void testQueueConsumer() throws JMSException, IOException{
?????? // 第一步:創建一個ConnectionFactory對象。
??????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");
??????? // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
??????? Connection connection = connectionFactory.createConnection();
??????? // 第三步:開啟連接。調用Connection對象的start方法。
????? ??connection.start();
??????? // 第四步:使用Connection對象創建一個Session對象。
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,并且隊列的名稱一致。
??????? Queue queue = session.createQueue("queue.stu");
??????? // 第六步:使用Session對象創建一個Consumer對象。
??????? MessageConsumer consumer = session.createConsumer(queue);
??????? // 第七步:接收消息。
??????? consumer.setMessageListener(new MessageListener()
???????????? @Override
???????????? public void onMessage(Message message) {
????????????????? try {
??????????????? ? ??//if (TextMessage.class.isAssignableFrom(message.getClass()))
??????????????? ? ??if (message instanceof TextMessage) {
??? ????????????????????? TextMessage textMessage = (TextMessage) message;
??? ????????????????????? String text = null;
??? ????????????????????? //取消息的內容
??? ????????????????????? text = textMessage.getText();
??? ????????????????????? // 第八步:打印消息。
??? ????????????????????? System.out.println(text);
??????????????? ? ??}
????????????????? } catch (JMSException e) {
????????????????????? e.printStackTrace();
????????????????? }
???????????? }
??????? });
??????? //等待鍵盤輸入
??????? System.in.read();
??????? // 第九步:關閉資源
??????? consumer.close();
??????? session.close();
?????? ?connection.close();
??? }
}
?
運行active服務器驗證實現情況
?
發布(對點對點代碼修改 創建Topic Destination)
// 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個topic對象。
// 參數:話題的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創建一個Producer對象。
MessageProducer producer = session.createProducer(topic);
?
訂閱(對點對點代碼修改 創建Topic Destination)
// 第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,并且話題的名稱一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創建一個Consumer對象。
MessageConsumer consumer = session.createConsumer(topic);
?
轉載于:https://www.cnblogs.com/zhiboluo/p/10114647.html
總結
以上是生活随笔為你收集整理的ActiveMQ_3Java实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python 中的 lstrip、rst
- 下一篇: Qt窗口部件与布局之二:布局管理