KAFKA 同步和异步消息的发送(开发实战)
生活随笔
收集整理的這篇文章主要介紹了
KAFKA 同步和异步消息的发送(开发实战)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 一、消費者監聽
- 1. 啟動zk
- 2. 啟動kafka
- 3. 創建主題
- 4. 消費者監聽消息
- 二、生產者工程
- 2.1. 依賴
- 2.2. 生產者代碼(同步)
- 2.3. 生產者代碼(異步)
- 2.4. 發送消息
- 2.5. 消費者監聽消息
- 2.6. 結果返回
一、消費者監聽
1. 啟動zk
zkServer.sh start# 監聽運行狀態 zkServer.sh status2. 啟動kafka
# 后臺啟動kafka kafka-server-start.sh -daemon /app/kafka_2.12-2.8.0/config/server.properties3. 創建主題
# 創建一個主題名稱為topic_1 該主題分區1個分區 ,該分區有1個副本 kafka-topics.sh --zookeeper localhost:2181/mykafka --create --topic topic_1 --partitions 1 --replication-factor 14. 消費者監聽消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1二、生產者工程
2.1. 依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.1</version></dependency>2.2. 生產者代碼(同步)
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化連接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化類configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化類configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//構造生產者對象 指定發送的key和value的類型 配置的參數列表(必填參數+輔助參數)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于設置用戶自定義的消息頭字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//構造record封裝發送消息主體ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定發送主題0,//指定發送分區0,//指定發送key"hello gblfy 0",//指定發送消息主題headers//用于設置用戶自定義的消息頭字段);//消息的同步確認 調用send方法發送消息final Future<RecordMetadata> future = producer.send(record);//調用get方法接收消息final RecordMetadata metadata = future.get();System.out.println("消息的主題:" + metadata.topic());System.out.println("消息的分區:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());//關閉生產者producer.close();} }2.3. 生產者代碼(異步)
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化連接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化類configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化類configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//構造生產者對象 指定發送的key和value的類型 配置的參數列表(必填參數+輔助參數)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于設置用戶自定義的消息頭字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//構造record封裝發送消息主體ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定發送主題0,//指定發送分區0,//指定發送key"hello gblfy 0",//指定發送消息主題headers//用于設置用戶自定義的消息頭字段);//消息的同步確認 調用send方法發送消息final Future<RecordMetadata> future = producer.send(record);//調用get方法接收消息// final RecordMetadata metadata = future.get();// System.out.println("消息的主題:" + metadata.topic());// System.out.println("消息的分區:" + metadata.partition());// System.out.println("消息的偏移量:" + metadata.offset());//消息的異步確認producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息的主題:" + metadata.topic());System.out.println("消息的分區:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());} else {System.out.println("異常消息");}}});//關閉生產者producer.close();} }2.4. 發送消息
消息有同步發送和異步發送二種
2.5. 消費者監聽消息
2.6. 結果返回
總結
以上是生活随笔為你收集整理的KAFKA 同步和异步消息的发送(开发实战)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Syntax Error: TypeEr
- 下一篇: uniapp 创建与配置 tabbar