JMS学习三(ActiveMQ消息的可靠性)
下面我們來學習一下消息接受確認和發送持久化消息、消息的過期、消息的選擇器和消息的優先級。
一、消息接收確認
1、jms消息只有在被確認之后才認為成功消費了這條消息。消息的成功消費通常包括三個步驟:(1)、client接收消息 (2)、client處理消息 (3)、消息被確認(也就是client給一個確認消息)
不管是事務性會話還是非事務性會話,第一步和第二步都一樣但第三步有所不同
2、在事務性會話中當一個事務被提交的時候,確認自動發生,和應答模式沒關系,這個值可以隨便寫。(這里多提一句異步消息接收中不能使用事務性會話)
3、在非事務性會話中消息何時被確認取決于創建的session中設置的消息應答模式(acknowledge model)該參數有三個值:
(1)、Session.AUTO_ACKNOWLEDGE:當client端成功的從receive方法或從onMessage(Message message) 方法返回的時候,會話自動確認client收到消息。
(2)、Session.CLIENT_ACKNOWLEDGE: 客戶單通過調用acknowledge方法來確認客戶端收到消息。但需要注意在這種應答模式下,確認是在會話層上進行的,確認一個被消費的消息將自動確認所有已消費的其他消息。比如一個消費者已經消費了10條消息,然后確認了第5條消息被消費,則這10條都被確認消費了。
acknowledge()通知方法是在Message對象上
同步接收,調用acknowledge()方法進行確認:
1、生產者,設置簽收模式為Session.CLIENT_ACKNOWLEDGE // 通過Connection對象創建Session會話(上下文環境對象), // 參數一,表示是否開啟事務 // 參數二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收 Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); 2、消費者,設置簽收模式為Session.CLIENT_ACKNOWLEDGE // 通過Connection對象創建Session會話(上下文環境對象), // 參數一,表示是否開啟事務 // 參數二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收 Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE); 3、消費者,在消費消息的時候,返回一個確認 // 使用Session來創建消息對象的生產者或者消費者 MessageConsumer createConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) createConsumer.receive();//同步方式接收 if (textMessage == null) break; // 客戶端的簽收模式, textMessage.acknowledge(); System.out.println("收到的內容為" + textMessage.getText()); }異步接受,調用acknowledge()方法進行確認:
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String value = textMessage.getText(); System.out.println("value: " + value); message.acknowledge(); //消息消費確認通知 } catch (JMSException e) { e.printStackTrace(); } } });?
?
(3)、Session.DUPS_ACKNOWLEDGE:不是必須簽收,消息可能會重復發送。在第二次重新傳送消息的時候,消息頭的JmsDelivered會被置為true標示當前消息已經傳送過一次,客戶端需要進行消息的重復處理控制。
?
小結:
1.transacted事務,事務成功commit,才會將消息發送到mom中?
2.acknowledgeMode消息確認機制?
? ? ?1)、帶事務的session?
? ? ? ? ? ? ? ?如果session帶有事務,并且事務成功提交,則消息被自動簽收。如果事務回滾,則消息會被再次傳送。?
? ? ?2)、不帶事務的session?
? ? ? ? ? ? ? ?不帶事務的session的簽收方式,取決于session的配置。?
? ? ? ? ? ? ? ? Activemq支持一下三種模式:?
? ? ? ? ? ? ? ? Session.AUTO_ACKNOWLEDGE ?消息自動簽收?
? ? ? ? ? ? ? ? Session.CLIENT_ACKNOWLEDGE ?客戶端調用acknowledge方法手動簽收?
? ? ? ? ? ? ? ? Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重復發送。在第二次重新傳送消息的時候,消息?
? ? ? ? ? ? ? ? 頭的JmsDelivered會被置為true標示當前消息已經傳送過一次,客戶端需要進行消息的重復處理控制。
?
二、發送持久化消息
這里發送持久化消息和消息的持久化是有點聯系,但不是一回事,發送持久化消息的意思是要不要將發送的消息持久化到磁盤或數據庫中而消息的持久化是指將消息持久化到磁盤還是數據庫,一個是要不要將消息持久化一個是將消息怎么持久化。
這里要寫的當然是要不要將消息持久化,ActiveMQ給我們提供了兩種兩個選擇即持久化或不持久化。
? ?ActiveMQ支持兩種消息的傳送模式,PERSISTENT和NON_PERSISTENT兩種。如果不指定傳輸模式,默認的就是持久化消息。如果容忍消息丟失,那么可以使用非持久化的消息機制以改善性能和減少開銷。
1、PERSISTENT:指示JMS Provider持久保存消息,會把消息持久化到磁盤以保證消息不會因為JMS provider、JMS Server 的失敗而丟失。
2、NON_PERSISTENT:不要求JMS Provider持久保存消息。如果JMS Server 重啟則消息就丟死了,消費端也就會失去這條消息。
3、消息的持久化設置:
MessageProducer producer = session.createProducer(queue); // 消息是否為持久性的,如果不設置默認是持久化的。 // producer.setDeliveryMode(DeliveryMode.PERSISTENT); //非持久化消息,消息是不會持久化到磁盤的,發送后如果服務關閉再次開啟則消息會丟失。 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);消息是否持久化對應用的性能影響還是很大的,而且在消息量很大的時候則更要慎重是否要對消息進行持久化。
?
三、消息過期
? ? ??
允許消息過期 。默認情況下,消息永不會過期。如果消息在特定周期內失去意義,那么可以設置過期時間。?
有兩種方法設置消息的過期時間,時間單位為毫秒:?
1.使用消息生產者的setTimeToLive 方法為所有的消息設置過期時間。
2.使用消息生產者的send 方法為每一條消息設置過期時間。
?消息過期時間,send 方法中的 timeToLive 值加上發送時刻的 GMT 時間值。如果 timeToLive 值等于零,則 JMSExpiration 被設為零, 表示該消息永不過期。
3、消息服務器接收到消息后,在指定的時間后,會從隊列中移除指定的消息,超時被移除的消息不會發送給消費者。
4、使用消息生產者的setTimeToLive(long time ) 方法來給所有的消息設置過期時間:
// 消息生產者 MessageProducer producer = null; producer = session.createProducer(queue); // 消息是否為持久性的,如果不設置默認是持久化的。 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息過期設置 producer.setTimeToLive(1000);?
5、使用消息生產者的send()方法來設置消息的過期時間
#在生產者發送消息的過程中,可以指定deliveryMode傳輸模式,priority消息優先級,timeToLive消息過期時間 void send(Message message, int deliveryMode, int priority, long timeToLive );上面設置消息過期的都是消息生產者這方的來設置的,也就是如果不滿足條件則消息服務器會把消息從消息隊列中刪除,但是我們也可以在消息消費端來設置接受時間(僅限于同步接受)
Message message = consumer.receive(2);就是在接受的時候添加等待時間(單位是毫秒)如果在指定的時間內獲取不到消息則不會再等了。如果不設置等待時間則一直等待直到接收到消息或超時為止。
?
四、優先級
?消息優先級從0-9十個級別,0-4是普通消息,5-9是加急消息。如果不指定優先級,默認為4.JMS不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要優先于普通消息到達。
?
五、臨時目的地
? ? ? ? ?可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來創建臨時目的地。它們的存在時間只限于創建它們的連接所保持的時間。只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息。
?
六、持久訂閱
? ? ? ??首先消息生產者必須使用PERSISTENT提交消息。客戶可以通過會話上的createDurableSubscriber方法來創建一個持久訂閱,該方法的第一個參數必須是一個topic,第二個參數是訂閱的名稱。?JMS Provider會存儲發布到持久訂閱對應的topic上的消息。如果最初創建持久訂閱的客戶或者任何其它客戶使用相同的連接工廠和連接的客戶ID、相同的主題和相同的訂閱名再次調用會話上的createDurableSubscriber方法,那么該持久訂閱就會被激活。JMS Provider會象客戶發送客戶處于非激活狀態時所發布的消息。 持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在創建之后會一直保留,直到應用程序調用會話上的unsubscribe方法。
?
七、本地事務
? ? ? ? ?在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收。JMS Session接口提供了commit和rollback方法。事務提交意味著生產的所有消息被發送,消費的所有消息被確認;事務回滾意味著生產的所有消息被銷毀,消費的所有消息被恢復并重新提交,除非它們已經過期。 事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。 需要注意的是,如果使用請求/回復機制,即發送一個消息,同時希望在同一個事務中等待接收該消息的回復,那么程序將被掛起,因為知道事務提交,發送操作才會真正執行。 需要注意的還有一個,消息的生產和消費不能包含在同一個事務中。
?
八、
總結
以上是生活随笔為你收集整理的JMS学习三(ActiveMQ消息的可靠性)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何修改linux的MAC地址
- 下一篇: 基于TCP协议的通信模型