linux环境安装 kafka 0.8.2.1 jdk1.6
                                                            生活随笔
收集整理的這篇文章主要介紹了
                                linux环境安装 kafka 0.8.2.1  jdk1.6
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.                        
                                文章目錄
- 一、環境分布
 - 二、實戰
 - 1. kafka下載
 - 2. 解壓
 - 3. 配置
 - 4. 編寫啟動腳本
 - 5. 編寫關閉腳本
 - 6. 賦予腳本可執行權限
 - 7. 腳本使用案例
 
- 三、Config配置
 - 四、Consumer配置
 - 五、Producer配置
 
很多小伙伴問我,為什么不用最新版本的kafka呢?關于這個問題,都是基于項目和場景來說的,給小伙伴舉幾個例子哈!
以前的項目用的jdk1.6適配的kafka版本是0.8.2.1
現在一個新的項目,技術也支持,jdk1.8建議選擇一個新的并且穩定的kafka版本
一、環境分布
| jdk | 1.8 | 
| kafka | kafka_2.9.2-0.8.2.1.tgz | 
二、實戰
kafka官網地址:
 http://kafka.apache.org/downloads
 
1. kafka下載
wget https://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgz2. 解壓
tar –zxvf kafka_2.9.2-0.8.2.1.tgz3. 配置
cd kafka_2.9.2-0.8.2.1/config/ vi server.properties 添加一下內容: broker.id=0 #端口號 port=9092 #服務器IP地址,修改為自己的服務器IP host.name=172.30.0.9 #日志存放路徑,上面創建的目錄 log.dirs=/usr/local/logs/kafka #zookeeper地址和端口,單機配置部署,localhost:2181 zookeeper.connect=localhost:21814. 編寫啟動腳本
vi kafkastart.sh 添加以下內容: #啟動zookeeper /app/kafka_2.9.2-0.8.2.1/bin/zookeeper-server-start.sh /app/kafka_2.9.2-0.8.2.1/config/zookeeper.properties & #等3秒后執行 sleep 3 #啟動kafka /app/kafka_2.9.2-0.8.2.1/bin/kafka-server-start.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &5. 編寫關閉腳本
vi kafkastop.sh 添加以下內容: #關閉zookeeper /app/kafka_2.9.2-0.8.2.1/bin/zookeeper-server-stop.sh /app/kafka_2.9.2-0.8.2.1/config/zookeeper.properties & #等3秒后執行 sleep 3 #關閉kafka /app/kafka_2.9.2-0.8.2.1/bin/kafka-server-stop.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &6. 賦予腳本可執行權限
chmod 777 kafkastart.sh kafkastop.sh 或者 chmod +x kafkastart.sh kafkastop.sh7. 腳本使用案例
#啟動腳本 ./kafkastart.sh #關閉腳本 ./kafkastop.sh記得開啟防火墻9092和2181端口
注:此單機版使用的是kafka自帶的zookeeper
三、Config配置
下表列出了Boker的重要的配置參數, 更多的配置請參考 kafka.server.KafkaConfig
| brokerid | 0 | 每一個boker都有一個唯一的id作為它們的名字。 這就允許boker切換到別的主機/端口上, consumer依然知道 | 
| enable.zookeeper | true | 允許注冊到zookeeper | 
| log.flush.interval.messages | Long.MaxValue | 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量 | 
| log.flush.interval.ms | Long.MaxValue | 在數據被寫入到硬盤前的最大時間 | 
| log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數據是否要寫入到硬盤的時間間隔。 | 
| log.retention.hours | 168 | 控制一個log保留多長個小時 | 
| log.retention.bytes | -1 | 控制log文件最大尺寸 | 
| log.cleaner.enable | false | 是否log cleaning | 
| log.cleanup.policy | delete | delete還是compat. 其它控制參數還包括log.cleaner.threads,log.cleaner.io.max.bytes.per.second,log.cleaner.dedupe.buffer.size,log.cleaner.io.buffer.size,log.cleaner.io.buffer.load.factor,log.cleaner.backoff.ms,log.cleaner.min.cleanable.ratio,log.cleaner.delete.retention.ms | 
| log.dir | /tmp/kafka-logs | 指定log文件的根目錄 | 
| log.segment.bytes | 1024*1024 | 單一的log segment文件大小 | 
| log.roll.hours | 24 * 7 | 開始一個新的log文件片段的最大時間 | 
| message.max.bytes | 1000000 + MessageSet.LogOverhead | 一個socket 請求的最大字節數 | 
| num.network.threads | 3 | 處理網絡請求的線程數 | 
| num.io.threads | 8 | 處理IO的線程數 | 
| background.threads | 10 | 后臺線程序 | 
| num.partitions | 1 | 默認分區數 | 
| socket.send.buffer.bytes | 102400 | socket SO_SNDBUFF參數 | 
| socket.receive.buffer.bytes | 102400 | socket SO_RCVBUFF參數 | 
| zookeeper.connect | localhost:2182/kafka | 指定zookeeper連接字符串, 格式如hostname:port/chroot。chroot是一個namespace | 
| zookeeper.connection.timeout.ms | 6000 | 指定客戶端連接zookeeper的最大超時時間 | 
| zookeeper.session.timeout.ms | 6000 | 連接zk的session超時時間 | 
| zookeeper.sync.time.ms | 2000 | zk follower落后于zk leader的最長時間 | 
四、Consumer配置
下表列出了high-level consumer的重要的配置參數。更多的配置請參考 kafka.consumer.ConsumerConfig
| groupid | groupid | 一個字符串用來指示一組consumer所在的組 | 
| socket.timeout.ms | 30000 | socket超時時間 | 
| socket.buffersize | 64*1024 | socket receive buffer | 
| fetch.size | 300 * 1024 | 控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代 | 
| backoff.increment.ms | 1000 | 這個參數避免在沒有新數據的情況下重復頻繁的拉數據。 如果拉到空數據,則多推后這個時間 | 
| queued.max.message.chunks | 2 | high level consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小 | 
| autocommit.enable | true | 如果true,consumer定期地往zookeeper寫入每個分區的offset | 
| auto.commit.interval.ms | 10000 | 往zookeeper上寫offset的頻率 | 
| auto.offset.reset | smallnest | 如果offset出了返回,則 smallest : 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. anything else : 否則拋出異常. | 
| consumer.timeout.ms | -1 | 默認-1,consumer在沒有新消息時無限期的block。如果設置一個正值, 一個超時異常會拋出 | 
| rebalance.retries.max | 4 | rebalance時的最大嘗試次數 | 
五、Producer配置
下表列出了producer的重要的參數。更多的配置請參考 kafka.producer.ProducerConfig
| serializer.class | kafka.serializer.DefaultEncoder | 必須實現kafka.serializer.Encoder 接口,將T類型的對象encode成kafka message | 
| key.serializer.class | serializer.class | key對象的serializer類 | 
| partitioner.class | kafka.producer.DefaultPartitioner | 必須實現kafka.producer.Partitioner ,根據Key提供一個分區策略 | 
| producer.type | sync | 指定消息發送是同步還是異步。異步asyc成批發送用kafka.producer.AyncProducer, 同步sync用kafka.producer.SyncProducer | 
| metadata.broker.list | boker list | 使用這個參數傳入boker和分區的靜態信息,如host1:port1,host2:port2, 這個可以是全部boker的一部分 | 
| compression.codec | NoCompressionCodec | 消息壓縮,默認不壓縮 | 
| compressed.topics | null | 在設置了壓縮的情況下,可以指定特定的topic壓縮,為指定則全部壓縮 | 
| message.send.max.retries | 3 | 消息發送最大嘗試次數 | 
| retry.backoff.ms | 300 | 每次嘗試增加的額外的間隔時間 | 
| topic.metadata.refresh.interval.ms | 600000 | 定期的獲取元數據的時間。當分區丟失,leader不可用時producer也會主動獲取元數據,如果為0,則每次發送完消息就獲取元數據,不推薦。如果為負值,則只有在失敗的情況下獲取元數據。 | 
| queue.buffering.max.ms | 5000 | 在producer queue的緩存的數據最大時間,僅僅for asyc | 
| queue.buffering.max.message | 10000 | producer 緩存的消息的最大數量,僅僅for asyc | 
| queue.enqueue.timeout.ms | -1 | 0當queue滿時丟掉,負值是queue滿時block,正值是queue滿時block相應的時間,僅僅for asyc | 
| batch.num.messages | 200 | 一批消息的數量,僅僅for asyc | 
總結
以上是生活随笔為你收集整理的linux环境安装 kafka 0.8.2.1 jdk1.6的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Springboot整合Quartz集群
 - 下一篇: Ant-Design-Vue 安装