Kafka部署、原理和使用介绍
Kafka簡介及Kafka部署、原理和使用介紹
Kafka簡介
定義
Kafka是一種消息隊列,是一個分布式的基于發布/訂閱模式的,主要用來處理大量數據狀態下的消息隊列,一般用來做日志的處理。既然是消息隊列,那么Kafka也就擁有消息隊列的相應的特性了。
消息隊列的兩種模式
點對點模式
一對一,消費者主動拉取數據,消息收到后消息清除
? 消息生產者生產消息發送到Queue中,然后消息消費者從Queue中取出并且消費消息。
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。
發布/訂閱模式
一對多,消費者消費數據之后不會清除消息
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
Kafka基礎架構
組件釋義
(1)Producer :消息生產者,就是向kafka broker發消息的客戶端;
(2)Consumer :消息消費者,向kafka broker取消息的客戶端;
(3)Consumer Group (CG):消費者組,由多個consumer組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個消費者消費;消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
(4)Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
(5)Topic :可以理解為一個隊列,生產者和消費者面向的都是一個topic;
(6)Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列;
(7)Replica:副本,為保證集群中的某個節點發生故障時,該節點上的partition數據不丟失,且kafka仍然能夠繼續工作,kafka提供了副本機制,一個topic的每個分區都有若干個副本,一個leader和若干個follower。
(8)leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是leader。
(9)follower:每個分區多個副本中的“從”,實時從leader中同步數據,保持和leader數據的同步。leader發生故障時,某個follower會成為新的leader。
Kafka安裝部署
相關連接
Kafka官網:http://kafka.apache.org
Kafka下載:http://kafka.apache.org/downloads
本次部署2.4.1版本下載:https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
環境提前部署zookeeper:
zookeeper部署方式[文檔連接]:https://blog.csdn.net/wt334502157/article/details/115213645
集群部署
3臺服務器對應在/etc/hosts文件中配置一下解析
ops01 11.8.37.50
ops02 11.8.36.63
ops03 11.8.36.76
# 官網下載安裝包,并上傳安裝包至服務器,或者在服務器上直接wget jyc@ops01:/opt/software >ll | grep kafka -rw-r--r-- 1 jyc jyc 70159813 Apr 29 10:56 kafka_2.11-2.4.1.tgz jyc@ops01:/opt/software > # 將包解壓至/opt/module目錄下 jyc@ops01:/opt/software >tar -zxf kafka_2.11-2.4.1.tgz -C /opt/module/ jyc@ops01:/opt/software >ll /opt/module/ total 36 drwxrwxr-x 2 jyc jyc 4096 Apr 4 11:01 datas drwxr-xr-x 12 jyc jyc 4096 Apr 24 16:37 flume -rw-rw-r-- 1 jyc jyc 30 Apr 25 11:33 group.log drwxr-xr-x 12 jyc jyc 4096 Mar 12 11:38 hadoop-3.1.3 drwxrwxr-x 11 jyc jyc 4096 Apr 2 15:14 hive drwxr-xr-x 6 jyc jyc 4096 Mar 3 2020 kafka_2.11-2.4.1 drwxrwxr-x 3 jyc jyc 4096 Apr 10 16:25 tez drwxrwxr-x 5 jyc jyc 4096 Apr 2 15:03 tez-0.9.2_bak0410 drwxr-xr-x 8 jyc jyc 4096 Mar 25 11:02 zookeeper-3.5.7 # 程序目錄名更改為kafka,簡化后續操作 jyc@ops01:/opt/software >cd /opt/module/ jyc@ops01:/opt/module >mv kafka_2.11-2.4.1 kafka jyc@ops01:/opt/module >cd kafka/ jyc@ops01:/opt/module/kafka >ls bin config libs LICENSE NOTICE site-docs # 在/opt/module/kafka目錄下創建logs目錄,后續用于存儲相關文件 jyc@ops01:/opt/module/kafka >mkdir logs jyc@ops01:/opt/module/kafka >cd config/ # 配置文件server.properties jyc@ops01:/opt/module/kafka/config >vim server.properties jyc@ops01:/opt/module/kafka/config >cat /opt/module/kafka/config/server.properties |grep -vE "^#|^$" broker.id=0 delete.topic.enable=true num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/module/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=ops01:2181,ops02:2181,ops03:2181/kafka zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0# 【注意】 # broker.id=1 這個id每個kafka需要配置不同 (ops01->0 / ops02->1 / ops03->2) # delete.topic.enable=true 功能為解鎖刪除topic功能 # log.dirs=/opt/module/kafka/logs 配置logs所在目錄路徑 # num.network.threads=3 處理網絡請求的線程數量 # num.io.threads=8 用來處理磁盤IO的現成數量 # socket.send.buffer.bytes=102400 發送套接字的緩沖區大小 # socket.receive.buffer.bytes=102400 接收套接字的緩沖區大小 # socket.request.max.bytes=104857600 請求套接字的緩沖區大小 # num.partitions=1 topic在當前broker上的分區個數 # num.recovery.threads.per.data.dir=1 用來恢復和清理data下數據的線程數量 # log.retention.hours=168 segment文件保留的最長時間,超時將被刪除 # zookeeper.connect=ops01:2181,ops02:2181,ops03:2181/kafka 配置連接Zookeeper集群地址# 配置一下/etc/profile 環境變量 jyc@ops01:/opt/module/kafka/config >tail -4 /etc/profile#kafka export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin jyc@ops01:/opt/module/kafka/config ># 引用配置文件,或者關掉會話窗口重新打開也生效 jyc@ops01:/opt/module >source /etc/profile# 將kafka目錄直接scp到 ops02 和 ops03 兩臺服務器 jyc@ops01:/opt/module >scp -r kafka ops02:/opt/module/ jyc@ops01:/opt/module >scp -r kafka ops03:/opt/module/ ============================================================== # ops02 上操作增加環境變量 jyc@ops02:/opt/module >sudo vim /etc/profile #kafka export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin jyc@ops02:/opt/module >source /etc/profile# 在ops02上把kafka配置文件的broker.id 改為2 jyc@ops02:/opt/module >sudo vim /opt/module/kafka/config/server.properties broker.id=1# ops03 上操作增加環境變量 jyc@ops03:/opt/module >sudo vim /etc/profile #kafka export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin jyc@ops03:/opt/module >source /etc/profile# 在ops03上把kafka配置文件的broker.id 改為3 jyc@ops03:/opt/module/kafka >sudo vim /opt/module/kafka/config/server.properties broker.id=2# 啟動kafka之前,先看一下環境zookeeper運行是否正常,端口是否都在 jyc@ops01:/opt/module/kafka >ssh ops01 netstat -tnlpu | grep -E "2181|2888|3888" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 11.8.37.50:3888 :::* LISTEN 41773/java tcp6 0 0 :::2181 :::* LISTEN 41773/java jyc@ops01:/opt/module/kafka >ssh ops02 netstat -tnlpu | grep -E "2181|2888|3888" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 11.8.36.63:3888 :::* LISTEN 33012/java tcp6 0 0 :::2181 :::* LISTEN 33012/java tcp6 0 0 11.8.36.63:2888 :::* LISTEN 33012/java jyc@ops01:/opt/module/kafka > jyc@ops01:/opt/module/kafka >ssh ops03 netstat -tnlpu | grep -E "2181|2888|3888" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 11.8.36.76:3888 :::* LISTEN 102422/java tcp6 0 0 :::2181 :::* LISTEN 102422/java # 在3臺服務器上啟動kafka -daemon是后臺運行 ,開啟服務需要指定配置文件 jyc@ops01:/opt/module >kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties jyc@ops02:/opt/module >kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties jyc@ops03:/opt/module >kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties# 查看kafka的9092端口 jyc@ops01:/opt/module >ssh ops01 "netstat -tnlpu|grep 9092" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 :::9092 :::* LISTEN 69716/java jyc@ops01:/opt/module >ssh ops02 "netstat -tnlpu|grep 9092" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 :::9092 :::* LISTEN 101034/java jyc@ops01:/opt/module >ssh ops03 "netstat -tnlpu|grep 9092" (Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.) tcp6 0 0 :::9092 :::* LISTEN 55645/java # 到此 kafka服務已經健康的運行起來了# 如果需要關閉kafka時,用stop腳本 jyc@ops01:/opt/module >kafka-server-stop.sh jyc@ops01:/opt/module >kafka-server-stop.sh jyc@ops01:/opt/module >kafka-server-stop.sh【注意1】:
服務正常啟動后,會在/opt/module/kafka/logs目錄下有一個meta.properties的生成源數據的文件,如果已經配置文件定義了broker.id并且啟動了kafka,那后續再啟動服務,會先從這個路徑下去讀取meta.properties
jyc@ops01:/opt/module/kafka/logs >pwd /opt/module/kafka/logs jyc@ops01:/opt/module/kafka/logs >cat meta.properties # #Thu Apr 29 16:12:25 CST 2021 cluster.id=rHb3lTrdQXau9HQrRBJP9w version=0 broker.id=0【注意2】:
如果報錯如下信息,大概率就是修改了broker的id,導致新舊的id值不一樣;修改meta.properties文件,把cluster.id=這行注釋,重啟zookeeper后,再重啟kafka,這時kafka讀取不到meta.properties中的cluster.id則會重新生成
The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.【注意3】:
zookeeper.connect=ops01:2181,ops02:2181,ops03:2181/kafka這個配置文件中最后加了/kafka;不加也能使用,但是會在根節點直接創建很多kafka相關的子節點,不便于管理;所以盡量加上這個/kafka
jyc@ops01:/opt/module/kafka >zkCli.sh Connecting to localhost:2181 2021-04-30 10:49:02,227 [myid:] - INFO [main:Environment@109] - Client environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls -w / [kafka, jyc, zookeeper] [zk: localhost:2181(CONNECTED) 1] ls -w /kafka [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification] # 如果不配置/kafka 則上面的這些都會創建在根節點 [zk: localhost:2181(CONNECTED) 4] ls -w /kafka/brokers [ids, seqid, topics] # 在/kafka/brokers/ids可以查看到對應幾個broker節點的id信息,一定是和配置文件里的對應 [zk: localhost:2181(CONNECTED) 5] ls -w /kafka/brokers/ids [0, 1, 2]【注意4】:
如果集群沒有健康運行起來時,在后續操作添加topics時可能會遇到下方的報錯,例如創建3個副本,但是健康的broker節點是0個等,所以后續實驗開始前,還是要確認環境是否都搭建完畢,kafka是否都成功健康的運行起來
Error while executing topic command : replication factor: 3 larger than available brokers: 0Kafka命令行操作
# 查看當前服務器中的所有topic jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --list jyc@ops01:/opt/module/kafka/logs > # 創建topic jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --create --replication-factor 3 --partitions 2 --topic bigdata Created topic bigdata. # 選項說明: # --topic 后面定義topic定義的主題名 # --replication-factor 定義副本數,不能大于broker數 # --partitions 定義分區數(把topic拆分成幾個)jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --list bigdata jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --create --replication-factor 3 --partitions 2 --topic bigdata2 Created topic bigdata2. jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --list bigdata bigdata2# 刪除topic jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --delete --topic bigdata2 Topic bigdata2 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --list bigdata# 模擬生產數據和消費數據 # ops01 發送消息 jyc@ops01:/opt/module/kafka > jyc@ops01:/opt/module/kafka >kafka-console-producer.sh --broker-list ops01:9092 --topic bigdata >wang >ting >niubiplus ># ops02 消費消息 jyc@ops02:/opt/module/kafka/logs >kafka-console-consumer.sh --bootstrap-server ops01:9092 --from-beginning --topic bigdata ting wang niubiplus# --from-beginning 相當于把topic中數據從頭都拿到,不加是任務掛起時,開始接收最新的數據 jyc@ops03:/opt/module/kafka > # 查看某個Topic的詳情 jyc@ops03:/opt/module/kafka >kafka-topics.sh --zookeeper ops01:2181/kafka --describe --topic bigdata Topic: bigdata PartitionCount: 2 ReplicationFactor: 3 Configs: Topic: bigdata Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: bigdata Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0# 修改分區數partition數,2個更新成3個 jyc@ops03:/opt/module/kafka >kafka-topics.sh --zookeeper ops01:2181/kafka --alter --topic bigdata --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! # 再次查看 jyc@ops03:/opt/module/kafka >kafka-topics.sh --zookeeper ops01:2181/kafka --describe --topic bigdata Topic: bigdata PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: bigdata Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: bigdata Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0Topic: bigdata Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 jyc@ops03:/opt/module/kafka >Kafka架構進階
Kafka工作流程及文件存儲機制
? Kafka中消息是以topic進行分類的,生產者生產消息,消費者消費消息,都是面向topic的。topic把消息分門別類,相同的topic消息被放在了一個消息隊列。
? topic是邏輯上的概念,而partition是物理上的概念,每個partition對應于一個log文件,該log文件中存儲的就是producer生產的數據。Producer生產的數據會被不斷追加到該log文件末端,且每條數據都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。
由于生產者生產的消息會不斷追加到log文件末尾,為防止log文件過大導致數據定位效率低下,Kafka采取了分片和索引機制,將每個partition分為多個segment。每個segment對應三個文件——“.index”, ".timeindex"文件和“.log”文件。這些文件位于一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。例如,bigdata這個topic有三個分區,則其對應的文件夾為bigdata-0,bigdata-1,bigdata-2。
“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數據,索引文件中的元數據指向對應數據文件中message的物理偏移地址。
Kafka生產者
分區策略
分區原因
? (1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
? (2)可以提高并發,因為相對topic可以對Partition級別更小的單位進行讀寫了。
分區原則
? (1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
? (2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取余得到 partition 值;
? (3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時隨機生成一個整數(后面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取余得到 partition 值,也就是常說的 round-robin 算法。
? 我們需要將producer發送的數據封裝成一個ProducerRecord對象。
數據可靠性保證
? 為保證producer發送的數據,能可靠的發送到指定的topic,topic的每個partition收到producer發送的數據后,都需要向producer發送ack(acknowledgement確認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送數據。
ACK (Acknowledge character)即是確認字符,在數據通信中,接收方發給發送方的一種傳輸類控制字符。表示發來的數據已確認接收無誤。
【注意】:
? partition確保有follower與leader同步完成,leader再發送ack,這樣才能保證leader掛掉之后,能在follower中選舉出新的leader,并不是收到后直接返回確認。
副本數據同步策略可行方案
| 1.半數以上完成同步,就發送ack | 延遲低 | 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本 |
| 2.全部完成同步,才發送ack | 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 | 延遲高 |
Kafka選擇了第二種方案,原因如下:
(1)同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分區都有大量的數據,第一種方案會造成大量數據的冗余。
(2)雖然第二種方案的網絡延遲會比較高,但網絡延遲對Kafka的影響較小。
ISR
? 采用第二種方案之后,設想以下情景:leader收到數據,所有follower都開始同步數據,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎么解決呢?
? Kafka提供了數據復制算法保證,如果leader發生故障或掛掉,一個新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中所有follower滯后的狀態。當producer發送一條消息到broker后,leader寫入消息并復制到所有follower。消息提交之后才被成功復制到所有的同步副本。消息復制延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落后”太多或者失效,leader將會把它從ISR中刪除。Leader發生故障之后,就會從ISR中選舉新的leader
ack應答機制
? 對于某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等ISR中的follower全部接收成功。所以Kafka為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡,選擇以下的配置。
acks參數配置:acks
0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經返回,當broker故障時有可能丟失數據;
1:producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前leader故障,那么將會丟失數據;
-1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發送ack之前,leader發生故障,那么會造成數據重復。
故障處理細節
(1)follower故障
follower發生故障后會被臨時踢出ISR,待該follower恢復后,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進行同步。等該follower的LEO大于等于該Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader發生故障之后,會從ISR中選出一個新的leader,之后,為保證多個副本之間的數據一致性,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數據。
【注意】:
這只能保證副本之間的數據一致性,并不能保證數據不丟失或者不重復。
Exactly Once語義
? 將服務器的ACK級別設置為-1,可以保證Producer到Server之間不會丟失數據,即At Least Once語義。相對的,將服務器ACK級別設置為0,可以保證生產者每條消息只會被發送一次,即At Most Once語義。
At Least Once可以保證數據不丟失,但是不能保證數據不重復;相對的,At Least Once可以保證數據不重復,但是不能保證數據不丟失。但是,對于一些非常重要的信息,比如說交易數據,下游數據消費者要求數據既不重復也不丟失,即Exactly Once語義。在0.11版本以前的Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。對于多個下游應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。
0.11版本的Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指Producer不論向Server發送多少次重復數據,Server端都只會持久化一條。冪等性結合At Least Once語義,就構成了Kafka的Exactly Once語義。即:
At Least Once + 冪等性 = Exactly Once
? 要啟用冪等性,只需要將Producer的參數中enable.idompotence設置為true即可。Kafka的冪等性實現其實就是將原來下游需要做的去重放在了數據上游。開啟冪等性的Producer在初始化的時候會被分配一個PID,發往同一Partition的消息會附帶Sequence Number。而Broker端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker只會持久化一條。但是PID重啟就會變化,同時不同的Partition也具有不同主鍵,所以冪等性無法保證跨分區跨會話的Exactly Once。
Kafka消費者
消費方式
consumer采用pull(拉取)模式從broker中讀取數據。
? push(推送)模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
? pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka的消費者在消費數據時會傳入一個時長參數timeout,如果當前沒有數據可供消費,consumer會等待一段時間之后再返回,這段時長即為timeout。
分區分配策略
? 一個consumer group中有多個consumer,一個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。
Kafka有兩種分配策略,一是roundrobin,一是range。
1)roundrobin
2)range
offset的維護
? 由于consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續消費。
? Kafka 0.9版本之前,consumer默認將offset保存在Zookeeper中,從0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets。
jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --list __consumer_offsets bigdata jyc@ops01:/opt/module/kafka/logs >kafka-topics.sh --zookeeper ops01:2181/kafka --describe --topic __consumer_offsets Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 1 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 3 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 4 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 5 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 6 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 7 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 8 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 9 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 10 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 11 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 12 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 13 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 14 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 15 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 16 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 17 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 18 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 19 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 20 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 21 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 22 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 23 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 24 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 25 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 26 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 27 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 28 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 29 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 30 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 31 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 32 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 33 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 35 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 36 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 37 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 38 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 39 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 40 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 41 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 42 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 43 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 44 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 45 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 46 Leader: 2 Replicas: 2 Isr: 2Topic: __consumer_offsets Partition: 47 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 48 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 49 Leader: 2 Replicas: 2 Isr: 2 jyc@ops01:/opt/module/kafka/logs >Kafka高效讀寫數據
順序寫磁盤
? Kafka的producer生產數據,要寫入到log文件中,寫的過程是一直追加到文件末端,為順序寫。官網有數據表明,同樣的磁盤,順序寫能到到600M/s,而隨機寫只有100k/s。這與磁盤的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間。
應用Pagecache
Kafka數據持久化是直接持久化到Pagecache中,這樣會產生以下幾個好處
- I/O Scheduler 會將連續的小塊寫組裝成大塊的物理寫從而提高性能
- I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
- 充分利用所有空閑內存(非 JVM 內存)。如果使用應用層 Cache(即 JVM 堆內存),會增加 GC 負擔
- 讀操作可直接在 Page Cache 內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過 Page Cache)交換數據
- 如果進程重啟,JVM 內的 Cache 會失效,但 Page Cache 仍然可用
盡管持久化到Pagecache上可能會造成宕機丟失數據的情況,但這可以被Kafka的Replication機制解決。如果為了保證這種情況下數據不丟失而強制將 Page Cache 中的數據 Flush 到磁盤,反而會降低性能。
零復制技術
Zookeeper在Kafka中的作用
? Kafka集群中有一個broker會被選舉為Controller,負責管理集群broker的上下線,所有topic的分區副本分配和leader選舉等工作。
Controller的管理工作都是依賴于Zookeeper的。
broker0->id 0 broker1->id 1 broker2->id 2
監聽zookeeper中的/kafka/brokers/ids節點信息
當前leader異常
選舉新leader
Kafka事務
Kafka從0.11版本開始引入了事務支持。事務可以保證Kafka在Exactly Once語義的基礎上,生產和消費可以跨分區和會話,要么全部成功,要么全部失敗。
Producer事務
? 為了實現跨分區跨會話的事務,需要引入一個全局唯一的Transaction ID,并將Producer獲得的PID和Transaction ID綁定。這樣當Producer重啟后就可以通過正在進行的Transaction ID獲得原來的PID。
? 為了管理Transaction,Kafka引入了一個新的組件Transaction Coordinator。Producer就是通過和Transaction Coordinator交互獲得Transaction ID對應的任務狀態。Transaction Coordinator還負責將事務所有寫入Kafka的一個內部Topic,這樣即使整個服務重啟,由于事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。
Consumer事務
? 上述事務機制主要是從Producer方面考慮,對于Consumer而言,事務的保證就會相對較弱,尤其時無法保證Commit的信息被精確消費。這是由于Consumer可以通過offset訪問任意信息,而且不同的Segment File生命周期不同,同一事務的消息可能會出現重啟后被刪除的情況。
? 如果想完成Consumer端的精準一次性消費,那么需要kafka消費端將消費過程和提交offset過程做原子綁定。此時我們需要將kafka的offset保存到支持事務的自定義介質中(比如mysql)。
總結
以上是生活随笔為你收集整理的Kafka部署、原理和使用介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服装进销存软件排名前十名,最新门店进销存
- 下一篇: 实测几款常见的DNS,看防护能力怎么样?