Kafka 配置说明
Kafka 配置說明
2015年11月27日 15:30:35?hanjibing1990?閱讀數:5446更多
個人分類:?Kafka
轉載自http://liyonghui160com.iteye.com/blog/2163899
?
?server.properties配置:
?
server.properties中所有配置參數說明(解釋)如下列表:
?
?
| 參數 | 說明(解釋) |
| broker.id =0 | 每一個broker在集群中的唯一表示, 要求是正數。當該服務器的IP地址 發生改變時,broker.id沒有變化, 則不會影響consumers的消息情況 |
| log.dirs=/data/kafka-logs | kafka數據的存放地址,多個地址的 話用逗號分割,多個目錄分布在不同 磁盤上可以提高讀寫性能 ? /data/kafka-logs-1, /data/kafka-logs-2 |
| port =9092 | broker server服務端口 |
| message.max.bytes =6525000 | 表示消息體的最大大小, 單位是字節 |
| num.network.threads =4 | broker處理消息的最大線程數, 一般情況下不需要去修改 |
| num.io.threads =8 | broker處理磁盤IO的線程數, 數值應該大于你的硬盤數 |
| background.threads =4 | 一些后臺任務處理的線程數, 例如過期消息文件的刪除等, 一般情況下不需要去做修改 |
| queued.max.requests =500 | 等待IO線程處理的請求隊列最大數, 若是等待IO的請求超過這個數值, 那么會停止接受外部消息, 應該是一種自我保護機制。 |
| host.name | broker的主機地址,若是設置了, 那么會綁定到這個地址上, 若是沒有,會綁定到所有的接口上, 并將其中之一發送到ZK,一般不設置 |
| socket.send.buffer.bytes=100*1024 | socket的發送緩沖區, socket的調優參數SO_SNDBUFF |
| socket.receive.buffer.bytes =100*1024 | socket的接受緩沖區, socket的調優參數SO_RCVBUFF |
| socket.request.max.bytes =100*1024*1024 | socket請求的最大數值, 防止serverOOM,message.max.bytes 必然要小于socket.request.max.bytes, 會被topic創建時的指定參數覆蓋 |
| log.segment.bytes =1024*1024*1024 | topic的分區是以一堆segment文件存儲的, 這個控制每個segment的大小, 會被topic創建時的指定參數覆蓋 |
| log.roll.hours =24*7 | 這個參數會在日志segment沒有 達到log.segment.bytes設置的大小, 也會強制新建一個segment會被 topic創建時的指定參數覆蓋 |
| log.cleanup.policy = delete | 日志清理策略選擇有:delete和 compact主要針對過期數據的處理, 或是日志文件達到限制的額度, 會被?topic創建時的指定參數覆蓋 |
| log.retention.minutes=3days | 數據存儲的最大時間超過這個時間會 根據log.cleanup.policy設置的 策略處理數據,也就是消費端 能夠多久去消費數據 log.retention.bytes和 log.retention.minutes 任意一個達到要求,都會執行刪除, 會被topic創建時的指定參數覆蓋 |
| log.retention.bytes=-1 | topic每個分區的最大文件大小, 一個topic的大小限制?=? 分區數*log.retention.bytes。 -1沒有大小限log.retention.bytes和 log.retention.minutes任意一個達到要求, 都會執行刪除,會被topic創建時的指定參數覆蓋 |
| log.retention.check.interval.ms=5minutes | 文件大小檢查的周期時間,是否處罰 log.cleanup.policy中設置的策略 |
| log.cleaner.enable=false | 是否開啟日志壓縮 |
| log.cleaner.threads =?2 | 日志壓縮運行的線程數 |
| log.cleaner.io.max.bytes.per.second=None | 日志壓縮時候處理的最大大小 |
| log.cleaner.dedupe.buffer.size=500*1024*1024 | 日志壓縮去重時候的緩存空間, 在空間允許的情況下,越大越好 |
| log.cleaner.io.buffer.size=512*1024 | 日志清理時候用到的IO塊大小一般不需要修改 |
| log.cleaner.io.buffer.load.factor =0.9 | 日志清理中hash表的擴大因子一般不需要修改 |
| log.cleaner.backoff.ms =15000 | 檢查是否處罰日志清理的間隔 |
| log.cleaner.min.cleanable.ratio=0.5 | 日志清理的頻率控制,越大意味著更高效的清理, 同時會存在一些空間上的浪費, 會被topic創建時的指定參數覆蓋 |
| log.cleaner.delete.retention.ms =1day | 對于壓縮的日志保留的最長時間, 也是客戶端消費消息的最長時間, 同log.retention.minutes 的區別在于一個控制未壓縮數據, 一個控制壓縮后的數據。 會被topic創建時的指定參數覆蓋 |
| log.index.size.max.bytes =10*1024*1024 | 對于segment日志的索引文件大小限制, 會被topic創建時的指定參數覆蓋 |
| log.index.interval.bytes =4096 | 當執行一個fetch操作后, 需要一定的空間來掃描最近的offset大小, 設置越大,代表掃描速度越快, 但是也更好內存, 一般情況下不需要搭理這個參數 |
| log.flush.interval.messages=None | log文件”sync”到磁盤之前累積的消息條數, 因為磁盤IO操作是一個慢操作, 但又是一個”數據可靠性"的必要手段, 所以此參數的設置,需要在"數據可靠性" 與"性能"之間做必要的權衡.如果此值過大, 將會導致每次"fsync"的時間較長(IO阻塞), 如果此值過小,將會導致"fsync"的次數較多, 這也意味著整體的client請求有一定的延遲. 物理server故障,將會導致沒有fsync的消息丟失. |
| log.flush.scheduler.interval.ms =3000 | 檢查是否需要固化到硬盤的時間間隔 |
| log.flush.interval.ms = None | 僅僅通過interval來控制消息的磁盤寫入時機, 是不足的.此參數用于控制"fsync"的時間間隔, 如果消息量始終沒有達到閥值, 但是離上一次磁盤同步的時間間隔達到閥值,也將觸發. |
| log.delete.delay.ms =60000 | 文件在索引中清除后保留的時間一般不需要去修改 |
| log.flush.offset.checkpoint.interval.ms =60000 | 控制上次固化硬盤的時間點, 以便于數據恢復一般不需要去修改 |
| auto.create.topics.enable =true | 是否允許自動創建topic,若是false, 就需要通過命令創建topic |
| default.replication.factor =1 | 副本的個數 |
| num.partitions =1 | 每個topic的分區個數,若是在topic 創建時候沒有指定的話會被topic 創建時的指定參數覆蓋 |
| ? | ? |
| 以下是kafka中Leader,replicas配置參數 | ? |
| controller.socket.timeout.ms =30000 | partition?leader與replicas 之間通訊時,socket的超時時間 |
| controller.message.queue.size=10 | partition?leader與replicas 數據同步時,消息的隊列尺寸 |
| replica.lag.time.max.ms =10000 | replicas響應partition?leader 的最長等待時間,若是超過這個時間, 就將replicas列入ISR(in-sync replicas), 并認為它是死的,不會再加入管理中 |
| replica.lag.max.messages =4000 | 如果follower落后與leader太多, 將會認為此follower[或者說 partition?relicas]已經失效 ##通常,在follower與leader通訊時, 因為網絡延遲或者鏈接斷開, 總會導致replicas中消息同步滯后 ##如果消息之后太多,leader將認為 此follower網絡延遲較大或者消息吞吐能力有限, 將會把此replicas遷移 ##到其他follower中. ##在broker數量較少, 或者網絡不足的環境中, 建議提高此值. |
| replica.socket.timeout.ms=30*1000 | follower與leader之間的socket超時時間 |
| replica.socket.receive.buffer.bytes=64*1024 | leader復制時候的socket緩存大小 |
| replica.fetch.max.bytes =1024*1024 | replicas每次獲取數據的最大大小 |
| replica.fetch.wait.max.ms =500 | replicas同leader之間通信的 最大等待時間,失敗了會重試 |
| replica.fetch.min.bytes =1 | fetch的最小數據尺寸,如果leader 中尚未同步的數據不足此值,將會阻塞, 直到滿足條件 |
| num.replica.fetchers=1 | leader進行復制的線程數, 增大這個數值會增加follower的IO |
| replica.high.watermark.checkpoint.interval.ms =5000 | 每個replica檢查是否將 最高水位進行固化的頻率 |
| controlled.shutdown.enable =false | 是否允許控制器關閉broker , 若是設置為true,會關閉所有 在這個broker上的leader, 并轉移到其他broker |
| controlled.shutdown.max.retries =3 | 控制器關閉的嘗試次數 |
| controlled.shutdown.retry.backoff.ms =5000 | 每次關閉嘗試的時間間隔 |
| leader.imbalance.per.broker.percentage =10 | leader的不平衡比例, 若是超過這個數值, 會對分區進行重新的平衡 |
| leader.imbalance.check.interval.seconds =300 | 檢查leader是否不平衡的時間間隔 |
| offset.metadata.max.bytes | 客戶端保留offset信息的最大空間大小 |
| kafka中zookeeper參數配置 | ? |
| zookeeper.connect = localhost:2181 | zookeeper集群的地址,可以是多個, 多個之間用逗號分割 hostname1:port1,hostname2:port2, hostname3:port3 |
| zookeeper.session.timeout.ms=6000 | ZooKeeper的最大超時時間, 就是心跳的間隔,若是沒有反映, 那么認為已經死了,不易過大 |
| zookeeper.connection.timeout.ms =6000 | ZooKeeper的連接超時時間 |
| zookeeper.sync.time.ms =2000 | ZooKeeper集群中leader和 follower之間的同步實際那 |
?
?
0.8.1版server.properties配置
?
broker.id? 默認值:無
每一個broker都有一個唯一的id,這是一個非負整數,這個id就是broker的"名字",這樣就允許broker遷移到別的機器而不會影響消費者。你可以選擇任意一個數字,只要它是唯一的。
log.dirs 默認值:/tmp/kafka-logs
一個用逗號分隔的目錄列表,可以有多個,用來為Kafka存儲數據。每當需要為一個新的partition分配一個目錄時,會選擇當前的存儲partition最少的目錄來存儲。
port 默認值:6667
server用來接受client請求的端口。
zookeeper.connect 默認值:null
指定了ZooKeeper的connect string,以hostname:port的形式,hostname和port就是ZooKeeper集群各個節點的hostname和port。 ZooKeeper集群中的某個節點可能會掛掉,所以可以指定多個節點的connect string。如下所式:
hostname1:port1,hostname2:port2,hostname3:port3?.
ZooKeeper也可以允許你指定一個"chroot"的路徑,可以讓Kafka集群將需要存儲在ZooKeeper的數據存儲到指定的路徑下這可以讓多個Kafka集群或其他應用程序公用同一個ZooKeeper集群。可以使用如下的connect string:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
這樣就可以講這個集群的所有數據存放在/chroot/path路徑下。注意在啟動集群前,一定要先自己創建這個路徑,consumer也得使用相同的connect string。
message.max.bytes 默認值:1000000
server能接收的一條消息的最大的大小。這個屬性跟consumer使用的最大fetch大小是一致的,這很重要,否則一個不守規矩的producer會發送一個太大的消息。
num.network.threads 默認值:3
處理網絡的線程的數量,server端用來處理網絡請求,一般不需要改變它。
num.io.threads 默認值:8
server端處理請求時的I/O線程的數量,不要小于磁盤的數量。
background.threads 默認值:4
用來處理各種不同的后臺任務的線程數量,比如刪除文件,一般不需要改變它。
queued.max.requests 默認值:500
I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。
host.name 默認值:null
broker的hostname,如果設置了它,會僅綁定這個地址。如果沒有設置,則會綁定所有的網絡接口,并提交一個給ZK。
advertised.host.name 默認值:null
如果設置了這個hostname,會分發給所有的producer,consumer和其他broker來連接自己。
advertised.port 默認值:null
分發這個端口給所有的producer,consumer和其他broker來建立連接。如果此端口跟server綁定的端口不同,則才有必要設置。
socket.send.buffer.bytes 默認值:100 * 1024
server端用來處理socket連接的SO_SNDBUFF緩沖大小。
socket.receive.buffer.bytes 默認值:100 * 1024
server端用來處理socket連接的SO_RCVBUFF緩沖大小。
socket.request.max.bytes 默認值:100 * 1024 * 1024
server能接受的請求的最大的大小,這是為了防止server跑光內存,不能大于Java堆的大小。
num.partitions 默認值:1
如果在創建topic的時候沒有指定partition的數量,則使用這個值來設置。
log.segment.bytes 默認值:1024 * 1024 * 1024
一個topic的一個partition對應的所有segment文件稱為log。這個設置控制著一個segment文件的最大的大小,如果超過了此大小,就會生成一個新的segment文件。此配置可以被覆蓋,參考?the per-topic configuration section。
log.roll.hours 默認值:24 * 7
這個設置會強制Kafka去roll一個新的log segment文件,即使當前使用的segment文件的大小還沒有超過log.segment.bytes。此配置可以被覆蓋,參考?the per-topic configuration section。
log.cleanup.policy 默認值:delete
此配置可以設置成delete或compact。如果設置為delete,當log segment文件的大小達到上限,或者roll時間達到上限,文件將會被刪除。如果設置成compact,則此文件會被清理,標記成已過時狀態,詳見?log compaction?。此配置可以被覆蓋,參考?the per-topic configuration section。
log.retention.minutes 默認值:7 days
在刪除log文件之前,保存在磁盤的時間,單位為分鐘,這是所有topic的默認值。注意如果同時設置了log.retention.minutes和 log.retention.bytes,如果達到任意一個條件的限制,都會馬上刪掉。此配置可以被覆蓋,參考?the per-topic configuration section。
log.retention.bytes 默認值:-1
topic每個分區的最大文件大小,一個topic的大小限制 = 分區數 * log.retention.bytes。-1沒有大小限log.retention.bytes和log.retention.minutes任意一個 達到要求,都會執行刪除。此配置可以被覆蓋,參考?the per-topic configuration section。
log.retention.check.interval.ms 默認值:5 minutes
檢查任意一個log segment文件是否需要進行retention處理的時間間隔。
log.cleaner.enable 默認值:false
設置為true就開啟了log compaction功能。
log.cleaner.threads 默認值:1
使用log compaction功能來清理log的線程的數量。
log.cleaner.io.max.bytes.per.second 默認值:None
在執行log compaction的過程中,限制了cleaner每秒鐘I/O的數據量,以免cleaner影響正在執行的請求。
log.cleaner.dedupe.buffer.size 默認值:500 * 1024 * 1024
日志壓縮去重時候的緩存空間,在空間允許的情況下,越大越好。
log.cleaner.io.buffer.size 默認值:512 * 1024
日志清理時候用到的I/O塊(chunk)大小,一般不需要修改。
log.cleaner.io.buffer.load.factor 默認值:0.9
日志清理中hash表的擴大因子,一般不需要修改。
log.cleaner.backoff.ms 默認值:15000
檢查log是否需要clean的時間間隔。
log.cleaner.min.cleanable.ratio 默認值:0.5
控制了log compactor進行clean操作的頻率。默認情況下,當log的50%以上已被clean時,就不用繼續clean了。此配置可以被覆蓋,參考?the per-topic configuration section。
log.cleaner.delete.retention.ms 默認值:1 day
對于壓縮的日志保留的最長時間,也是客戶端消費消息的最長時間,同log.retention.minutes的區別在于一個控制未壓縮數據,一個控制壓縮后的數據,參考?the per-topic configuration section。
log.index.size.max.bytes 默認值:10 * 1024 * 1024
每一個log segment文件的offset index文件的最大的size。注意總是預分配一個稀疏(sparse)文件,當roll這個文件時再shrink down。如果index文件被寫滿,那么就roll一個新的log segment文件,即使還沒達到log.segment.byte限制。參考?the per-topic configuration section。
log.index.interval.bytes 默認值:4096
當執行一個fetch操作后,需要一定的空間來掃描最近的offset大小,設置越大,代表掃描速度越快,但是也更耗內存,一般情況下不需要改變這個參數。
log.flush.interval.messages 默認值:None
在強制fsync一個partition的log文件之前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。通常建議人家使用replication來確保持久性,而不是依靠單機上的fsync,但是這可以帶來更多的可靠性。
log.flush.scheduler.interval.ms 默認值:3000
log flusher檢查是否需要把log刷到磁盤的時間間隔,單位為ms。
log.flush.interval.ms 默認值:None
2次fsync調用之間最大的時間間隔,單位為ms。即使log.flush.interval.messages沒有達到,只要這個時間到了也需要調用fsync。
log.delete.delay.ms 默認值:60000
在log文件被移出索引后,log文件的保留時間。在這段時間內運行的任意正在進行的讀操作完成操作,不用去打斷它。通常不需要改變。
log.flush.offset.checkpoint.interval.ms 默認值:60000
?
記錄上次把log刷到磁盤的時間點的頻率,用來日后的recovery。通常不需要改變。
auto.create.topics.enable 默認值:true
是否允許自動創建topic。如果設為true,那么produce,consume或者fetch metadata一個不存在的topic時,就會自動創建一個默認replication factor和partition number的topic。
controller.socket.timeout.ms 默認值:30000
partition管理控制器發向replica的命令的socket超時時間。
controller.message.queue.size 默認值:10
partition leader與replicas數據同步時的消息的隊列大小。
default.replication.factor 默認值:1
自動創建topic時的默認replication factor的(副本)個數。
replica.lag.time.max.ms 默認值:10000
如果一個follower在有一個時間窗口內沒有發送任意fetch請求,leader就會把這個follower從ISR(in-sync replicas)移除,并認為它已掛掉。
replica.lag.max.messages 默認值:4000
如果一個replica落后leader此配置指定的消息條數,leader就會把它移除ISR,并認為它掛掉。
replica.socket.timeout.ms 默認值:300 * 1000
復制數據過程中,replica發送給leader的網絡請求的socket超時時間。
replica.socket.receive.buffer.bytes 默認值:64 * 1024
復制數據過程中,replica發送網絡請求給leader的socket receiver buffer的大小。
replica.fetch.max.bytes 默認值:1024 * 1024
復制數據過程中,replica發送給leader的fetch請求試圖獲取數據的最大的字節數。
replica.fetch.wait.max.ms 默認值:500
復制數據過程中,為了fetch數據,replica發送請求給leader的最大的等待時間。
replica.fetch.min.bytes 默認值:1
復制數據過程中,replica收到的每個fetch響應,期望的最小的字節數,如果沒有收到足夠的字節數,就會等待期望更多的數據,直到達到replica.fetch.wait.max.ms。
num.replica.fetchers 默認值:1
用來從leader復制消息的線程數量,增大這個值可以增加follow的I/O并行度。
replica.high.watermark.checkpoint.interval.ms 默認值:5000
每一個replica存儲自己的high watermark到磁盤的頻率,用來日后的recovery。
fetch.purgatory.purge.interval.requests 默認值:10000
含義暫不明,日后研究。The purge interval (in number of requests) of the fetch request purgatory.
producer.purgatory.purge.interval.requests 默認值:10000
含義暫不明,日后研究。The purge interval (in number of requests) of the producer request purgatory.
zookeeper.session.timeout.ms 默認值:6000
ZooKeeper的session的超時時間,如果在這段時間內沒有收到ZK的心跳,則會被認為該Kafka server掛掉了。如果把這個值設置得過低可能被誤認為掛掉,如果設置得過高,如果真的掛了,則需要很長時間才能被server得知。
zookeeper.connection.timeout.ms 默認值:6000
client連接到ZK server的超時時間。
zookeeper.sync.time.ms 默認值:2000
一個ZK follower能落后leader多久。
controlled.shutdown.enable 默認值:false
如果為true,在關閉一個broker前,會把當前broker上的所有partition,如果有為leader的話,會把leader權交給其他broker上的相應的partition。這會降低在關閉期間不可用的時間窗口。
controlled.shutdown.max.retries 默認值:3
在執行一個unclean(強行關閉?)的關閉操作前,為了成功完成關閉操作,最大的重試次數。
controlled.shutdown.retry.backoff.ms 默認值:5000
在關閉重試期間的回退(backoff)時間。
auto.leader.rebalance.enable 默認值:false
如果設為true,復制控制器會周期性的自動嘗試,為所有的broker的每個partition平衡leadership,為更優先(preferred)的replica分配leadership。
leader.imbalance.per.broker.percentage 默認值:10
每個broker允許的不平衡的leader的百分比。如果每個broker超過了這個百分比,復制控制器會重新平衡leadership。
leader.imbalance.check.interval.seconds 默認值:300
檢測leader不平衡的時間間隔。
offset.metadata.max.bytes 默認值:1024
允許client(消費者)保存它們元數據(offset)的最大的數據量。
?
?
?
kafka?producer配置
Xml代碼??
?
kafkaconsumer端配置
?
Xml代碼??
?
總結
以上是生活随笔為你收集整理的Kafka 配置说明的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring cloud: 使用cons
- 下一篇: 关于使用spring admin和con