如何保证消息队列里的数据顺序执行?
使用MQ的時候,經(jīng)常會有按順序消費的需求,比如大數(shù)據(jù)團隊為了做數(shù)據(jù)分析,會把數(shù)據(jù)庫里數(shù)據(jù)同步到其他系統(tǒng)做一些數(shù)據(jù)統(tǒng)計分析。同步MySQL的時候,為了保證數(shù)據(jù)同步的實時性,會在中間加一個MQ,多個線程來消費MQ里的數(shù)據(jù)。
這種同步一般是讀取binlog數(shù)據(jù),你在MySQL里增改刪了數(shù)據(jù),對應(yīng)出來就是3條增改刪binlog日志發(fā)送到MQ里面,消費的時候肯定必須要按照增改刪的順序執(zhí)行。如果你換成刪除、修改、增加,就導(dǎo)致數(shù)據(jù)亂套了。
圖1?binlog同步
我們以kafka舉例,看下哪些環(huán)節(jié)會出現(xiàn)數(shù)據(jù)順序不一致情況,又怎么解決。
假設(shè)kafka分配了3個partition,kafka的一個特性就是,能保證寫入一個partition中的數(shù)據(jù)一定是有順序的。
生產(chǎn)者寫的時候,可以指定一個key,比如是訂單id作為key,這個id對應(yīng)的數(shù)據(jù)一定會寫到同一個partition中去,而且這個partition中的數(shù)據(jù)都是有順序的。
圖2?kafka partition
kafka的消費者開始消費partition中的數(shù)據(jù),一個消費消費一個partition,一個partition只能被一個消費者消費,不會出現(xiàn)一個消費者同時消費多個partition的情況。假如現(xiàn)在有3個partition,你啟動4個消費者,那么就會有一個消費者消費不到數(shù)據(jù)。
圖3 一個消費者消費一個partition
到目前為止,每個消費者消費到的數(shù)據(jù)都是有順序性的。但消費者內(nèi)部如果是單線程的,效率就會比較低,如果生產(chǎn)者寫入kafka的數(shù)據(jù)量比較大,消費不及時,就會出現(xiàn)消息堆積的情況,所以消費者需要多線程的方式運行。
假如消費者里啟動了3個線程,并發(fā)的來消費數(shù)據(jù),線程之間如果不做同步控制,還是會導(dǎo)致數(shù)據(jù)亂掉。
圖4 消費者多線程消費MQ
那如何保證kafka消費者多線程按順序消費數(shù)據(jù)呢?
多個線程不能直接拿數(shù)據(jù)去處理,此時我們可以在同步系統(tǒng)中搞多個內(nèi)存隊列,消費者拿到數(shù)據(jù)之后,根據(jù)每條數(shù)據(jù)的key做hash取模,把相同id的數(shù)據(jù)分配到同一個內(nèi)存隊列中去。
每個內(nèi)存隊列里的數(shù)據(jù)都是有順序性的,給每個內(nèi)存隊列都對應(yīng)一個線程,去消費內(nèi)存隊列中的數(shù)據(jù)。
假如有3條增改刪的數(shù)據(jù),都是對同一個id的處理,那么hash取模后就會寫入到同一個內(nèi)存隊列里去,由同一個線程去消費,然后按順序?qū)懭霐?shù)據(jù)庫中。
如果消費者按照單線程消費處理,一條數(shù)據(jù)耗費幾十毫秒,1秒鐘只能處理十幾條數(shù)據(jù),吞吐量就會非常低。如果開啟多線程的方式處理,就會幾倍的提高吞吐量,同時也保證了數(shù)據(jù)的順序性。
整個流程按這樣的設(shè)計方案來處理,就可以保證數(shù)據(jù)的順序性。
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號
好文章,我在看??
總結(jié)
以上是生活随笔為你收集整理的如何保证消息队列里的数据顺序执行?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python管理系统web版_【程序源代
- 下一篇: hive 配置mysql_Hive的my