Kafka的producer案例,Kafka的consumer案例
生活随笔
收集整理的這篇文章主要介紹了
Kafka的producer案例,Kafka的consumer案例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、編寫所需的pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.com.kafka</groupId><artifactId>learnKafka</artifactId><version>1.0-SNAPSHOT</version><!--導入maven依賴有兩種:一種maven.org 一種從已有的項目中導入--><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion></exclusions></dependency></dependencies> </project>2、編寫一個Producer
代碼:
package kafka;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;import java.util.Properties;/*** 代碼說明** @author tuzq* @create 2017-06-18 16:50*/ public class MyKafkaProducer {public static void main(String[] args) {Properties properties = new Properties();properties.put("metadata.broker.list","hadoop1:9092");properties.put("serializer.class","kafka.serializer.StringEncoder");Producer producer = new Producer(new ProducerConfig(properties));//先到Linux下創建topic:test//[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 1 --topic testproducer.send(new KeyedMessage("test","I am tuzq"));} }運行程序
準備條件:
1、創建test的topic
[root@hadoop1 kafka]# bin/kafka-topics.sh –create –zookeeper hadoop11:2181 –replication-factor 1 -partitions 1 –topic test
2、執行kafka-console-consumer.sh這個腳本,監聽發過來的消息
[root@hadoop3 kafka]# sh bin/kafka-console-consumer.sh –zookeeper hadoop11:2181 –from-beginning –topic test
3、右鍵Run這個類
3.編寫含有自定義的一些條件的Producer
代碼
package kafka;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;import java.util.Properties;/*** 這是一個簡單的Kafka producer代碼* 包含兩個功能:* 1、數據發送* 2、數據按照自定義的partition策略進行發送** @author tuzq* @create 2017-06-18 17:13*/ public class KafkaProducerSimple {public static void main(String[] args) {/*** 1、指定當前Kafka producer生產的數據的目的地* 創建topic可以輸入以下命令,在kafka集群的任一節點進行創建* bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic test*/String TOPIC = "test";/*** 2、讀取配置文件*/Properties props = new Properties();/*** key.serializer.class默認為serializer.class*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** kafka broker對應的主機,格式為host1:port1,host2:port2*/props.put("metadata.broker.list","hadoop1:9092");/** request.required.acks,設置發送數據是否需要服務端的反饋,有三個值0,1,-1* 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。* 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。** 1,意味著在leader replica已經接收到數據后,producer會得到一個ack。* 這個選項提供了更好的持久性,因為在server確認請求成功處理后,client才會返回。* 如果剛寫到leader上,還沒來得及復制leader就掛了,那么消息才可能會丟失。** -1,意味著在所有的ISR都接收到數據后,producer才得到一個ack。* 這個選項提供了最好的持久性,只要還有一個replica存活,那么數據就不會丟失*/props.put("request.required.acks", "1");/** 可選配置,如果不配置,則使用默認的partitioner partitioner.class* 默認值:kafka.producer.DefaultPartitioner* 用來把消息分到各個partition中,默認行為是對key進行hash。*///props.put("partitioner.class","kafka.MyPartitioner");//默認情況下是:props.put("partitioner.class","kafka.producer.DefaultPartitioner");/*** 3.通過配置文件,創建生產者*/Producer<String,String> producer = new Producer<String, String>(new ProducerConfig(props));/*** 4.通過for循環生產數據*/int messageNo = 0;while (true){String messageStr = new String(" aaaa");/*** 5、調用producer的send方法發送數據* 注意:這里需要指定 partitionKey,用來配合自定義的MyPartitioner進行數據分發*/producer.send(new KeyedMessage<String, String>(TOPIC,messageNo+"",messageStr));messageNo += 1;}} }運行程序
進入/home/tuzq/software/kafka/servers/logs/kafka,執行以下命令:
只要這個程序一直開著,那么發現du -h這個值就在不停的變大。
4.Kafka Consumer案例
代碼:
package kafka; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class KafkaConsumerSimple implements Runnable {public String title;public KafkaStream<byte[], byte[]> stream;public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {//獲取自己的消費編號,以及要消費的kafkaStreamthis.title = title;this.stream = stream;}public void run() {System.out.println("開始運行 " + title);//6、從KafkaStream獲取一個迭代器ConsumerIterator<byte[], byte[]> it = stream.iterator();/*** 7、不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞* */while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> data = it.next();String topic = data.topic();int partition = data.partition();long offset = data.offset();String msg = new String(data.message());System.out.println(String.format("Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",title, topic, partition, offset, msg));}System.err.println(String.format("Consumer: [%s] exiting ...", title));}public static void main(String[] args) throws Exception{// 1、準備一些配置參數Properties props = new Properties();props.put("group.id", "testGroup");props.put("zookeeper.connect", "hadoop11:2181,hadoop12:2181,hadoop13:2181");props.put("auto.offset.reset", "largest");props.put("auto.commit.interval.ms", "1000");props.put("partition.assignment.strategy", "roundrobin");ConsumerConfig config = new ConsumerConfig(props);//2、準備要消費的topicString topic = "test";//3、創建一個consumer的連接器// 只要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);//創建topicCountMapMap<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic,1);//4、獲取每個topic對應的kafkaStreamMap<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);//5、消費KafkaStream中的數據List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);ExecutorService executor = Executors.newFixedThreadPool(4);for (int i = 0; i < streams.size(); i++)executor.execute(new KafkaConsumerSimple("consumer" + (i + 1), streams.get(i)));} }右鍵運行
注意:上滿的三個小案例在運行之前,都要先檢查以下kafka是否已經啟動了,檢查方式是執行jps命令,看看是否有kafka的進程
總結
以上是生活随笔為你收集整理的Kafka的producer案例,Kafka的consumer案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 零元开网店是不是真的 创业原来这么简单
- 下一篇: 信用卡积分可以干什么