kafka java编程demo_Kafka简单客户端编程实例
今天,我們給大家帶來一篇如何利用Kafka的API進行客戶端編程的文章,這篇文章很簡單,就是利用Kafka的API創建一個生產者和消費者,生產者不斷向Kafka寫入消息,消費者則不斷消費Kafka的消息。下面是具體的實例代碼。
一、創建配置類Config
這個類很簡單,只是存放了兩個常量,一個是話題TOPIC,一個是線程數THREADS
package com.lya.kafka;
/**
* 配置項
* @author liuyazhuang
*
*/
public class Config {
/**
* 話題
*/
public static final String TOPIC = "wordcount";
/**
* 線程數
*/
public static final Integer THREADS = 1;
}
二、編程生產者類ProducerDemo
這個類的主要作用就是向Kafka寫入相應的消息,并且將消息寫入wordcount話題。
package com.lya.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* 生產者實例
* @author liuyazhuang
*
*/
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("zk.connect", "192.168.209.121:2181");
props.put("metadata.broker.list","192.168.209.121:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
// 發送業務消息
// 讀取文件 讀取內存數據庫 讀socket端口
for (int i = 1; i <= 100; i++) {
Thread.sleep(500);
producer.send(new KeyedMessage(Config.TOPIC,
"this number ===>>> " + i));
}
}
}
三、編寫消息者類ConsumerDemo
這個類的主要作用就是消費Kafka中wordcount話題的消息。
package com.lya.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* 消費者實例
* @author liuyazhuang
*
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.209.121:2181");
props.put("group.id", "1111");
props.put("auto.offset.reset", "smallest");
props.put("zk.connectiontimeout.ms", "15000");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
Map topicCountMap = new HashMap();
topicCountMap.put(Config.TOPIC, Config.THREADS);
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
List> streams = consumerMap.get(Config.TOPIC);
for(final KafkaStream kafkaStream : streams){
new Thread(new Runnable() {
@Override
public void run() {
for(MessageAndMetadata mm : kafkaStream){
String msg = new String(mm.message());
System.out.println(msg);
}
}
}).start();
}
}
}
四、運行實例
首先,運行消費者類ConsumerDemo
運行結果如下:
沒有打印任何信息。
此時,我們運行生產者類ProducerDemo
我們再次打開消費者的控制臺查看如下:
打印出了生產者生產的消息。
至此,Kafka簡單客戶端編程實例結束。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
總結
以上是生活随笔為你收集整理的kafka java编程demo_Kafka简单客户端编程实例的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: java简单系统_Java简单学生管理系
- 下一篇: 如何在 iPhone 上退出引导式访问
