java 集成kafka单机版 适配jdk1.8
生活随笔
收集整理的這篇文章主要介紹了
java 集成kafka单机版 适配jdk1.8
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 一、環境分布
- 1. 版本聲明
- 2. 依賴
- 2. case測試
- 2. case2測試
一、環境分布
1. 版本聲明
| jdk | 1.8 |
| kafka | kafka_2.13-2.4.0 |
注:建議版本和應用依賴的客戶端版本依賴保持一致,如果需要更高版本,可以嘗試
但是有一點,小伙伴們要記住:linux服務器的kafka版本向下兼容,但是,kafka的客戶端版本不向下兼容,這一點很重要!
2. 依賴
<!-- kafka連接 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>2.4.0</version></dependency>2. case測試
package com.sinosoft.a;import kafka.consumer.ConsumerConfig; import kafka.producer.ProducerConfig;import java.util.Arrays; import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;public class KafkaDemo {public static void main(String[] args) throws InterruptedException {// 生產者示例providerDemo();// 消費者示例consumerDemo();}/*** 生產者示例*/public static void providerDemo() {Properties properties = new Properties();/*** kafka的服務地址*/properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.5.6.19:9092"); // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.5.6.19:9092,192.168.17.137:9092");/*** 在考慮完成請求之前,生產者要求leader收到的確認數量。這可以控制發送記錄的持久性。允許以下設置:* acks = 0,生產者將不會等待來自服務器的任何確認。該記錄將立即添加到套接字緩沖區并視為已發送。在這種情況下,無法保證服務器已收到記錄,并且retries配置將不會生效(因為客戶端通常不會知道任何故障)。* acks = 1,這意味著leader會將記錄寫入其本地日志,但無需等待所有follower的完全確認即可做出回應。在這種情況下,如果leader在確認記錄后立即失敗但在關注者復制之前,則記錄將丟失。* acks = all,這意味著leader將等待完整的同步副本集以確認記錄。這保證了只要至少一個同步副本仍然存活,記錄就不會丟失。這是最強有力的保證。這相當于acks = -1設置*/properties.put(ProducerConfig.ACKS_CONFIG, "all");/*** 當從broker接收到的是臨時可恢復的異常時,生產者會向broker重發消息,但是不能無限制重發,如果重發次數達到限制值,生產者將不會重試并返回錯誤。* 通過retries屬性設置。默認情況下生產者會在重試后等待100ms,可以通過 retries.backoff.ms屬性進行修改*/properties.put(ProducerConfig.RETRIES_CONFIG, 0);/*** 當有多條消息要被發送到同一分區時,生產者會把他們放到同一批里。kafka通過批次的概念來 提高吞吐量,但是也會在增加延遲。* 以下配置,當緩存數量達到16kb,就會觸發網絡請求,發送消息*/properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/*** 每條消息在緩存中的最長時間(單位ms),如果超過這個時間就會忽略batch.size的限制,由客戶端立即將消息發送出去*/properties.put(ProducerConfig.LINGER_MS_CONFIG, 50);/*** Kafka的客戶端發送數據到服務器,不是來一條就發一條,而是經過緩沖的,也就是說,通過KafkaProducer發送出去的消息都是先進入到客戶端本地的內存緩沖里,然后把很多消息收集成一個一個的Batch,再發送到Broker上去的,這樣性能才可能高。* buffer.memory的本質就是用來約束KafkaProducer能夠使用的內存緩沖的大小的,默認值32MB。* 如果buffer.memory設置的太小,可能導致的問題是:消息快速的寫入內存緩沖里,但Sender線程來不及把Request發送到Kafka服務器,會造成內存緩沖很快就被寫滿。而一旦被寫滿,就會阻塞用戶線程,不讓繼續往Kafka寫消息了。* 所以“buffer.memory”參數需要結合實際業務情況壓測,需要測算在生產環境中用戶線程會以每秒多少消息的頻率來寫入內存緩沖。經過壓測,調試出來一個合理值。*/properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);/*** key的序列化方式*/properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");/*** value序列化方式*/properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties); // for (int i = 0; i < 100; i++) { // String msg = "------Message " + i; // producer.send(new ProducerRecord<String, String>("mytest", msg)); // System.out.println("Sent:" + msg); // }String msg = "------Message hello world!";// mytest 為topicproducer.send(new ProducerRecord<String, String>("mytest", msg));System.out.println("Sent:" + msg);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}/*** 消費者示例* @throws InterruptedException*/public static void consumerDemo() throws InterruptedException {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.5.6.19:9092"); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.136:9092,192.168.17.137:9092");// 每個消費者分配獨立的組號props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");// 如果value合法,則自動提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 設置自動更新被消費消息的偏移量的時間間隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");// 設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 設置服務返回的最大數據量,這不是絕對最大值,如果提取的第一個非空分區中的第一條消息大于此值,則仍將返回該消息以確保使用者使用。此處設置5MBprops.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "5242880");// 設置服務返回的每個分區的最大數據量,此大小必須至少與服務器允許的最大消息大小(fetch.max.bytes)一樣大,否則,生產者有可能發送大于消費者可以獲取的消息。此處設置5MBprops.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880");/*** earliest,當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費* latest,當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據* none,topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常*/props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// mytest 為topicconsumer.subscribe(Arrays.asList("mytest"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records){System.out.printf("------------------offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());System.out.println();}/*** 手動提交偏移量* 保證同一個consumer group中,下一次讀取(不論進行了rebalance)時,既不會重復消費消息,也不會遺漏消息。* 防止consumer莫名掛掉后,下次進行數據fetch時,不能從上次讀到的數據開始讀而導致Consumer消費的數據丟失*/consumer.commitSync();Thread.sleep(2000);}}}2. case2測試
package com.sinosoft.b;import java.util.Arrays; import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;public class kafkaConsumerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "10.5.6.19:9092"); // 指向kafka集群的IP地址properties.put("group.id", " group-1"); // Consumer分組IDproperties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000"); /* 自動確認offset的時間間隔 */properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("max.poll.records", "100");// max.poll.records條數據需要在在session.timeout.ms這個時間內處理完properties.put("fetch.min.bytes", "1");//server發送到消費端的最小數據,若是不滿足這個數值則會等待直到滿足指定大小。默認為1表示立即接收。properties.put("fetch.wait.max.ms", "1000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Arrays.asList("xuhaitao")); // 設置消費的主題while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // 調用poll方法來輪循Kafka集群的消息,其中參數100是超時時間for (ConsumerRecord<String, String> record : records) {System.out.printf("offsetConsumer = %d, value = %s", record.offset(), record.value());System.out.println();}}} } package com.sinosoft.b;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;public class ProducerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "10.5.6.19:9092");// 指向kafka集群的IP地址properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 100; i++) {String msg = "This is Message " + i;producer.send(new ProducerRecord<String, String>("xuhaitao", msg));System.out.println("Sent:" + msg);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}} }總結
以上是生活随笔為你收集整理的java 集成kafka单机版 适配jdk1.8的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Oracle 表空间常用sql
- 下一篇: Sonarqube+maven 分析代码