producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探
生活随笔
收集整理的這篇文章主要介紹了
producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
本次內(nèi)容我們有兩個目標(biāo):第一個初探Producer發(fā)送消息的流程第二個我們學(xué)習(xí)一下Kafka是如何構(gòu)造異常體系的一、代碼分析Producer核心流程初探//因?yàn)樯a(chǎn)中開發(fā)使用的是異步的方式發(fā)送的消息,所以我這兒直接貼的代碼//就是異步發(fā)送的代碼,大家注意這個代碼里面?zhèn)鬟M(jìn)去了兩個參數(shù)//一個是消息//一個是回調(diào)函數(shù),這個回調(diào)函數(shù)很重要,每個消息發(fā)送完成以后這個回調(diào)函數(shù)都會被//執(zhí)行,我們可以根據(jù)這個回調(diào)函數(shù)返回來的信息知道消息是否發(fā)送成功,//做相對應(yīng)的應(yīng)對處理。這種傳遞回調(diào)函數(shù)的代碼設(shè)計(jì)方式也值得我們積累,這樣可以增加用戶開發(fā)代碼時候的靈活性。
?producer.send(new?ProducerRecord<>(topic,
????????????????????messageNo,
????????????????????messageStr),?new?DemoCallBack(startTime,?messageNo,?messageStr));點(diǎn)擊過去就會看到如下核心代碼private?Future?doSend(ProducerRecord?record,?Callback?callback)?{
????????TopicPartition?tp?=?null;try?{//?first?make?sure?the?metadata?for?the?topic?is?available//第一步:阻塞等待獲取集群元數(shù)據(jù)//maxBlockTimeMs?獲取元數(shù)據(jù)最多等待的時間
????????????ClusterAndWaitTime?clusterAndWaitTime?=?waitOnMetadata(record.topic(),?record.partition(),?maxBlockTimeMs);//最多等待的時間減去等待元數(shù)據(jù)花的時間等于還可以在等待的時間
????????????long?remainingWaitMs?=?Math.max(0,?maxBlockTimeMs?-?clusterAndWaitTime.waitedOnMetadataMs);//集群元數(shù)據(jù)信息
????????????Cluster?cluster?=?clusterAndWaitTime.cluster;//第二步:對key和value進(jìn)行序列化
????????????byte[]?serializedKey;try?{
????????????????serializedKey?=?keySerializer.serialize(record.topic(),?record.key());
????????????}?catch?(ClassCastException?cce)?{throw?new?SerializationException("Can't?convert?key?of?class?"?+?record.key().getClass().getName()?+"?to?class?"?+?producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName()?+"?specified?in?key.serializer");
????????????}
????????????byte[]?serializedValue;try?{
????????????????serializedValue?=?valueSerializer.serialize(record.topic(),?record.value());
????????????}?catch?(ClassCastException?cce)?{throw?new?SerializationException("Can't?convert?value?of?class?"?+?record.value().getClass().getName()?+"?to?class?"?+?producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()?+"?specified?in?value.serializer");
????????????}//第三步:根據(jù)分區(qū)器選擇合適的分區(qū)
????????????int?partition?=?partition(record,?serializedKey,?serializedValue,?cluster);//第四步:計(jì)算消息的大小
????????????int?serializedSize?=?Records.LOG_OVERHEAD?+?Record.recordSize(serializedKey,?serializedValue);//確認(rèn)消息是否超出限制
????????????ensureValidRecordSize(serializedSize);//第五步:根據(jù)元數(shù)據(jù)獲取到topic信息,封裝分區(qū)對象
????????????tp?=?new?TopicPartition(record.topic(),?partition);
????????????long?timestamp?=?record.timestamp()?==?null???time.milliseconds()?:?record.timestamp();
????????????log.trace("Sending?record?{}?with?callback?{}?to?topic?{}?partition?{}",?record,?callback,?record.topic(),?partition);//?producer?callback?will?make?sure?to?call?both?'callback'?and?interceptor?callback//第六步:設(shè)置回調(diào)對象
????????????Callback?interceptCallback?=?this.interceptors?==?null???callback?:?new?InterceptorCallback<>(callback,?this.interceptors,?tp);//第七步:把消息追加到accumulator對象中
????????????RecordAccumulator.RecordAppendResult?result?=?accumulator.append(tp,?timestamp,?serializedKey,?serializedValue,?interceptCallback,?remainingWaitMs);//消息存入accumulator中,如果一個批次滿了,或者是創(chuàng)建了一個新的批次//那么喚醒sender線程,讓sender線程開始干活,至于干什么活,我們后面//再去分析if?(result.batchIsFull?||?result.newBatchCreated)?{
????????????????log.trace("Waking?up?the?sender?since?topic?{}?partition?{}?is?either?full?or?getting?a?new?batch",?record.topic(),?partition);//第八步:喚醒sender線程this.sender.wakeup();
????????????}return?result.future;//?handling?exceptions?and?record?the?errors;//?for?API?exceptions?return?them?in?the?future,//?for?other?exceptions?throw?directly
????????}?catch?(ApiException?e)?{
????????????log.debug("Exception?occurred?during?message?send:",?e);if?(callback?!=?null)
????????????????callback.onCompletion(null,?e);this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);return?new?FutureFailure(e);
????????}?catch?(InterruptedException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?new?InterruptException(e);
????????}?catch?(BufferExhaustedException?e)?{this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(KafkaException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(Exception?e)?{//?we?notify?interceptor?about?all?exceptions,?since?onSend?is?called?before?anything?else?in?this?methodif?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}
????}看完上面的代碼,我們也就大概完成了本次的第一個目標(biāo):初探Producer的核心流程初探。代碼調(diào)用的時序圖如下:Producer發(fā)送數(shù)據(jù)流程分析二、Kafka異常體系一直跟著分析源碼的同學(xué)能感覺得到上面的代碼就是KafkaProducer的核心流程。這也是我們?yōu)槭裁丛谔暨@個時候講Kafka是如何構(gòu)造異常體系的原因,一般在項(xiàng)目的核心流程里面去觀察這個項(xiàng)目的異常體系會看得比較清晰,大家發(fā)現(xiàn)這個流程里面捕獲了很多異常:?}?catch?(ApiException?e)?{
????????????log.debug("Exception?occurred?during?message?send:",?e);if?(callback?!=?null)
????????????????callback.onCompletion(null,?e);this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);return?new?FutureFailure(e);
????????}?catch?(InterruptedException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?new?InterruptException(e);
????????}?catch?(BufferExhaustedException?e)?{this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(KafkaException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(Exception?e)?{//?we?notify?interceptor?about?all?exceptions,?since?onSend?is?called?before?anything?else?in?this?methodif?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}通過看這段代碼,我們可以學(xué)習(xí)到如下3個點(diǎn):1. 核心流程捕獲各種異常(上面的這段代碼就是核心代碼)2. 底層異常直接往上拋,比如:ensureValidRecordSize方法3. 自定義各種異常,力求出了問題,方便精準(zhǔn)定位問題?比如:ensureValidRecordSize方法注意:核心流程捕獲異常的時候我們也可以考慮把異常封裝成為各種狀態(tài)碼。Kafka自定義各種異常。舉個例子,比如我們分析初探核心流程里面有段代碼是://檢查要發(fā)送的這個消息大小,?檢查是否超過了請求大小和內(nèi)存緩沖大小。
????????????ensureValidRecordSize(serializedSize);點(diǎn)擊過去private?void?ensureValidRecordSize(int?size)?{//默認(rèn)值1M,如果超過1M拋異常if?(size?>?this.maxRequestSize)throw?new?RecordTooLargeException("The?message?is?"?+?size?+"?bytes?when?serialized?which?is?larger?than?the?maximum?request?size?you?have?configured?with?the?"?+
??????????????????????????????????????????????ProducerConfig.MAX_REQUEST_SIZE_CONFIG?+"?configuration.");//不能超過內(nèi)存緩沖的大小,如果超過內(nèi)存大小拋異常if?(size?>?this.totalMemorySize)throw?new?RecordTooLargeException("The?message?is?"?+?size?+"?bytes?when?serialized?which?is?larger?than?the?total?memory?buffer?you?have?configured?with?the?"?+
??????????????????????????????????????????????ProducerConfig.BUFFER_MEMORY_CONFIG?+"?configuration.");
????}RecordTooLargeException 就是自定義的異常,Kafka選擇把這種底層代碼的異常往上拋,在核心流程里統(tǒng)一處理。如果沒有太多工業(yè)項(xiàng)目設(shè)計(jì)經(jīng)驗(yàn)的同學(xué),可以學(xué)習(xí)Kafka的異常體系的設(shè)計(jì),Kafka使用的這種異常處理方式是大多數(shù)大數(shù)據(jù)項(xiàng)目處理異常使用的方式。三、總結(jié)本小節(jié)主要分析了KafkaProducer發(fā)送消息的大致步驟,另外此小節(jié)還有一個重點(diǎn)就是我們學(xué)習(xí)了Kafka是如何構(gòu)建自己的異常體系的。系列更新第八期啦,后續(xù)還有更精彩的內(nèi)容!喜歡的同學(xué)可以點(diǎn)贊,關(guān)注-? ?關(guān)注“大數(shù)據(jù)觀止”? ?- 《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀
?producer.send(new?ProducerRecord<>(topic,
????????????????????messageNo,
????????????????????messageStr),?new?DemoCallBack(startTime,?messageNo,?messageStr));點(diǎn)擊過去就會看到如下核心代碼private?Future?doSend(ProducerRecord?record,?Callback?callback)?{
????????TopicPartition?tp?=?null;try?{//?first?make?sure?the?metadata?for?the?topic?is?available//第一步:阻塞等待獲取集群元數(shù)據(jù)//maxBlockTimeMs?獲取元數(shù)據(jù)最多等待的時間
????????????ClusterAndWaitTime?clusterAndWaitTime?=?waitOnMetadata(record.topic(),?record.partition(),?maxBlockTimeMs);//最多等待的時間減去等待元數(shù)據(jù)花的時間等于還可以在等待的時間
????????????long?remainingWaitMs?=?Math.max(0,?maxBlockTimeMs?-?clusterAndWaitTime.waitedOnMetadataMs);//集群元數(shù)據(jù)信息
????????????Cluster?cluster?=?clusterAndWaitTime.cluster;//第二步:對key和value進(jìn)行序列化
????????????byte[]?serializedKey;try?{
????????????????serializedKey?=?keySerializer.serialize(record.topic(),?record.key());
????????????}?catch?(ClassCastException?cce)?{throw?new?SerializationException("Can't?convert?key?of?class?"?+?record.key().getClass().getName()?+"?to?class?"?+?producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName()?+"?specified?in?key.serializer");
????????????}
????????????byte[]?serializedValue;try?{
????????????????serializedValue?=?valueSerializer.serialize(record.topic(),?record.value());
????????????}?catch?(ClassCastException?cce)?{throw?new?SerializationException("Can't?convert?value?of?class?"?+?record.value().getClass().getName()?+"?to?class?"?+?producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()?+"?specified?in?value.serializer");
????????????}//第三步:根據(jù)分區(qū)器選擇合適的分區(qū)
????????????int?partition?=?partition(record,?serializedKey,?serializedValue,?cluster);//第四步:計(jì)算消息的大小
????????????int?serializedSize?=?Records.LOG_OVERHEAD?+?Record.recordSize(serializedKey,?serializedValue);//確認(rèn)消息是否超出限制
????????????ensureValidRecordSize(serializedSize);//第五步:根據(jù)元數(shù)據(jù)獲取到topic信息,封裝分區(qū)對象
????????????tp?=?new?TopicPartition(record.topic(),?partition);
????????????long?timestamp?=?record.timestamp()?==?null???time.milliseconds()?:?record.timestamp();
????????????log.trace("Sending?record?{}?with?callback?{}?to?topic?{}?partition?{}",?record,?callback,?record.topic(),?partition);//?producer?callback?will?make?sure?to?call?both?'callback'?and?interceptor?callback//第六步:設(shè)置回調(diào)對象
????????????Callback?interceptCallback?=?this.interceptors?==?null???callback?:?new?InterceptorCallback<>(callback,?this.interceptors,?tp);//第七步:把消息追加到accumulator對象中
????????????RecordAccumulator.RecordAppendResult?result?=?accumulator.append(tp,?timestamp,?serializedKey,?serializedValue,?interceptCallback,?remainingWaitMs);//消息存入accumulator中,如果一個批次滿了,或者是創(chuàng)建了一個新的批次//那么喚醒sender線程,讓sender線程開始干活,至于干什么活,我們后面//再去分析if?(result.batchIsFull?||?result.newBatchCreated)?{
????????????????log.trace("Waking?up?the?sender?since?topic?{}?partition?{}?is?either?full?or?getting?a?new?batch",?record.topic(),?partition);//第八步:喚醒sender線程this.sender.wakeup();
????????????}return?result.future;//?handling?exceptions?and?record?the?errors;//?for?API?exceptions?return?them?in?the?future,//?for?other?exceptions?throw?directly
????????}?catch?(ApiException?e)?{
????????????log.debug("Exception?occurred?during?message?send:",?e);if?(callback?!=?null)
????????????????callback.onCompletion(null,?e);this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);return?new?FutureFailure(e);
????????}?catch?(InterruptedException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?new?InterruptException(e);
????????}?catch?(BufferExhaustedException?e)?{this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(KafkaException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(Exception?e)?{//?we?notify?interceptor?about?all?exceptions,?since?onSend?is?called?before?anything?else?in?this?methodif?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}
????}看完上面的代碼,我們也就大概完成了本次的第一個目標(biāo):初探Producer的核心流程初探。代碼調(diào)用的時序圖如下:Producer發(fā)送數(shù)據(jù)流程分析二、Kafka異常體系一直跟著分析源碼的同學(xué)能感覺得到上面的代碼就是KafkaProducer的核心流程。這也是我們?yōu)槭裁丛谔暨@個時候講Kafka是如何構(gòu)造異常體系的原因,一般在項(xiàng)目的核心流程里面去觀察這個項(xiàng)目的異常體系會看得比較清晰,大家發(fā)現(xiàn)這個流程里面捕獲了很多異常:?}?catch?(ApiException?e)?{
????????????log.debug("Exception?occurred?during?message?send:",?e);if?(callback?!=?null)
????????????????callback.onCompletion(null,?e);this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);return?new?FutureFailure(e);
????????}?catch?(InterruptedException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?new?InterruptException(e);
????????}?catch?(BufferExhaustedException?e)?{this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(KafkaException?e)?{this.errors.record();if?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}?catch?(Exception?e)?{//?we?notify?interceptor?about?all?exceptions,?since?onSend?is?called?before?anything?else?in?this?methodif?(this.interceptors?!=?null)this.interceptors.onSendError(record,?tp,?e);throw?e;
????????}通過看這段代碼,我們可以學(xué)習(xí)到如下3個點(diǎn):1. 核心流程捕獲各種異常(上面的這段代碼就是核心代碼)2. 底層異常直接往上拋,比如:ensureValidRecordSize方法3. 自定義各種異常,力求出了問題,方便精準(zhǔn)定位問題?比如:ensureValidRecordSize方法注意:核心流程捕獲異常的時候我們也可以考慮把異常封裝成為各種狀態(tài)碼。Kafka自定義各種異常。舉個例子,比如我們分析初探核心流程里面有段代碼是://檢查要發(fā)送的這個消息大小,?檢查是否超過了請求大小和內(nèi)存緩沖大小。
????????????ensureValidRecordSize(serializedSize);點(diǎn)擊過去private?void?ensureValidRecordSize(int?size)?{//默認(rèn)值1M,如果超過1M拋異常if?(size?>?this.maxRequestSize)throw?new?RecordTooLargeException("The?message?is?"?+?size?+"?bytes?when?serialized?which?is?larger?than?the?maximum?request?size?you?have?configured?with?the?"?+
??????????????????????????????????????????????ProducerConfig.MAX_REQUEST_SIZE_CONFIG?+"?configuration.");//不能超過內(nèi)存緩沖的大小,如果超過內(nèi)存大小拋異常if?(size?>?this.totalMemorySize)throw?new?RecordTooLargeException("The?message?is?"?+?size?+"?bytes?when?serialized?which?is?larger?than?the?total?memory?buffer?you?have?configured?with?the?"?+
??????????????????????????????????????????????ProducerConfig.BUFFER_MEMORY_CONFIG?+"?configuration.");
????}RecordTooLargeException 就是自定義的異常,Kafka選擇把這種底層代碼的異常往上拋,在核心流程里統(tǒng)一處理。如果沒有太多工業(yè)項(xiàng)目設(shè)計(jì)經(jīng)驗(yàn)的同學(xué),可以學(xué)習(xí)Kafka的異常體系的設(shè)計(jì),Kafka使用的這種異常處理方式是大多數(shù)大數(shù)據(jù)項(xiàng)目處理異常使用的方式。三、總結(jié)本小節(jié)主要分析了KafkaProducer發(fā)送消息的大致步驟,另外此小節(jié)還有一個重點(diǎn)就是我們學(xué)習(xí)了Kafka是如何構(gòu)建自己的異常體系的。系列更新第八期啦,后續(xù)還有更精彩的內(nèi)容!喜歡的同學(xué)可以點(diǎn)贊,關(guān)注-? ?關(guān)注“大數(shù)據(jù)觀止”? ?- 《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀
總結(jié)
以上是生活随笔為你收集整理的producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: stm32 adc过采样_产生ADC误差
- 下一篇: android dialog 隐藏状态栏