java作为kafka生产者实验及Expiring超时问题解决
生活随笔
收集整理的這篇文章主要介紹了
java作为kafka生产者实验及Expiring超时问题解决
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
【README】 java作為生產(chǎn)者,centos 作為消費(fèi)者;
【1】生產(chǎn)者代碼?
-- pom.xml <!-- 依賴 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>生產(chǎn)者
-- 生產(chǎn)者 public class MyProducer {public static void main(String[] args) {/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack應(yīng)答級(jí)別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數(shù)*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次發(fā)送多少數(shù)據(jù),當(dāng)數(shù)據(jù)大于16k,生產(chǎn)者會(huì)發(fā)送數(shù)據(jù)到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待時(shí)間, 等待時(shí)間超過1毫秒,即便數(shù)據(jù)沒有大于16k, 也會(huì)寫數(shù)據(jù)到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 緩沖區(qū)大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化類 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.創(chuàng)建生產(chǎn)者對(duì)象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.發(fā)送數(shù)據(jù) */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("寫入數(shù)據(jù)%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.關(guān)閉資源 */ producer.close();System.out.println("kafka生產(chǎn)者寫入數(shù)據(jù)完成"); } }【2】centos 消費(fèi)者?
[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning first01-20201229--2 first01-20201229--6 first01-20201229--A0 first01-20201229--A1 first01-20201229--A2 first01-20201229--A3 first01-20201229--A4 first01-20201229--A5 first01-20201229--A6 first01-20201229--A7 first01-20201229--A8 first01-20201229--A9【3】生產(chǎn)者發(fā)送消息超時(shí)問題
3.1、問題現(xiàn)場(chǎng)
kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time3.2、解決方法
修改本地機(jī)器的hosts, 如下:
192.168.163.201 centos201 192.168.163.202 centos202 192.168.163.203 centos203?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的java作为kafka生产者实验及Expiring超时问题解决的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cgx是什么意思网络用语 cgx是什么意
- 下一篇: 王珂个人资料及 关于王珂的简介