java客户端作为kafka消费者测试
【README】
本文主要對(duì) java客戶端作為kafka 消費(fèi)者進(jìn)行測(cè)試, 生產(chǎn)者由 kafka客戶端扮演;?
?
【1】普通消費(fèi)者
設(shè)置消費(fèi)者組;
重置消費(fèi)者的offset, 即每次都從最頭開(kāi)始消費(fèi)(默認(rèn)僅保持7天內(nèi)數(shù)據(jù)) ;
類似于 命令行 --from-beginning
kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning
小結(jié):從頭開(kāi)始消費(fèi),必須滿足2個(gè)條件;
條件1: 必須重新?lián)Q組, 如本文中的消費(fèi)者組 從 sichuan 更新為 sichuan1 ; 條件2: 需要設(shè)置offset, 修改為 earliest, 默認(rèn)值是 lastest; /*** 普通消費(fèi)者*/ public class MyConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2開(kāi)啟自動(dòng)提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] " + rd.key() + "--" + rd.value()); }} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }?從官網(wǎng)可以找到以上配置值;?https://kafka.apache.org/0110/documentation.html#configuration
?
【2】kafka消費(fèi)者-手動(dòng)提交offset?
手動(dòng)提交offset有3種方式:
- 方式1:同步手動(dòng)提交;
- 方式2:異步手動(dòng)提交;?
- 方式3:自定義手動(dòng)提交策略;
0)為啥需要手動(dòng)提交?
kafka自動(dòng)提交是在kafka拉取到數(shù)據(jù)之后就直接提交,這樣很容易丟失數(shù)據(jù),尤其是在需要事物控制的時(shí)候。 很多情況下我們需要從kafka成功拉取數(shù)據(jù)之后,對(duì)數(shù)據(jù)進(jìn)行相應(yīng)的處理之后再進(jìn)行提交。如拉取數(shù)據(jù)之后進(jìn)行寫入mysql這種 , 所以這時(shí)我們就需要進(jìn)行手動(dòng)提交kafka的offset下標(biāo)。這里順便說(shuō)下offset具體是什么。 offset:指的是kafka的topic中的每個(gè)消費(fèi)組消費(fèi)的下標(biāo)。 簡(jiǎn)單的來(lái)說(shuō)就是一條消息對(duì)應(yīng)一個(gè)offset下標(biāo),每次消費(fèi)數(shù)據(jù)的時(shí)候如果提交offset,那么下次消費(fèi)就會(huì)從提交的offset加一那里開(kāi)始消費(fèi)。 比如一個(gè)topic中有100條數(shù)據(jù),我消費(fèi)了50條并且提交了,那么此時(shí)的kafka服務(wù)端記錄提交的offset就是49(offset從0開(kāi)始),那么下次消費(fèi)的時(shí)候offset就從50開(kāi)始消費(fèi)。1)關(guān)閉自動(dòng)提交(默認(rèn)為true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);第一次啟動(dòng) consumer 從 90 開(kāi)始消費(fèi);
第2次啟動(dòng)相同 consumer ,還是從90開(kāi)始消費(fèi);
2) 如何使用手動(dòng)提交?
kafka提供了手動(dòng)提交offset的api; 方法1:commitSync 同步提交: ; 方法2:commitAsync 異步提交; 兩者相同點(diǎn):都會(huì)將本次 poll 的一批數(shù)據(jù)最高的偏移量提交; 不同點(diǎn)是, commitSync 阻塞當(dāng)前線程,一直到提交成功, 并且會(huì)自動(dòng)失敗重試; 而 commitAsync 沒(méi)有失敗重試機(jī)制, 可能提交失敗;3)同步手動(dòng)提交offset
/*** 手動(dòng)同步提交offset */ public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }4)異步手動(dòng)提交offset?
/*** 異步手動(dòng)提交offset */ public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【異步提交】 當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitAsync(new OffsetCommitCallback() {@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception !=null) {System.out.println("異步提交失敗");} else {System.out.println("異步提交成功"); }}}); } /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }5)自定義手動(dòng)提交offset策略
5.0)為啥需要自定義?
因?yàn)楫惒教峤挥幸恍﹩?wèn)題,如下: 先消費(fèi)數(shù)據(jù),后提交offset, 可能導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi); 先提交offset, 后走業(yè)務(wù)邏輯,可能會(huì)丟數(shù)據(jù);5.1)應(yīng)用場(chǎng)景:
把 offset 存儲(chǔ)到本地庫(kù) 和 消息消費(fèi)邏輯 在同一個(gè)數(shù)據(jù)庫(kù)事務(wù)里面;
5.2)如何實(shí)現(xiàn)?需要實(shí)現(xiàn) ConsumerRebalanceListener 來(lái)實(shí)現(xiàn)。
/*** 自定義手動(dòng)提交offset策略 */ public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】調(diào)用}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】調(diào)用 /* 分區(qū)分配方法 */for (TopicPartition partition : partitions) { /*定位到某個(gè) offset*/consumer.seek(partition, 1); // TODO: 這里需要輸入1 }} });/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }補(bǔ)充: 消費(fèi)者rebalance 是什么?
消費(fèi)者 rebalance, 什么時(shí)候觸發(fā) rebalance? 如 同一個(gè)消費(fèi)者組下的 某個(gè)消費(fèi)者機(jī)器宕機(jī),或新增一個(gè)消費(fèi)者機(jī)器,都會(huì)觸發(fā) rebalance,即重新分配 kafka分區(qū)數(shù)據(jù)與 消費(fèi)者的對(duì)應(yīng)關(guān)系;?
?
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的java客户端作为kafka消费者测试的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java客户端作为kafka生产者测试
- 下一篇: 洁白的近义词是什么 一起来看看吧