spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍
【README】
0,為啥要看? DefaultKafkaProducerFactory? 最近在基于 springboot 開(kāi)發(fā)kafka模塊,發(fā)現(xiàn) kafakTemplate構(gòu)造器傳入了 DefaultKafkaProducerFactory實(shí)例, kafkaTemplate內(nèi)部使用了 很多 DefaultKafkaProducerFactory的方法; 所以把 DefaultKafkaProducerFactory的重點(diǎn)方法分析處理出來(lái),以便于查看 KafkaTemplate的內(nèi)部邏輯;?
1, 本文涉及的 kafka操作,不涉及事務(wù)和消費(fèi)者,所以本文忽略了有關(guān)kafka事務(wù),消費(fèi)者的描述; kafka事務(wù), refer2? 轉(zhuǎn):Kafka事務(wù)使用和編程示例/實(shí)例_PacosonSWJTU的博客-CSDN博客Kafka事務(wù)使用和編程示例/實(shí)例_JobShow裁員加班實(shí)況-微信小程序-CSDN博客一、概述? Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費(fèi)者提交偏移量的操作在一個(gè)事務(wù)中,或者說(shuō)是一個(gè)原子操作,生產(chǎn)消息和提交偏移量同時(shí)成功或者失敗。注意:kafka事務(wù)和DB事務(wù)。在理解消息的事務(wù)時(shí),一直處于一個(gè)錯(cuò)誤理解是,把操作db的業(yè)務(wù)邏輯跟操作消息當(dāng)成是一個(gè)事務(wù),如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:讀取消息或生產(chǎn)消息 kafkaOperation(); /https://blog.csdn.net/PacosonSWJTU/article/details/1213058842,本文結(jié)合了 api doc 對(duì) DefaultKafkaProducerFactory-默認(rèn)kafka生產(chǎn)者工廠的重點(diǎn)方法進(jìn)行介紹;??
3,DefaultKafkaProducerFactory 類(lèi)代碼結(jié)構(gòu)包括(小結(jié)):
【1】 類(lèi)描述
類(lèi)描述:
單例共享 Producer 實(shí)例的 ProducerFactory 實(shí)現(xiàn)。
此實(shí)現(xiàn)將為每次 createProducer() 調(diào)用時(shí)提供的 Map 配置和可選的 Serializer 實(shí)現(xiàn)返回相同的 Producer 實(shí)例(如果未啟用事務(wù))。
如果您使用的序列化器沒(méi)有參數(shù)構(gòu)造函數(shù)并且不需要設(shè)置,那么最簡(jiǎn)單的方法是在傳遞給 DefaultKafkaProducerFactory 構(gòu)造函數(shù)的配置中針對(duì) ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG 和 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG 鍵指定序列化器類(lèi)。
如果這是不可能的,但您確定以下至少一項(xiàng)是正確的:
- 1 只有一個(gè)生產(chǎn)者會(huì)使用序列化程序。
- 2 您正在使用可以在 Producer 實(shí)例之間共享的序列化程序(特別是它們的 close() 方法是無(wú)操作的)。
- 3 您確定沒(méi)有任何單個(gè) Producer 被關(guān)閉的風(fēng)險(xiǎn),而其他具有相同序列化程序的 Producer 實(shí)例正在使用中。
然后您可以為鍵和值序列化程序之一或兩者傳入 Serializer 實(shí)例。
如果以上都不是真的,那么您可以為一個(gè)或兩個(gè)序列化程序提供一個(gè)供應(yīng)商函數(shù),每次工廠創(chuàng)建生產(chǎn)者時(shí),該函數(shù)將用于獲取序列化程序。
Producer 被包裝,并且在調(diào)用 Producer.close() 時(shí)實(shí)際上并未關(guān)閉底層的 KafkaProducer 實(shí)例。當(dāng)調(diào)用 DisposableBean.destroy() 或應(yīng)用程序上下文發(fā)布 ContextStoppedEvent 時(shí),KafkaProducer 物理關(guān)閉。你也可以調(diào)用reset()。
設(shè)置 setTransactionIdPrefix(String) 啟用事務(wù);在這種情況下,會(huì)維護(hù)生產(chǎn)者的緩存;關(guān)閉生產(chǎn)者將其返回到緩存。當(dāng)工廠被銷(xiāo)毀、應(yīng)用程序上下文停止或調(diào)用 reset() 方法時(shí),生產(chǎn)者將關(guān)閉并清除緩存。
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactoryimplements ProducerFactory<K, V>, ApplicationContextAware,BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {?【2】構(gòu)造器
使用提供的配置和序列化程序供應(yīng)商構(gòu)建工廠。
如果提供,還可以把 transactionIdPrefix 配置為ProducerConfig.TRANSACTIONAL_ID_CONFIG 的值。
此配置將被目標(biāo) Producer 實(shí)例的后綴覆蓋。
public DefaultKafkaProducerFactory(Map<String, Object> configs,@Nullable Supplier<Serializer<K>> keySerializerSupplier,@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {this.configs = new ConcurrentHashMap<>(configs);this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;// clientId 表示kafka生產(chǎn)者idif (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);}// 是否開(kāi)啟事務(wù)String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);if (StringUtils.hasText(txId)) {setTransactionIdPrefix(txId);this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);}this.configs.put("internal.auto.downgrade.txn.commit", true); }【特別注意】我們可以不傳入 key序列化器,value序列化器 對(duì)象到 DefaultKafkaProducerFactory構(gòu)造器(即設(shè)置為null) , 而把序列化器全限定類(lèi)名設(shè)置在 configs 屬性里面,因?yàn)樵鷎afka生產(chǎn)者的構(gòu)造器可以讀取配置中的序列化器類(lèi),如下:
new DefaultKafkaProducerFactory<>(0 (Map) PPKafkaClusterManager.INSTANCE.getKafkaClusterProps(kafkaClusterName));// 不傳入key value的序列化器,默認(rèn)為null?kafka 原生構(gòu)造器如下:
// 通過(guò)反射創(chuàng)建序列化器對(duì)象 public <T> T getConfiguredInstance(String key, Class<T> t) {Class<?> c = getClass(key);if (c == null)return null;Object o = Utils.newInstance(c);if (!t.isInstance(o))throw new KafkaException(c.getName() + " is not an instance of " + t.getName());if (o instanceof Configurable)((Configurable) o).configure(originals());return t.cast(o);}【3】方法介紹
【3.1】設(shè)置物理關(guān)閉生產(chǎn)者超時(shí)時(shí)間
public void setPhysicalCloseTimeout(int physicalCloseTimeout) {this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout); }通過(guò)工廠物理關(guān)閉生產(chǎn)者而不是producer自身關(guān)閉的等待時(shí)間(當(dāng) {@link #reset()}、{@link #destroy() #closeProducerFor(String)} 或 {@link #closeThreadBoundProducer( )} 被調(diào)用)。 以秒為單位; 默認(rèn){@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}。
【3.2】設(shè)置事務(wù)id 前綴(開(kāi)啟事務(wù))
public final void setTransactionIdPrefix(String transactionIdPrefix) {Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");this.transactionIdPrefix = transactionIdPrefix;enableIdempotentBehaviour(); } // 事務(wù)可用? @Override public boolean transactionCapable() {return this.transactionIdPrefix != null; }為 ProducerConfig.TRANSACTIONAL_ID_CONFIG 配置設(shè)置前綴。 默認(rèn)情況下,來(lái)自配置的 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值用作目標(biāo)生產(chǎn)者配置中的前綴。
【3.3】 設(shè)置是否每個(gè)線程產(chǎn)生一個(gè)生產(chǎn)者
public void setProducerPerThread(boolean producerPerThread) {this.producerPerThread = producerPerThread; }設(shè)置為 true 為每個(gè)線程創(chuàng)建一個(gè)生產(chǎn)者,而不是由所有客戶端共享的單例。 當(dāng)不再需要生產(chǎn)者時(shí),客戶端必須調(diào)用 closeThreadBoundProducer() 來(lái)物理關(guān)閉生產(chǎn)者。 這些生產(chǎn)者不會(huì)被 destroy() 或 reset() 關(guān)閉。?
【3.4】創(chuàng)建kafka生產(chǎn)者方法(重點(diǎn))*
3個(gè)外觀方法
@Override public Producer<K, V> createProducer() {return createProducer(this.transactionIdPrefix); }@Override public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;return doCreateProducer(txIdPrefix); }@Override public Producer<K, V> createNonTransactionalProducer() {return doCreateProducer(null); }底層方法
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {if (txIdPrefix != null) { // 使用kafka事務(wù)if (this.producerPerConsumerPartition) {return createTransactionalProducerForPartition(txIdPrefix);}else {return createTransactionalProducer(txIdPrefix);}}if (this.producerPerThread) { // 每個(gè)線程一個(gè)生產(chǎn)者 return getOrCreateThreadBoundProducer();}synchronized (this) { // 我們走這里,synchronized同步代碼塊 if (this.producer != null && expire(this.producer)) {this.producer = null;}if (this.producer == null) {this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));}return this.producer;} }【代碼解說(shuō)】 以上方法有4個(gè)分支,包括創(chuàng)建分區(qū)帶事務(wù)的生產(chǎn)者, 帶事務(wù)的生產(chǎn)者,每個(gè)線程一個(gè)生產(chǎn)者,普通生產(chǎn)者;
【3.4.1】 創(chuàng)建分區(qū)帶事務(wù)的生產(chǎn)者 createTransactionalProducerForPartition(txIdPrefix)
因?yàn)榉椒ㄟ^(guò)于復(fù)雜,放在文末說(shuō)明
【3.4.2】 帶事務(wù)的生產(chǎn)者 createTransactionalProducer(txIdPrefix)
因?yàn)榉椒ㄟ^(guò)于復(fù)雜,放在文末說(shuō)明
【3.4.3】 每個(gè)線程一個(gè)生產(chǎn)者 getOrCreateThreadBoundProducer
該方法就是把 生產(chǎn)者放入了線程級(jí)變量 ThreadLocal; 僅此而已
private Producer<K, V> getOrCreateThreadBoundProducer() {// 從線程級(jí)變量獲取CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {closeThreadBoundProducer();tlProducer = null;}if (tlProducer == null) {tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());for (Listener<K, V> listener : this.listeners) {listener.producerAdded(tlProducer.clientId, tlProducer);}this.threadBoundProducers.set(tlProducer); // 放入線程級(jí)遍歷 }return tlProducer; }threadBoundProducers 就是線程級(jí)遍歷
【3.4.4】 普通生產(chǎn)者*(本文所關(guān)注的方法)
synchronized塊中 可以保證所有客戶端線程復(fù)用同一個(gè) kafka生產(chǎn)者,只要這個(gè)kafka沒(méi)有過(guò)期;即便過(guò)期,它會(huì)重新創(chuàng)建一個(gè) kafka生產(chǎn)者;
synchronized中調(diào)用了expire(this.producer)判斷是否過(guò)期
private boolean expire(CloseSafeProducer<K, V> producer) {boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;if (expired) {producer.closeDelegate(this.physicalCloseTimeout, this.listeners);}return expired; }接著 創(chuàng)建了 CloseSafeProducer -關(guān)閉安全的生產(chǎn)者, 這是一個(gè)內(nèi)部類(lèi);
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {在 調(diào)用CloseSafeProducer 構(gòu)造器時(shí)傳入了 kafka生產(chǎn)者, 由 createKafkaProducer() 獲取; createKafkaProducer() 方法如下:
protected Producer<K, V> createKafkaProducer() {Map<String, Object> newConfigs;if (this.clientIdPrefix == null) {// 是否有生產(chǎn)者客戶端id ,這個(gè)值可以為空,newConfigs = new HashMap<>(this.configs);}else {newConfigs = new HashMap<>(this.configs);newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());}checkBootstrap(newConfigs);return createRawProducer(newConfigs); }// 創(chuàng)建原生kafka生產(chǎn)者 protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {Producer<K, V> kafkaProducer =new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());for (ProducerPostProcessor<K, V> pp : this.postProcessors) {kafkaProducer = pp.apply(kafkaProducer);}return kafkaProducer; }?子類(lèi)必須返回一個(gè)原生生產(chǎn)者(kafka生產(chǎn)者),該生產(chǎn)者將被包裝在 DefaultKafkaProducerFactory.CloseSafeProducer 中。
【4】?jī)?nèi)部類(lèi)-保證關(guān)閉安全的kafka生產(chǎn)者-CloseSafeProducer
1, 該類(lèi)實(shí)現(xiàn)了 接口? Producer;
【4.1】構(gòu)造器
上文調(diào)用? CloseSafeProducer構(gòu)造器創(chuàng)建 生產(chǎn)者對(duì)象,
?其中 removeProducer 使用了 java8語(yǔ)法的方法引用,如下:
構(gòu)造器有1個(gè)外觀
CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,String factoryName, int epoch) {this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch); }底層構(gòu)造器
CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,Duration closeTimeout, String factoryName, int epoch) {Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");this.delegate = delegate; // 原生kafka生產(chǎn)者 this.removeProducer = removeProducer; // 移除生產(chǎn)者的方法this.txIdPrefix = txIdPrefix; // 事務(wù)id前綴 this.closeTimeout = closeTimeout; // 關(guān)閉超時(shí)時(shí)間 Map<MetricName, ? extends Metric> metrics = delegate.metrics(); // 指標(biāo)Iterator<MetricName> metricIterator = metrics.keySet().iterator();// 指標(biāo)迭代器String id;if (metricIterator.hasNext()) {id = metricIterator.next().tags().get("client-id");}else {id = "unknown";}this.clientId = factoryName + "." + id; // 客戶端idthis.created = System.currentTimeMillis(); // 創(chuàng)建時(shí)間為當(dāng)前時(shí)間(毫秒)this.epoch = epoch; // 副本時(shí)代版本號(hào)LOGGER.debug(() -> "Created new Producer: " + this); }【代碼解說(shuō)】
delegate,表示代理,這里指 原生kafka生產(chǎn)者;
很顯然, 構(gòu)造器沒(méi)有啥復(fù)雜邏輯,就是賦值而已;
【4.2】發(fā)送消息
發(fā)送消息有2個(gè)重載方法?
方法1, 直接發(fā)送 ProducerRecord
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record); }方法2,帶回調(diào)的發(fā)送方法,消息發(fā)送完成后的回調(diào)(成功或失敗),回調(diào)方法中可以獲取發(fā)送消息所在的偏移量和分區(qū)等信息(metadata) ;
這個(gè)方法先執(zhí)行了 原生kafka發(fā)送消息分發(fā)的回調(diào), 回調(diào)執(zhí)行完成后,在執(zhí)行 外層傳入的回調(diào);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception instanceof OutOfOrderSequenceException) {CloseSafeProducer.this.producerFailed = exception;close(CloseSafeProducer.this.closeTimeout);}callback.onCompletion(metadata, exception);}}); }【4.3】flush 刷新緩沖區(qū)
直接調(diào)用 原生kakfa生產(chǎn)者delegate的flush 方法;?
public void flush() {LOGGER.trace(() -> toString() + " flush()");this.delegate.flush(); }【4.4】kafka事務(wù)操作方法
初始化事務(wù), initTransactions;
開(kāi)啟事務(wù), beginTransaction ;
發(fā)送偏移量到事務(wù), sendOffsetsToTransaction ;
提交事務(wù), commitTransaction ;
中斷事務(wù), abortTransaction ;
【4.5】關(guān)閉生產(chǎn)者方法(非常重要*)
有2個(gè)方法關(guān)閉生產(chǎn)者;
【4.5.1】 close(Duration timeout )
public void close(@Nullable Duration timeout) {LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")");if (!this.closed) {if (this.producerFailed != null) {LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);this.closed = true;this.removeProducer.test(this, this.producerFailed instanceof TimeoutException? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT: timeout);}else {this.closed = this.removeProducer.test(this, timeout);}} }調(diào)用 remoteProducer() 移除生產(chǎn)者
// 刪除單個(gè)共享生產(chǎn)者和線程綁定實(shí)例(如果存在)。 protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove , Duration timeout) {if (producerToRemove.closed) {if (producerToRemove.equals(this.producer)) {this.producer = null;producerToRemove.closeDelegate(timeout, this.listeners);}this.threadBoundProducers.remove();return true;}else {return false;} }?
也就是說(shuō),不是每一次發(fā)送消息完成,都關(guān)閉kafka生產(chǎn)者;
【總結(jié)】 何時(shí)關(guān)閉kafka生產(chǎn)者 ?(干貨——非常重要) *
當(dāng)發(fā)送消息拋出異常時(shí),關(guān)閉kafka生產(chǎn)者;?
上述代碼還是調(diào)用了? producerToRemove.closeDelegate(timeout, this.listeners);
其中 producerToRemove 就是 producer(CloseSafeProducer )的 closeDelegate(),下文所述;
?【4.5.2】 CloseSafeProducer.closeDelegate
調(diào)用了 原生kafka生產(chǎn)者的close 方法;
把生產(chǎn)者從監(jiān)聽(tīng)器中移除;
void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {this.delegate.close(timeout == null ? this.closeTimeout : timeout);listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));this.closed = true; }【補(bǔ)充】創(chuàng)建生產(chǎn)者
【代碼解說(shuō)】 以上方法有4個(gè)分支,包括創(chuàng)建分區(qū)帶事務(wù)的生產(chǎn)者, 帶事務(wù)的生產(chǎn)者,每個(gè)線程一個(gè)生產(chǎn)者,普通生產(chǎn)者;
【3.4.1】 創(chuàng)建分區(qū)帶事務(wù)的生產(chǎn)者 createTransactionalProducerForPartition(txIdPrefix)
protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {String suffix = TransactionSupport.getTransactionIdSuffix();if (suffix == null) {return createTransactionalProducer(txIdPrefix);}else {synchronized (this.consumerProducers) {CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);if (consumerProducer == null || expire(consumerProducer)) {CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,this::removeConsumerProducer);this.consumerProducers.put(suffix, newProducer);return newProducer;}else {return consumerProducer;}}} } private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();- 1) 如果后綴為空,調(diào)用? createTransactionalProducer 創(chuàng)建生產(chǎn)者 ; 先從緩存中的阻塞隊(duì)列中獲取,若獲取不到,則 調(diào)用 doCreateTxProducer? 創(chuàng)建;
- ?2,不為空, 調(diào)用 doCreateTxProducer 創(chuàng)建生產(chǎn)者; 它也調(diào)用了? createRawProducer 根據(jù)原生 kafka生產(chǎn)者 創(chuàng)建? CloseSafeProducer
【3.4.2】 帶事務(wù)的生產(chǎn)者 createTransactionalProducer(txIdPrefix)
refer2【3.4.1】分支1。
總結(jié)
以上是生活随笔為你收集整理的spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: 联通联网设置(联通宽带联网设置)
- 下一篇: spring-kafka整合:Kafka
