Kafka Broker Configuration (Part 4)
Understanding these settings helps a great deal both in understanding the system and in tuning its performance. The Kafka website already provides a fairly detailed list of configuration options, but let's go straight to the code and see which broker settings are worth knowing about. Every option carries an English comment explaining what it does, so I won't translate them one by one:
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message

/**
 * Configuration settings for the kafka server
 */
class KafkaConfig(props: Properties) extends ZKConfig(props) {
  /* the port to listen and accept connections on */
  val port: Int = Utils.getInt(props, "port", 6667)

  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
  val hostName: String = Utils.getString(props, "hostname", null)

  /* the broker id for this server */
  val brokerId: Int = Utils.getInt(props, "brokerid")

  /* the SO_SNDBUFF buffer of the socket sever sockets */
  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

  /* the SO_RCVBUFF buffer of the socket sever sockets */
  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)

  /* the maximum number of bytes in a socket request */
  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))

  /* the maximum size of message that the server can receive */
  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))

  /* the number of worker threads that the server uses for handling all client requests */
  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))

  /* the interval in which to measure performance statistics */
  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))

  /* the default number of log partitions per topic */
  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))

  /* the directory in which the log data is kept */
  val logDir = Utils.getString(props, "log.dir")

  /* the maximum size of a single log file */
  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))

  /* the maximum size of a single log file for some specific topic */
  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))

  /* the maximum time before a new log segment is rolled out */
  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours before rolling out a new log segment for some specific topic */
  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))

  /* the number of hours to keep a log file before deleting it */
  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours to keep a log file before deleting it for some specific topic */
  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

  /* the maximum size of the log before deleting it */
  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)

  /* the maximum size of the log for some specific topic before deleting it */
  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))

  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))

  /* enable zookeeper registration in the server */
  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)

  /* the number of messages accumulated on a log partition before messages are flushed to disk */
  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))

  /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2:6000 */
  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))

  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)

  /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)

  /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))

  /* the maximum length of topic name */
  val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}
The code above is the KafkaConfig class from the kafka.server package. As mentioned before, the broker is Kafka's server, so it is no surprise that the configuration lives in this package. Let's walk down the code and pick up a little Scala syntax along the way. Just like in Java, we import the packages we need; Scala lets two classes from the same package be imported together inside braces:
import kafka.utils.{Utils, ZKConfig}
Next, look at how the class is declared:
class KafkaConfig(props: Properties) extends ZKConfig(props)
KafkaConfig is constructed from a Properties object, and because it extends ZKConfig, the same Properties also supplies the ZooKeeper-related settings. Recall the commands we used earlier to start a Kafka broker:
1. Start the ZooKeeper server: bin/zookeeper-server-start.sh ../config/zookeeper.properties & (the trailing & lets you keep using the shell)
2. Start the Kafka server: bin/kafka-server-start.sh ../config/server.properties &
So it should be clear that when the Kafka broker initializes, it loads the properties files under the config directory, exactly as a Java program would. The properties can also be supplied programmatically, which we will come back to later; for now let's continue with the code. Having located the corresponding properties file, we can read it side by side with the code.
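As a rough illustration of what that startup amounts to, here is a minimal sketch (not the actual kafka-server-start.sh code path; the file path is just an example) that loads server.properties into a java.util.Properties and hands it to KafkaConfig:

import java.io.FileInputStream
import java.util.Properties
import kafka.server.KafkaConfig

object ConfigLoadingSketch {
  def main(args: Array[String]): Unit = {
    // Load the broker properties file from disk (path is illustrative)
    val props = new Properties()
    val in = new FileInputStream("config/server.properties")
    try props.load(in) finally in.close()

    // KafkaConfig (and its parent ZKConfig) read all their settings from this object
    val config = new KafkaConfig(props)
    println("brokerid = " + config.brokerId + ", port = " + config.port)
  }
}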
In the broker's properties file, the settings fall into the following six sections (an illustrative excerpt follows the list):
- Server Basics: settings such as brokerid and hostname.
- Socket Server Settings: transport settings such as the port and the socket buffer sizes.
- Log Basics: where the log data is stored and the default number of partitions.
- Log Flush Policy: the most important part of the Kafka configuration; it determines the policy for flushing data to disk.
- Log Retention Policy: the policy for cleaning up (deleting) old log data.
- Zookeeper: the ZooKeeper-related settings.
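For orientation, an illustrative server.properties excerpt grouped along those six sections might look like the following. The keys are taken from the KafkaConfig code above (zk.connect comes from the parent ZKConfig class and is assumed here), and the values are only examples, not recommendations:

# Server Basics
brokerid=0
hostname=192.168.1.10

# Socket Server Settings
port=6667
socket.send.buffer=102400
socket.receive.buffer=102400
max.socket.request.bytes=104857600

# Log Basics
log.dir=/tmp/kafka-logs
num.partitions=1

# Log Flush Policy
log.flush.interval=500
log.default.flush.interval.ms=3000
log.default.flush.scheduler.interval.ms=3000

# Log Retention Policy
log.retention.hours=168
log.cleanup.interval.mins=10

# Zookeeper
enable.zookeeper=true
zk.connect=localhost:2181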
Every setting that appears in the properties file shows up in this KafkaConfig class. Look again at two lines from KafkaConfig:
/* the broker id for this server */
val brokerId: Int = Utils.getInt(props, "brokerid")

/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
Whenever a getter takes three arguments, the last one is the default value; when it takes only two, the property is mandatory and the server reports an error if it is missing. Many of these are tuning parameters that come down to experience, and I don't have a particular recommendation for their values; they have to be worked out through repeated testing.
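Conceptually, the two getter forms behave roughly as in the sketch below. This is only an illustration with made-up names, not the actual kafka.utils.Utils implementation:

import java.util.Properties

object GetterSketch {
  // Two-argument form: the property is required, so a missing key is an error
  def getRequiredInt(props: Properties, name: String): Int = {
    val v = props.getProperty(name)
    if (v == null)
      throw new IllegalArgumentException("Missing required property: " + name)
    v.trim.toInt
  }

  // Three-argument form: fall back to the supplied default when the key is absent
  def getIntOrDefault(props: Properties, name: String, default: Int): Int = {
    val v = props.getProperty(name)
    if (v == null) default else v.trim.toInt
  }
}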
You can also put the configuration in your program and start the broker from code; in that case the Kafka settings can be written like this:
Properties props = new Properties();
props.setProperty("port", "9093");
props.setProperty("log.dir", "/home/kafka/data1");
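Those in-memory properties end up in the same KafkaConfig shown above. A minimal sketch of an embedded startup, assuming the 0.7-era kafka.server.KafkaServer API (a constructor taking a KafkaConfig plus startup() and shutdown()), might look like this:

import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

object EmbeddedBrokerSketch {
  def main(args: Array[String]): Unit = {
    // Build the configuration in memory instead of reading server.properties
    val props = new Properties()
    props.setProperty("brokerid", "0")             // required: KafkaConfig gives it no default
    props.setProperty("port", "9093")
    props.setProperty("log.dir", "/home/kafka/data1")
    props.setProperty("enable.zookeeper", "false") // assumption: skip ZooKeeper to keep the sketch self-contained

    // Assumed API: KafkaServer wraps a KafkaConfig and exposes startup()/shutdown()
    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()
    // ... clients can now connect on port 9093 ...
    server.shutdown()
  }
}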
Personally, though, I still think it is better to keep the settings in the configuration files: if something needs to change, you don't have to touch the running program, whereas configuration baked into code is always a bit awkward to update. So my advice is to stick with the config files; the same applies to the producer and consumer discussed later.
Two parameters deserve special mention. The first is brokerid: every broker must have a distinct id. The second is hostname: it is how producers and consumers reach the broker, so make sure it (together with the port) points at your real address, otherwise clients will forever be connecting to localhost.
The next post will cover producer and consumer configuration; that is where the actual coding begins (while writing this I kept drifting back into the source). It will first walk through setting up a development environment and then two simple examples to get familiar with those settings.
Reposted from: https://www.cnblogs.com/happyday56/p/4210934.html
總結
以上是生活随笔為你收集整理的Kafka broker配置介绍 (四)的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 优化js
- 下一篇: (王道408考研数据结构)第五章树-第三
