Kafka端到端审计
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-end-to-end-audit/
概述
Kafka端到端審計(jì)是指生產(chǎn)者生產(chǎn)的消息存入至broker,以及消費(fèi)者從broker中消費(fèi)消息這個(gè)過程之間消息個(gè)數(shù)及延遲的審計(jì),以此可以檢測(cè)是否有數(shù)據(jù)丟失,是否有數(shù)據(jù)重復(fù)以及端到端的延遲等。
目前主要調(diào)研了3個(gè)產(chǎn)品:
對(duì)于Kafka端到端的審計(jì)主要通過:
內(nèi)嵌timestamp的方式
主要是通過設(shè)置一個(gè)審計(jì)的時(shí)間間隔(這里稱之為time_bucket_interval,可以設(shè)置幾秒或者幾分鐘,這個(gè)可以自定義), 每個(gè)timestamp都會(huì)被分配到相應(yīng)的桶中,算法有:
這樣可以獲得相應(yīng)time_bucket的起始時(shí)間time_bucket_start,一個(gè)time_bucket的區(qū)間可以記錄為[time_bucket_start, time_bucket_start+time_bucket_interval]。
每發(fā)送或者消費(fèi)一條消息可以根據(jù)消息payload內(nèi)嵌的時(shí)間戳,分配到相應(yīng)桶中,然后對(duì)桶進(jìn)行計(jì)數(shù),之后進(jìn)行存儲(chǔ),簡(jiǎn)單的可以存儲(chǔ)到,比如:Map<long time_bucket_start, long count>之中。
內(nèi)嵌index的方式
這種方式就更容易理解了,對(duì)于每條消息都分配一個(gè)全局唯一的index,如果topic及相應(yīng)的partition固定的話,可以為每一個(gè)topic-partition設(shè)置一個(gè)全局的index,當(dāng)有消息發(fā)送到某個(gè)topic-partition中,那么首先獲取其topic-partition對(duì)應(yīng)的index, 然后內(nèi)嵌到payload中,之后再發(fā)送到broker。消費(fèi)者進(jìn)行消費(fèi)審計(jì),可以判斷出哪個(gè)消息丟失,哪個(gè)消息重復(fù)等等。如果要計(jì)算端到端延遲的話,還需要在payload中內(nèi)嵌timestamp以作相應(yīng)的計(jì)算。
下面來簡(jiǎn)要分析下三個(gè)產(chǎn)品。
Chaperone
github地址:https://github.com/uber/chaperone
官方介紹(中文):http://www.infoq.com/cn/news/2016/12/Uber-Chaperone-Kafka
官方介紹(英文):https://eng.uber.com/chaperone/
Chaperone進(jìn)行消息端到端的校驗(yàn)主要是基于message內(nèi)置timestamp實(shí)現(xiàn)的,根據(jù)timestamp將message分配到不同的bucket中。之后就是對(duì)這個(gè)bucket中的消息進(jìn)行計(jì)數(shù)等一系列的audit操作,然后將這個(gè)audit操作之后的信息auditMessage保存起來,auditMessage的內(nèi)容:
- topicName:被audit的topic
- time_bucket_start:bucket的起始時(shí)間
- time_bucket_end
- metrics_count:time_bucket中的個(gè)數(shù)
- metrics_mean_latency, metrics_p95_latency, metrics_p99_latency,metrics_max_latency:延遲
- tier
- hostname
- datacenter
- uuid
注意這里的latency的計(jì)算規(guī)則是:currentTimeMillis - (timestamp*1000)。
Chaperone的架構(gòu)
Chaperone的整體架構(gòu)分為:AuditLibrary, ChaperoneService, ChaperoneCollector和WebService, 它們會(huì)收集數(shù)據(jù),并進(jìn)行相關(guān)計(jì)算,自動(dòng)檢測(cè)出丟失和延遲的數(shù)據(jù),并展示審計(jì)結(jié)果。
從Chaperone的github上的源碼來看:
Chaperone分為ChaperoneClient, ChaperoneCollector, ChaperoneDistribution, ChaperoneServiceController, ChaperoneServiceWorker這5個(gè)子項(xiàng)目。對(duì)比著上面的架構(gòu)圖來分析。
- ChaperoneClient對(duì)應(yīng)著AuditLibrary,主要是用來audit message的庫(library),并不以實(shí)際服務(wù)運(yùn)行,可以在Producer或者Consumer客戶端中調(diào)用,默認(rèn)使用10mins的滾動(dòng)時(shí)間bucket來不斷地從每個(gè)主題收集消息。然后發(fā)送到kafka的chaperone-audit這個(gè)topic中。官方文檔介紹說AuditLibrary會(huì)被ChaperoneService, ChaperoneCollector和WebService這三個(gè)組件所依賴,但代碼中來看并非完全如此,略有出入。
- ChaperoneDistribution可以忽略
- ChaperoneServiceController和ChaperoneServiceWorker對(duì)應(yīng)架構(gòu)圖中的ChaperoneService,ChaperoneServiceController主要用來檢測(cè)topics并分配topic-partitions給ChaperoneServiceWorker用以審計(jì)(audit)。ChaperoneServiceWorker主要是audit message的一個(gè)服務(wù)。
- ChaperoneServiceWorker采用scala語言編寫,內(nèi)部又將ChaperoneClient或者說AuditLibrary又重新用Scala實(shí)現(xiàn)了一番,并豐富了一下應(yīng)用,比如采用hsqldb存儲(chǔ)數(shù)據(jù),zk存取offsets來實(shí)現(xiàn)WAL(預(yù)寫式日志,具體可見下段介紹)
- Chaperone認(rèn)為message中內(nèi)嵌timestamp是十分必須的,但是從ChaperoneServiceWorker的代碼來看消息沒有timestamp也能運(yùn)行,當(dāng)消息沒有時(shí)間戳,那么會(huì)記錄noTimeMsgCount,Chaperone介紹會(huì)有一個(gè)牛逼的算法來分析消息中的timestamp(其實(shí)就是讀取消息的開頭部分,而不是全部整條消息,類似報(bào)文截?cái)嘟馕?#xff0c;下面也有涉及介紹),如果解析timestamp失敗,會(huì)記錄malformedMsgCount。
- ChaperoneCollector對(duì)是用來讀取audit的數(shù)據(jù),然后持久化操作,默認(rèn)存入mysql中,看代碼也可選存入redis中。
- 源碼中沒有WebService這個(gè)東西,估計(jì)是uber內(nèi)部的web系統(tǒng),讀取下mysql中的內(nèi)容展示到頁面而已。
如果程序段內(nèi)嵌Audit Library(ChaperoneClient),那么整個(gè)audit過程如下:
如果producer端或者consumer端需要進(jìn)行消息審計(jì),那么可以內(nèi)嵌Audit Library。就以發(fā)送端為例,消息先發(fā)送到kafka中,然后對(duì)這條消息進(jìn)行審計(jì),將審計(jì)的信息存入到kafka中,之后有專門的ChaperoneServiceCollector進(jìn)行數(shù)據(jù)消費(fèi),之后存入mysql中,也可以選擇存入redis中。頁面系統(tǒng)webService可以查詢mysql(redis)中的數(shù)據(jù),之后進(jìn)而在頁面中展示。
如果使用ChanperoneServiceWork,整個(gè)流轉(zhuǎn)過程如下:
上面是對(duì)broker端進(jìn)行審計(jì)的過程。首先從存儲(chǔ)消息的kafka(圖中上面的kafka)中消費(fèi)數(shù)據(jù),之后對(duì)收到的消息進(jìn)行審計(jì)操作,之后將審計(jì)消息auditmsg以及相應(yīng)的offset存儲(chǔ)起來(auditmsg存入hsqldb中,offset存到用來存儲(chǔ)審計(jì)數(shù)據(jù)的kafka的zk之中),之后再將審計(jì)消息auditmsg存入kafka(圖中下面的kafka)中,最后成功存儲(chǔ)并返回給消費(fèi)端(Consumer1,即ChaperoneServiceWork),之后再把hsqldb中的auditmsg標(biāo)記為已統(tǒng)計(jì)。之后ChaperoneServiceCollector和producer端(consumer端)內(nèi)嵌Audit Library時(shí)相同。
官方文檔部分介紹如下:
每個(gè)消息只被審計(jì)一次
為了確保每個(gè)消息只被審計(jì)一次,ChaperoneService使用了預(yù)寫式日志(WAL)。ChaperoneService每次在觸發(fā)Kafka審計(jì)消息時(shí),會(huì)往審計(jì)消息里添加一個(gè)UUID。這個(gè)帶有相關(guān)偏移量的消息在發(fā)送到Kafka之前被保存在WAL里。在得到Kafka的確認(rèn)之后,WAL里的消息被標(biāo)記為已完成。如果ChaperoneService崩潰,在重啟后它可以重新發(fā)送WAL里未被標(biāo)記的審計(jì)消息,并定位到最近一次的審計(jì)偏移量,然后繼續(xù)消費(fèi)。WAL確保了每個(gè)Kafka消息只被審計(jì)一次,而且每個(gè)審計(jì)消息至少會(huì)被發(fā)送一次。
接下來,ChaperoneCollector使用ChaperoneService之前添加過的UUID來移除重復(fù)消息。有了UUID和WAL,我們可以確保審計(jì)的一次性。在代理客戶端和服務(wù)器端難以實(shí)現(xiàn)一次性保證,因?yàn)檫@樣會(huì)給它們帶來額外的開銷。我們依賴它們的優(yōu)雅關(guān)閉操作,這樣它們的狀態(tài)才會(huì)被沖刷出去。
在層間使用一致性的時(shí)間戳
因?yàn)镃haperone可以在多個(gè)層里看到相同的Kafka消息,所以為消息內(nèi)嵌時(shí)間戳是很有必要的。如果沒有這些時(shí)間戳,在計(jì)數(shù)時(shí)會(huì)發(fā)生時(shí)間錯(cuò)位。在Uber,大部分發(fā)送到Kafka的數(shù)據(jù)要么使用avro風(fēng)格的schema編碼,要么使用JSON格式。對(duì)于使用schema編碼的消息,可以直接獲取時(shí)間戳。而對(duì)于JSON格式的消息,需要對(duì)JSON數(shù)據(jù)進(jìn)行解碼才能拿到時(shí)間戳。為了加快這個(gè)過程,我們實(shí)現(xiàn)了一個(gè)基于流的JSON消息解析器,這個(gè)解析器無需預(yù)先解碼整個(gè)消息就可以掃描到時(shí)間戳。這個(gè)解析器用在ChaperoneService里是很高效的,不過對(duì)代理客戶端和服務(wù)器來說仍然需要付出很高代價(jià)。所以在這兩個(gè)層里,我們使用的是消息的處理時(shí)間戳。因?yàn)闀r(shí)間戳的不一致造成的層間計(jì)數(shù)差異可能會(huì)觸發(fā)錯(cuò)誤的數(shù)據(jù)丟失警告。我們正在著手解決時(shí)間戳不一致問題,之后也會(huì)把解決方案公布出來。
溫馨提示: github上的quickstart中,如果不能根據(jù)腳本自動(dòng)安裝kafka和zk,而是自己安裝kafka和zk的話,需要改動(dòng)腳本、配置文件甚至源碼才能將服務(wù)運(yùn)行起來。另外需要安裝hsqldb和mysql(redis)。
Confluent Control Center
文檔地址:http://docs.confluent.io/3.0.0/control-center/docs/index.html
這是個(gè)收費(fèi)產(chǎn)品,文檔中介紹的并不多。和Chaperone相同,主要也是根據(jù)消息payload內(nèi)嵌timestamp來實(shí)現(xiàn),計(jì)算time_bucket的算法是:floor((timestamp /15)*15)。
架構(gòu)圖如下:
主要是在producer端或者consumer端內(nèi)嵌審計(jì)程序(相當(dāng)于Chaperone的Audit Library)繼續(xù)審計(jì),最終將審計(jì)消息同樣存入kafka中,最后的web系統(tǒng)是直接消費(fèi)kafka中的審計(jì)消息進(jìn)行內(nèi)容呈現(xiàn)。
web系統(tǒng)部分呈現(xiàn)如下:
Kafka Monitor
github地址:https://github.com/linkedin/kafka-monitor
Kafka Monitor是基于在消息payload內(nèi)嵌index和timestamp來實(shí)現(xiàn)審計(jì):消息丟失,消息重復(fù)以及端到端延遲等。
web系統(tǒng)部分呈現(xiàn)如下:
幾種典型的metrics解釋:
| produce-avaliablility-avg | The average produce availability |
| consume-avaliability-avg | The average consume availability |
| records-produced-total | The total number of records that are produced |
| records-consumed-total | The total number of records that are consumed |
| records-lost-total | The total number of records that are lost |
| records-duplicated-total | The total number of records that are duplicated |
| records-delay-ms-avg | The average latency of records from producer to consumer |
| records-produced-rate | The average number of records per second that are produced |
| produce-error-rate | The average number of errors per second |
| consume-error-rate | The average number of errors per second |
| records-delay-ms-99th | The 99th percentile latency of records from producer to consu |
| records-delay-ms-999th | The 999th percentile latency of records from producer to consumer |
| records-delay-ms-max | The maximum latency of records from producer to consumer |
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-end-to-end-audit/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的Kafka端到端审计的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 为什么QueueingConsumer会
- 下一篇: Highly Available (Mi