kafka生产者开发方式
【README】
本文記錄了 kafka生產者開發方式;
【1】生產者概覽
【1.1】kafka發送消息過程
【1.2】創建kafka生產者
1)創建kafka生產者, 有3個必選屬性:
【2】發送消息到kafka
1)發送消息有3種方式:
【2.1】同步發送
/*** @Description 同步發送生產者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/ public class MyProducerSync {public static void main(String[] args) {// 1.創建kafka生產者的配置信息Properties props = new Properties();// 指定連接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2.創建生產者對象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.發送數據Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10","k1", "v1"));try {// 當前線程阻塞,直到kafka響應返回寫入消息的元數據RecordMetadata respMetadata = future.get();System.out.println("[生產者寫入消息] 分區【" + respMetadata.partition() + "】-offset【" + respMetadata.offset() + "】");} catch (Exception e) {}// 關閉生產者producer.close();System.out.println("kafka生產者寫入數據完成");} }kafka生產者一般發生兩類錯誤:
【2.2】異步發送消息 (帶回調函數)
/*** @Description 【異步】發送生產者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/ public class MyProducerAsync {public static void main(String[] args) {// 1.創建kafka生產者的配置信息Properties props = new Properties();// 指定連接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2.創建生產者對象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.發送數據producer.send(new ProducerRecord<String, String>("hello10","k1", "v1"), new MyProducerCallback());// 關閉生產者producer.close();System.out.println("kafka生產者寫入數據完成");}/*** @Description 生產者發送消息后回調類* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/private static class MyProducerCallback implements Callback {// kafka服務器響應時回調方法@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("[生產者寫入消息成功] 分區【" + metadata.partition() + "】-offset【" + metadata.offset() + "】");} else {System.out.printf("寫入kafka失敗,異常信息【%s】", exception);}}} }【2.3】生產者配置
1)acks: 有3個可選項;表示生產者消息被認為寫入成功時,需要寫入的副本個數;
- 0:無需判斷,只要把消息發送出去,就認為成功;
- 1:僅首領副本;
- all:所有副本;
2)buffer.memory: 設置生產者內存緩沖區大小,用于緩沖發送到服務器的消息;
- 若緩沖區不足,send() 方法要么阻塞,要么拋出異常;取決于如何設置? max.block.ms 參數(拋出異常前可以阻塞一段時間);
3)compression.type: 壓縮算法;
4)retries: 發送消息失敗時,生產者可以重試的次數;
- 如果達到這個次數,生產者會放棄重試并返回錯誤; 默認情況下,生產者會在每次重試之間等待 100ms,通過 retry.backoff.ms? 參數來改變這個時間間隔;
- 一般情況下,沒必須處理可重試錯誤。但需要處理不可重試錯誤或重試次數超過上限的情況;
5)batch.size:? 生產者把多個消息放在同一個批次里;該參數指定了一個批次可以使用的內存大小,單位字節;不過生產者不一定等到批次被填滿才發送(參考 linger.ms);
6)linger.ms:指定生產者在發送批次前等待更多消息加入批次的時間;
- 生產者會在批次填滿或linger.ms 達到上限時把批次發送出去;
- 建議把linger.ms 設置為大于0的數,雖然增加了延時但提高了吞吐量;
7)client.id : 任意字符串,服務器用它識別消息來源,還可以用在 日志和配額指標里;
8)max.in.flight.requests.per.connection : 指定生產者在收到服務器響應前可以發送多少個消息;
- 把它設置為1,可以保證消息是按照順序寫入服務器的,即使發生了重試;
9)timeout.ms? , request.timeout.ms 和 metadata.fetch.timeout.ms
10)max.block.ms : send() 方法或使用 partitionFor() 獲取元數據時生產者的阻塞時間;
- 當生產者發送緩沖區已滿,或沒有可用的元數據,這些方法就會阻塞;在阻塞時間達到 該值時,生產者拋出超時異常;
11)max.request.size: 指定生產者發送的請求大小;
- 可以指單個消息的最大值,也可以指單個請求所有消息總大小(如一批多個消息但走了一個請求);
- 注意: broker對可接受的消息最大值有自己的限制(通過 message.max.bytes) 指定;?
12)receive.buffer.bytes 和 send.buffer.bytes
- 分別指定 TCP socket接收和發送數據包的緩沖區大小; 如果設置為-1,使用操作系統默認值;
【2.4】生產者常用配置代碼示例
public class MyProducer {public static void main(String[] args) {/* 1.創建kafka生產者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*3.ack應答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次發送多少數據,當數據大于16k,生產者會發送數據到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待時間, 等待時間超過1毫秒,即便數據沒有大于16k, 也會寫數據到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超時時間props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 緩沖區大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化類 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 設置壓縮算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");System.out.println(props);/* 9.創建生產者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.發送數據 */String now = DateUtils.getNowTimestamp();int order = 1;for (int i = 0; i < 50000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10",j, "", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生產者] 分區【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}}/* 11.關閉資源 */producer.close();System.out.println("kafka生產者寫入數據完成");} }【3】分區
1)使用消息的鍵來做hash,以hash值作為分區號;
2)如果鍵為null,則使用默認分區器;默認使用 輪詢(Round Robin)算法把消息均衡分布到各個分區上;?
【3.1】實現自定義分區策略
/*** @Description 自定義分區策略的生產者* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/ public class MyProducerWithPartition {public static void main(String[] args) {// 1.創建kafka生產者的配置信息Properties props = new Properties();// 指定連接的kafka集群, broker-listprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");// key, value 的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置分區器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());// 2.創建生產者對象KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.發送數據Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10","31", "v1"));try {// 當前線程阻塞,直到kafka響應返回寫入消息的元數據RecordMetadata respMetadata = future.get();System.out.println("[生產者寫入消息] 分區【" + respMetadata.partition() + "】-offset【" + respMetadata.offset() + "】");} catch (Exception e) {}// 關閉生產者producer.close();System.out.println("kafka生產者寫入數據完成");} }分區器?
/*** @Description 分區器* @author xiao tang* @version 1.0.0* @createTime 2021年12月09日*/ public class MyPartitioner implements Partitioner {// 對鍵首位字符ascii取分區數的模獲得分區號@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partitionSize = cluster.partitionCountForTopic(topic);int operand = 0;if (key != null && String.valueOf(key).length() > 0) {operand = String.valueOf(key).codePointAt(0);}return operand % partitionSize;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }【4】攔截器
定義攔截器,設置攔截器屬性(可配置多個攔截器);
/** 設置攔截器 */ props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName())); /*** @Description 時間攔截器* @author xiao tang* @version 1.0.0* @createTime 2021年12月10日*/ public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在消息被序列化以及計算分區前調用, 追加時間戳(偷梁換柱)return new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value() + "[TimeInterceptor]" + DateUtils.getNowTimestamp());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 在消息從 RecordAccumulator 成功發送到Kafka Broker之后,或者在發送過程中失敗時調用// 寫入數據庫}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }消費消息日志:
消費者-分區【0】offset【7774】 -> 2021-12-10 21:05:32--[1]? > ABCDE[TimeInterceptor]2021-12-10 21:05:30
消費者-分區【1】offset【7644】 -> 2021-12-10 21:05:32--[2]? > ABCDE[TimeInterceptor]2021-12-10 21:05:32
消費者-分區【2】offset【7626】 -> 2021-12-10 21:05:32--[3]? > ABCDE[TimeInterceptor]2021-12-10 21:05:32
總結
以上是生活随笔為你收集整理的kafka生产者开发方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 淘宝app怎么扫描(淘宝app怎么扫描二
- 下一篇: ps2安卓模拟器(ps2安卓)