How to Create Kafka Clients: Avro Producer and Consumer Client
1. Objective - Kafka Clients
In this Kafka Clients article, we will learn how to create Apache Kafka clients with the Kafka API. There are several kinds of Kafka clients, corresponding to the message-processing requirements at-most-once, at-least-once, and exactly-once. So, in this Kafka Clients tutorial, we will cover all three approaches in detail. Moreover, we will look at how to use the Avro client.
So, let's begin the Kafka Clients tutorial.
2. What are Kafka Clients? - Prerequisites for Creating Kafka Clients
Moreover, as a prerequisite for the Kafka clients, create a topic named normal-topic with two partitions.
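A typical form of the create command, assuming the same Kafka installation and the local ZooKeeper address used in the commands below, is:
bin/kafka-topics.sh --create --topic normal-topic --zookeeper localhost:2181 --partitions 2 --replication-factor 1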
In addition, execute the following command to check the status of the created topic:
bin/kafka-topics --list --topic normal-topic --zookeeper localhost:2181
Moreover, if the topic needs to be changed later, execute the following command to increase the number of partitions:
bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2
3. Kafka Producer Client
Here is the code to implement a Kafka producer client. It sends text messages; adjust the loop to control the number of messages that are sent:
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending of the messages before program exit.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}
4. Kafka Consumer Client
First, let us look at how a Kafka consumer client can register with a Kafka broker. Specifically, there are two methods: using the subscribe method call or using the assign method call. Let us look at both Kafka client registration methods in detail.
a. Using the subscribe method call
When registering via the subscribe method call, Kafka automatically rebalances the available consumers whenever topics or partitions are added or removed, or when a consumer is added or removed.
b. Using the assign method call
However, when a consumer registers via the assign method call, the Kafka client does not perform automatic rebalancing for that consumer.
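As a small illustration (a fragment, assuming a KafkaConsumer<String, String> named consumer and the normal-topic topic used later in this tutorial; Arrays and TopicPartition come from java.util and org.apache.kafka.common), the two registration styles look like this:
// Group management: Kafka assigns partitions and rebalances them automatically among consumers.
consumer.subscribe(Arrays.asList("normal-topic"));

// Manual assignment: the consumer reads exactly the partitions it asks for; no automatic rebalancing.
consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));
Note that a given consumer instance uses one style or the other, never both.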
Either of the above registration options can be used by at-most-once, at-least-once, or exactly-once consumers.
i. At-most-once Kafka Consumer (zero or more deliveries)
Basically, this is the default behavior of a Kafka consumer.
To configure this type of consumer in the Kafka client, follow these steps:
- First, set 'enable.auto.commit' to true.
- Also, set 'auto.commit.interval.ms' to a lower time range.
- Make sure not to call consumer.commitSync() from the consumer. With this configuration, Kafka will auto-commit the offsets at the specified interval.
However, a consumer configured this way may actually exhibit either at-most-once or at-least-once behavior. Still, let us declare this consumer as at-most-once, since at-most-once is the weaker message-delivery guarantee. Let us discuss both consumer behaviors in detail:
- At-most-once scenario
This scenario happens when the commit interval fires and triggers Kafka to automatically commit the last used offset, but the consumer crashes between fetching the messages and processing them. Then, when the consumer restarts, it begins receiving messages from the last committed offset; as a result, it can lose some messages.
- At-least-once scenario
This scenario happens when the consumer has processed a message and committed it to its persistent store, but then crashes at that point. Suppose Kafka has not had a chance to commit the offset to the broker, because the commit interval has not yet passed. Then, when the consumer restarts, it receives some older messages again, starting from the last committed offset.
Kafka consumer code:
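A minimal sketch of such an at-most-once consumer is shown below, assuming the local broker and the normal-topic topic used throughout this tutorial; the group id cg0 and the 1000 ms commit interval are illustrative values only:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg0");
        // Auto-commit offsets at a short interval; an offset may be committed before its message is processed.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("normal-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // No commitSync() here; Kafka commits offsets automatically in the background.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}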
ii. At-least-once Kafka Consumer (one or more message deliveries, duplicates possible)
To configure this type of consumer, follow these steps:
- First, set 'enable.auto.commit' to false, or
- Alternatively, set 'enable.auto.commit' to true and 'auto.commit.interval.ms' to a higher number.
The consumer should now control the committing of message offsets to Kafka by calling consumer.commitSync().
Moreover, to avoid reprocessing duplicate messages, implement 'idempotent' behavior in the consumer, especially for this type of consumer, because duplicate message delivery can happen in the scenario described above.
Code:
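A minimal sketch of an at-least-once consumer is shown below, again assuming the local broker and normal-topic; the group id cg1 is an illustrative value. Offsets are committed only after every record in the poll has been processed:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg1");
        // Turn off auto commit; the application decides when offsets are committed.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("normal-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Process first; make this step idempotent, since a crash before the commit below causes redelivery.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
            // Commit only after processing; messages are never lost, but may be processed twice.
            consumer.commitSync();
        }
    }
}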
iii. Exactly-once Kafka Dynamic Consumer via subscribe (one and only one message delivery)
Here, the consumer registers with Kafka via the 'subscribe' method call described in (a) above.
Make sure the offsets are managed manually in this case. To set up the exactly-once scenario in the Kafka client, follow these steps:
- First, set enable.auto.commit = false.
- After processing the message, do not call consumer.commitSync().
- Moreover, register the consumer to the topic by making a 'subscribe' call.
- To start reading from a specific offset of that topic/partition, implement a ConsumerRebalanceListener, and perform consumer.seek(topicPartition, offset) within the listener.
- As a safety net, implement idempotence.
Code:
public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control offsets, but register the consumer to topics to get dynamically
        // assigned partitions. Inside MyConsumerRebalancerListener, use
        // consumer.seek(topicPartition, offset) to control which messages are read.
        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // To turn off the auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control maximum data on each poll; make sure this value is bigger than the maximum
        // single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

/**
 * The partition offsets are stored in external storage. In this case, in the local file system
 * where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrites the offset for the topic in external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}
iv. Exactly-once Kafka Static Consumer via assign (one and only one message delivery)
Here, the consumer registers with Kafka via the 'assign' method call described in (b) above.
Make sure the offsets are managed manually in this case. To set up an exactly-once Kafka static consumer via assign, follow these steps:
- First, set enable.auto.commit = false.
- Remember, after processing the message, do not call consumer.commitSync().
- Moreover, register the consumer to the specific partition by using the 'assign' call.
- On startup of the consumer, seek to the specific message offset by calling consumer.seek(topicPartition, offset).
- Also, as a safety net, implement idempotence.
Code:
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // To turn off the auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control maximum data on each poll; make sure this value is bigger than the maximum
        // single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. For an example of how to dynamically
     * listen to partitions while manually controlling offsets, see ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(
            KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Processes data and stores the offset in the external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}
5. Avro Producer and Consumer
Avro, by definition, is an open-source binary message-exchange protocol. Basically, we use it to send optimized messages over the wire, which also reduces network overhead. Moreover, Avro can enforce a schema for messages, and that schema can be defined using JSON. From these schemas, Avro can generate binding objects in various programming languages. Using Avro with Kafka is natively supported and highly recommended.
Below is a simple Avro producer and consumer.
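A minimal sketch of such a pair is shown below. It assumes Confluent's Avro serializer and deserializer together with a Schema Registry running at http://localhost:8081; the User schema, the avro-topic topic name, and the avro-cg group id are illustrative assumptions rather than values from this tutorial:
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerConsumerExample {

    // Avro schema defined in JSON, as described above.
    private static final String USER_SCHEMA =
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"name\",\"type\":\"string\"},"
          + "{\"name\":\"age\",\"type\":\"int\"}]}";

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);

        // ---- Producer ----
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put("schema.registry.url", "http://localhost:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerProps);
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");
        user.put("age", 30);
        producer.send(new ProducerRecord<String, GenericRecord>("avro-topic", "user-1", user));
        producer.close();

        // ---- Consumer ----
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "avro-cg");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        consumerProps.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList("avro-topic"));
        // A single poll for brevity; a real consumer would poll in a loop.
        ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.printf("key = %s, value = %s\n", record.key(), record.value());
        }
        consumer.close();
    }
}
With this setup, the producer registers the JSON-defined schema with the registry on first send, and the consumer retrieves it by id to decode the binary payload back into a GenericRecord.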
So, this is all about Kafka clients. We hope you liked our explanation of how to create Kafka clients.
6. Conclusion
Hence, we have seen all the ways to create Kafka clients using the Kafka API. Moreover, in this Kafka Clients tutorial, we discussed the Kafka Producer Client and the Kafka Consumer Client. Along with that, we covered the Avro Kafka producer and consumer clients. However, if you have any doubts about Kafka clients, feel free to ask in the comments section.
Reposted from: https://www.cnblogs.com/a00ium/p/10852433.html