kafka环境安装及简单使用(单机版)
一個分布式發布-訂閱消息傳遞系統
特點:
??? 高吞吐量、低延遲
使用場景(舉例):
??? 日志收集:用kafka收集各種服務產生的log,通過kafka以統一的接口服務的方式開放給各種consumer,如hadoop,hbase等
?
下載安裝:
??? 1.下載地址? ? 選擇一個版本的kafka進行下載
??? 2.解壓
tar -zxvf kafka_2.11-0.9.0.1.tgz mv kafka_2.11-0.9.0.1 /opt/??? 3.配置環境變量(可選步驟)
?
上手使用:
??? 1.config目錄配置文件(zookeeper.properties,service.properties,producer.properties,consumer.properties)
我們暫時先不管這些配置文件,遵守初始的配置
??? 2.先啟動zookeeper - kafka依賴與zookeeper 實現分布式一致性
我們下載的kafka安裝包,就自帶了zookeeepr,zookeeper.properties就是自帶的zk的配置文件
nohup bin/zookeeper-server-start.sh config/zookeeepr.properties& nohup &是實現在后臺啟動???
??? 3.再啟動kafka服務
bin/kafka-server-start.sh config/server.properties???
??? 4.創建一個Topic
bin/kafka-topics.sh --create --topic test1 --zookeeper localehost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1?
??? 4.再啟動kafka生產端
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1?
??? 5.在新窗口再啟動kafka消費端
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning?
??? 6.在生產窗口輸入任意字符,觀察在消費端是否能夠收到相應字符
如果無法收到正確字符,或者報錯,嘗試從以下方面排查:
??? 1.服務是否都按順序正常啟動
??? 2.命令中開啟的服務端口是否和相應的配置文件中的配置對應
??????? 注:生產端訪問的端口不是? zookeeper的localhost:2181, 而是producer.properties中配置的broker的端口,默認為9092
??????? 注:這個broker的端口是需要在 server中有相應的配置才可以
?
簡單介紹一下上面提到了config目錄下面的配置,以及kafka集群的搭建
server.properties:一個server.properties文件代表了一個kafka服務,也就是一個Broker
所以說,如果我們想搭建一個kafka集群,需要有不同的 server.properties文件,來啟動多個broker,多個borker組成kafka cluster
??? 注:每個server.properties配置文件中的 broker.id(服務器唯一標識)不能一樣
???????? port(服務器監聽端口號)不能一樣
???????? zookeeper.connect(zookeeper的連接ip及端口),需和zookeeper.properties保持一致
?
kafka在Java程序的簡單示例:
? 生產:
public class JavaKafkaProducer {private Logger logger = Logger.getLogger(JavaKafkaProducer.class);public static final String TOPIC_NAME = "test1";public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();public static final int chartsLength = charts.length;public static void main(String[] args) {String brokerList = "127.0.0.1:9092";Properties props = new Properties();props.put("metadata.broker.list", brokerList);/*** 0表示不等待結果返回<br/>* 1表示等待至少有一個服務器返回數據接收標識<br/>* -1表示必須接收到所有的服務器返回標識,及同步寫入<br/>* */props.put("request.required.acks", "0");/*** 內部發送數據是異步還是同步* sync:同步, 默認* async:異步*/props.put("producer.type", "async");/*** 設置序列化的類* 可選:kafka.serializer.StringEncoder* 默認:kafka.serializer.DefaultEncoder*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** 設置分區類* 根據key進行數據分區* 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區* 可選:kafka.serializer.ByteArrayPartitioner ==> 轉換為字節數組后進行hash分區*/props.put("partitioner.class", "com.kafka.JavaKafkaProducerPartitioner");// 重試次數props.put("message.send.max.retries", "3");// 異步提交的時候(async),并發提交的記錄數props.put("batch.num.messages", "200");// 設置緩沖區大小,默認10KBprops.put("send.buffer.bytes", "102400");// 2. 構建Kafka Producer Configuration上下文ProducerConfig config = new ProducerConfig(props);// 3. 構建Producer對象final Producer<String, String> producer = new Producer<String, String>(config);// 4. 發送數據到服務器,并發線程發送final AtomicBoolean flag = new AtomicBoolean(true);int numThreads = 50;ExecutorService pool = Executors.newFixedThreadPool(numThreads);for (int i = 0; i < 5; i++) {pool.submit(new Thread(new Runnable() {@Overridepublic void run() {while (flag.get()) {// 發送數據KeyedMessage message = generateKeyedMessage();producer.send(message);System.out.println("發送數據:" + message);// 休眠一下try {int least = 10;int bound = 100;Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName() + " shutdown....");}}, "Thread-" + i));}// 5. 等待執行完成long sleepMillis = 600000;try {Thread.sleep(sleepMillis);} catch (InterruptedException e) {e.printStackTrace();}flag.set(false);// 6. 關閉資源 pool.shutdown();try {pool.awaitTermination(6, TimeUnit.SECONDS);} catch (InterruptedException e) {} finally {producer.close(); // 最后之后調用 }}/*** 產生一個消息** @return*/private static KeyedMessage<String, String> generateKeyedMessage() {String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);StringBuilder sb = new StringBuilder();int num = ThreadLocalRandom.current().nextInt(1, 5);for (int i = 0; i < num; i++) {sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");}String message = sb.toString().trim();return new KeyedMessage(TOPIC_NAME, key, message);}/*** 產生一個給定長度的字符串** @param numItems* @return*/private static String generateStringMessage(int numItems) {StringBuilder sb = new StringBuilder();for (int i = 0; i < numItems; i++) {sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);}return sb.toString();} }?
? 消費:
public class JavaKafkaConsumerHighAPITest {public static void main(String[] args) {String zookeeper = "127.0.0.1";String groupId = "test-consumer-group";String topic = "test1";int threads = 1;JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);new Thread(example).start();// 執行10秒后結束int sleepMillis = 600000;try {Thread.sleep(sleepMillis);} catch (InterruptedException e) {e.printStackTrace();}// 關閉 example.shutdown();} }?
?
kafka各組件說明:
??? 1.Broker -- 每個kafka server稱為一個Broker,多個borker組成kafka cluster。
??? 2.Topic? --? Topic 就是消息類別名,一個topic中通常放置一類消息。每個topic都有一個或者多個訂閱者,也就是消息的消費者consumer。
??????? Producer將消息推送到topic,由訂閱該topic的consumer從topic中拉取消息。
??????? 一個Broker上可以創建一個或者多個Topic。同一個topic可以在同一集群下的多個Broker中分布。
??? ....
?
?
參考博文:http://www.cnblogs.com/liuming1992/tag/Kafka/
?
轉載于:https://www.cnblogs.com/xuzekun/p/8986540.html
總結
以上是生活随笔為你收集整理的kafka环境安装及简单使用(单机版)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android studio下将项目代码
- 下一篇: Opensetack + Kuberne