java实现Kafka生产者示例
                                                            生活随笔
收集整理的這篇文章主要介紹了
                                java实现Kafka生产者示例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.                        
                                使用java實現Kafka的生產者
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 | package com.lisg.kafkatest;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.Partitioner;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;/**?* Kafka生產者?* @author lisg?*?*/public class KafkaProducer {????public static void main(String[] args) {?????????????????Properties props = new Properties();????????//根據這個配置獲取metadata,不必是kafka集群上的所有broker,但最好至少有兩個????????props.put("metadata.broker.list", "vm1:9092,vm2:9092");????????//消息傳遞到broker時的序列化方式????????props.put("serializer.class", StringEncoder.class.getName());????????//zk集群????????props.put("zookeeper.connect", "vm1:2181");????????//是否獲取反饋????????//0是不獲取反饋(消息有可能傳輸失敗)????????//1是獲取消息傳遞給leader后反饋(其他副本有可能接受消息失敗)????????//-1是所有in-sync replicas接受到消息時的反饋????????props.put("request.required.acks", "1");//????? props.put("partitioner.class", MyPartition.class.getName());?????????????????//創建Kafka的生產者, key是消息的key的類型, value是消息的類型????????Producer<Integer, String> producer = new Producer<Integer, String>(????????????????new ProducerConfig(props));?????????????????int count = 0;????????while(true) {????????????String message = "message-" + ++count;????????????//消息主題是test????????????KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", message);????????????//message可以帶key, 根據key來將消息分配到指定區, 如果沒有key則隨機分配到某個區//????????? KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);????????????producer.send(keyedMessage);????????????System.out.println("send: " + message);????????????try {????????????????Thread.sleep(1000);????????????} catch (InterruptedException e) {????????????????e.printStackTrace();????????????}????????}?????????//????? producer.close();????} }/**?* 自定義分區類?*?*/class MyPartition implements Partitioner {????public int partition(Object key, int numPartitions) {????????return key.hashCode()%numPartitions;????}?????} | 
來自為知筆記(Wiz)
附件列表
?
轉載于:https://www.cnblogs.com/lishouguang/p/4560559.html
總結
以上是生活随笔為你收集整理的java实现Kafka生产者示例的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: JavaEE 要懂的小事:二、图解 Co
- 下一篇: jquery正则匹配URL地址
