JMS学习八(ActiveMQ消息持久化)
JMS學習八(ActiveMQ消息持久化)
ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,還有一種內存存儲的方式,由于內存不屬于持久化范疇,而且如果使用內存隊列,可以考慮使用更合適的產品,如ZeroMQ。所以內存存儲不在討論范圍內。
無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
消息分為Queue和Topic兩種,Queue是點對點消費,發送者發送一條消息,只有一個且唯一的一個消費者能對其進行消費。
Topic是訂閱式消費,一個消息可以被很多的訂閱者消費,其中定閱者又分為持久化訂閱和非持久化訂閱。持久化訂閱是指即使訂閱者當前不在線,其訂閱之后,發送方發到Broker的消息,也會在持久化訂閱者再次上線的時候完成消費,不會丟失消息。而非持久化訂閱者,只有訂閱者在線時才會消費,不在線時,即使Broker收到新的消息,當其再次上線時,也不會收到錯過的消息。
ActiveMQ的持久化機制,對于Queue類型的消息,將存儲在Broker,但是一旦其中一個消費者完成消費,則立即刪除這條消息。對于Topic類型的消息,即使所有的訂閱者都完成了消費,Broker也不一定會馬上刪除無用消息,而是保留推送歷史,之后會異步清除無用消息。而每個訂閱者消費到了哪條消息的offset會記錄在Broker,以免下次重復消費。因為消息是順序消費,先進先出,所以只需要記錄上次消息消費到哪里就可以了。
配置持久化的方式,都是修改%ACTIVEMQ_HOME%conf/acticvemq.xml文件。
下面分別介紹幾種持久化方式的特點:
JDBC:很多企業級應用比較喜歡這種存儲方式。優點是大多數企業都有專門的DBA,以數據庫作為存儲介質,會讓有這方面人才的公司比較放心。另外,數據庫的存儲方式,可以看到消息是如何存儲的,可以通過SQL查詢消息消費狀態,可以查看消息內容,這是其他持久化方式所不具備的。還有一個優點就是數據庫可以支持強一致性事務,支持兩階段提交的分布式事務。缺點是性能問題,數據庫持久化是性能最低的一種方式。
之所以最先介紹數據庫的持久化方式,是因為我們可以通過表結構很好的理解ActiveMQ是怎么存儲和消費消息的。
數據庫會創建3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用于存儲消息,Queue和Topic都存儲在這個表中。
下面介紹一下主要的數據庫字段:
ID:自增的數據庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者客戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
MSG:消息本體的Java序列化對象的二進制數據
PRIORITY:優先級,從0-9,數值越大優先級越高
activemq_acks用于存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存。
?
主要的數據庫字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,這個字段會有集群其他系統的信息
CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作
LAST_ACKED_ID:記錄消費過的消息的ID。
表activemq_lock在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用于記錄哪個Broker是當前的Master Broker。
配置如下:
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>首先定義一個mysql-ds的MySQL數據源,然后在persistenceAdapter節點中配置jdbcPersistenceAdapter并且引用剛才定義的數據源。
AMQ:性能高于JDBC,寫入消息時,會將消息寫入日志文件,由于是順序追加寫,性能很高。為了提升性能,創建消息主鍵索引,并且提供緩存機制,進一步提升性能。每個日志文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會重新建立一個文件。當所有的消息都消費完成,系統會刪除這個文件或者歸檔(取決于配置)。主要的缺點是AMQ Message會為每一個Destination創建一個索引,如果使用了大量的Queue,索引文件的大小會占用很多磁盤空間。而且由于索引巨大,一旦Broker崩潰,重建索引的速度會非常慢。
配置片段如下:
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/> </persistenceAdapter>雖然AMQ性能略高于Kaha DB,但是由于其重建索引時間過長,而且索引文件占用磁盤空間過大,所以已經不推薦使用。這里就不在詳細介紹AMQ持久化的數據結構。在新版本的ActiveMQ中,AMQ已經被刪除。
KahaDB:從ActiveMQ 5.4開始默認的持久化插件,KahaDb恢復時間遠遠小于其前身AMQ并且使用更少的數據文件,所以可以完全代替AMQ。kahaDB的持久化機制和AMQ非常像。同樣是基于日志文件,索引和緩存。和AMQ不同,KahaDB所有的Destination都使用一個索引文件。《ActiveMQ In Action》表示其可以支持10000個連接,每個連接都是一個獨立的Queue,足以滿足大部分應用場景。
?
Data logs用于存儲消息日志,消息的全部內容都在Data logs中。同AMQ一樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。同樣是文件尾部追加,寫入性能很快。每個消息在Data logs中有計數引用,所以當一個文件里所有的消息都不需要了,系統會自動刪除文件或放入歸檔文件夾。
緩存用于存放在線消費者的消息。如果消費者已經快速的消費完成,那么這些消息就不需要再寫入磁盤了。
Btree索引會根據MessageID創建索引,用于快速的查找消息。這個索引同樣維護持久化訂閱者與Destination的關系,以及每個消費者消費消息的指針。
Redo log用于系統崩潰后,重建Btree索引。因為Redo log的存在,使得重建索引時不需要讀取Data logs的全量數據,大大提升性能。
KahaDB的目錄結構:
?
db log文件,以db-<Number>.log命名。archive目錄用于存檔歸檔的數據。db.data和db.redo分別是Btree索引和redo log。
由于是ActiveMQ的默認持久化機制,所以不需要修改配置文件就可以使用KahaDB,但是還是貼出配置片段:
<persistenceAdapter> <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/> </persistenceAdapter>?
LevelDB:從ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。LevelDB持久化性能高于KahaDB,雖然目前默認的持久化方式仍然是KahaDB,但是LevelDB是將來的趨勢。并且,在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的數據復制方式,用于Master-slave方式的首選數據復制方案。LevelDB使用自定義的索引代替常用的BTree索引。
?
通過上圖可以看出LevelDB主要由6部分組成:內存中的MemTable和ImmutableMemTable,還有硬盤上的log文件,manifest文件,current文件和SSTable文件。還有一些其他的輔助文件,暫時不做說明。
每寫入一次數據,需要寫入log文件,和MemTable,也就是說,只需要一次硬盤的順序寫入,和一個內存寫入,如果系統崩潰,可以通過log文件恢復數據。每次寫入會先寫log文件,后寫MemTable來保證不丟失數據。
當MemTable到達內存閥值,LevelDB會創建一個新的MemTable和log文件,而舊的MemTable會變成ImmutableMemTable,ImmutableMemTable的內容是只讀的。然后系統會定時的異步的把ImmutableMemTable的數據寫入新的SSTable文件。
SSTable文件和MemTable,ImmutableMemTable的數據結構相同,都是key,value的數據,按照key排序。
manifest文件用于記錄每個SSTable的key的起始值和結束值,有點類似于B-tree索引。而manifest同樣會生成新文件,舊的文件不再使用。current文件就是指定哪個manifest文件是現在正在使用的。
更具體實現原理可參見:http://www.cnblogs.com/haippy/archive/2011/12/04/2276064.html
配置片段如下:
<persistenceAdapter> <levelDB directory="${activemq.data}/activemq-data"/> </persistenceAdapter>在目前的ActiveMQ 5.10版本中,直接使用LevelDB會導致服務不能啟動,拋出java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
原因是有兩個Guava cache導致版本沖突,解決的辦法是:
1.刪除%ACTIVEMQ_HOME%lib下面的pax-url-aether-1.5.2.jar
2.注釋掉%ACTIVEMQ_HOME%conf/activemq.xml的下面幾行:
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean>這個BUG地址是https://issues.apache.org/jira/browse/AMQ-5225,希望可以在下個版本順利解決。
?
下面是跑在我機器上的性能測試,實際數據意義不大,因為每個環境的配置都不同,但是可以通過對比看出幾種持久化方式的性能對比。
| ? | 發送1000條消息(毫秒) | 發送10000條消息(毫秒) | 消費1000條消息的時間(毫秒) | 消費10000條消息的時間(毫秒) |
| JDBC-Mysql | 43009 | 369802 | 610 | 509338 |
| KahaDB | 34227 | 360493 | 208 | 2224 |
| LevelDB | 34032 | 347712 | 220 | 2877 |
通過這個表格可以看出來,發送消息LevelDB最快,KahaDB稍微慢點,JDBC最慢,但是也不會慢太多,是一個數量級。消費消息,KahaDB最快,LevelDB稍微慢點,JDBC慢的讓人不能忍受,差好幾個數量級。LevelDB并沒有顯現出比KahaDB更多速度上的優勢。但是由于LevelDB支持高可用的復制數據,所以首選肯定還是LevelDB。
?
上面對幾種持久化方案講解的很詳細下面在看看著另一種
JDBC Message Store with ActiveMQ Journal
1、這種方式客服了jdbc store 的不足,使用快速的緩存寫入技術,大大提高了性能,具體配置如下:
<persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="2" journalLogFileSize="16" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="${activemq.data}/data"/> </persistenceFactory>其他的和jdbc store 是一樣的。
?
優點: 比jdbdc store 寫入速度快
缺點:不用用于master/slave 模式
總結
以上是生活随笔為你收集整理的JMS学习八(ActiveMQ消息持久化)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: UNIX再学习 -- 发送信号
- 下一篇: UNIX再学习 -- 信号处理