spring-kafka整合:KafkaTemplate-kafka模板类介绍
【README】
1,本文主要關注 KafkaTemplate的重點方法,并非全部方法;
2,KafkaTemplate? 底層依賴于 DefaultKafkaProducerFactory , 關于 DefaultKafkaProducerFactory 的介紹,refer2?
spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客【1】 類描述類描述:單例共享 Producer 實例的 ProducerFactory 實現。此實現將為每次 createProducer() 調用時提供的 Map 配置和可選的 Serializer 實現返回相同的 Producer 實例(如果未啟用事務)。如果您使用的序列化器沒有參數構造函數并且不需要設置,那么最簡單的方法是在傳遞給 DefaultKafkaProducerFactory 構造函數的配置中針對 ProducerConfig.KEY_SERIALIZER_CLASS_Chttps://blog.csdn.net/PacosonSWJTU/article/details/121306370
【1】KafkaTemplate 類說明
用于執行高級操作的模板。 當與 DefaultKafkaProducerFactory 一起使用時,模板是線程安全的。 生產者工廠和 org.apache.kafka.clients.producer.KafkaProducer 確保這一點;
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,ApplicationListener<ContextStoppedEvent>, DisposableBean {?【1.1】構造方法
使用提供的生產者工廠和 autoFlush 設置創建一個實例。
如果您已將生產者的 linger.ms 配置為非默認值并希望立即在此模板上發送操作,無論該設置如何, 又或者您希望阻塞直到服務器根據acs屬性確認已收到消息, 需要把autoFlush設置為true。
如果 configOverrides 不為 null 或不為空,則將使用合并的生產者屬性創建一個新的 DefaultKafkaProducerFactory,這些屬性在提供的工廠屬性之后進行覆蓋。
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,@Nullable Map<String, Object> configOverrides) {Assert.notNull(producerFactory, "'producerFactory' cannot be null");this.autoFlush = autoFlush;this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;// 是否自定義生產者工廠 this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;if (this.customProducerFactory) {Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties()); // 覆蓋工廠屬性 configs.putAll(configOverrides); // 創建新的 DefaultKafkaProducerFactoryDefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs, producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier()); // 設置物理關閉生產者的超時時間 newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds()); // 設置是否分區 newFactory.setProducerPerConsumerPartition(producerFactory.isProducerPerConsumerPartition()); // 設置是否 每個線程創建一個 生產者; newFactory.setProducerPerThread(producerFactory.isProducerPerThread()); // 新工廠賦值this.producerFactory = newFactory;} else {this.producerFactory = producerFactory;} // 是否開啟kafka事務 this.transactional = this.producerFactory.transactionCapable(); }【1.2】發送消息方法(非常重要)
發送消息有很多方法,大致分為兩類;
- send();
- doSend();
【1.2.1】send() 發送消息
有4個外觀方法,使用的都是默認topic;
@Override public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {return send(this.defaultTopic, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {return send(this.defaultTopic, key, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {return send(this.defaultTopic, partition, key, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {return send(this.defaultTopic, partition, timestamp, key, data); } @Override public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord); }可以看到,最后還是調用了 底層的 doSend() 方法;
【1.2.2】doSend() 方法
5個 doSend() 方法的外觀方法 ,這5個方法對 topic ,分區, 消息key,時間戳,消息value? 進行了重載
@Override public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {Assert.notNull(record, "'record' cannot be null");return doSend(record); }底層 doSend() 定義如下:
protected ListenableFuture<SendResult<K, V>>
????????doSend(final ProducerRecord<K, V> producerRecord)
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {// 獲取生產者 final Producer<K, V> producer = getTheProducer(producerRecord.topic());this.logger.trace(() -> "Sending: " + producerRecord);final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();Object sample = null;if (this.micrometerEnabled && this.micrometerHolder == null) {this.micrometerHolder = obtainMicrometerHolder();}if (this.micrometerHolder != null) {sample = this.micrometerHolder.start();}// 發送消息 Future<RecordMetadata> sendFuture =producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));// May be an immediate failure (注意,這里可能馬上失敗,或有運行時異常拋出)if (sendFuture.isDone()) { try {sendFuture.get(); // 這里調用get會阻塞,如果發送沒有完成的話 }catch (InterruptedException e) {Thread.currentThread().interrupt();throw new KafkaException("Interrupted", e);}catch (ExecutionException e) {throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace}}if (this.autoFlush) { // 自動刷新 flush();}this.logger.trace(() -> "Sent: " + producerRecord);return future; }【代碼解說】
step1, 調用了 getTheProducer() 獲取生產者 ;
關于 DefaultKafkaProducerFactory.createProducer() 可以參見 以下博文,因篇幅,本文不再贅述;
spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客
step2,producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)) ; 調用了 buildCallback(...) 構建回調對象;
?
非事務模式,則關閉生產者;
由 DefaultKafkaProducerFactory 可知, 生產者是? CloseSafeProducer, 其包裹了 原生 kafka生產者; 所以 調用了 CloseSafeProducer.close() 方法;
?
?step3,自動刷新緩存 flush();?
如果 ProducerFactory 提供單例生產者(例如 DefaultKafkaProducerFactory),則調用此方法才有意義。
public void flush() {Producer<K, V> producer = getTheProducer();try {producer.flush();}finally {closeProducer(producer, inTransaction());} }protected void closeProducer(Producer<K, V> producer, boolean inTx) {if (!inTx) { // 非事務才關閉 producer.close(this.closeTimeout);} }【2】KafkaTemplate 發送消息與生產者復用?
?我們再次 follow了 DefaultKafkaProducerFactory的 doCreateProducer() 方法;
第1次因為發送消息 新建了 producer;
第2次再發送消息時,因為producer 不為null;所以直接取走;
同時 synchronized同步塊可以避免并發問題;
發送消息后,是否關閉生產者,可以參考 【小結】
【小結】
通過分析 KafkaTemplate.doSend() 消息發送分發, 我們可以看到,
每發送一條消息,如果拋出異常的話,則會關閉kafka生產者,否則不會關閉生產者;原因參見??
spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客https://blog.csdn.net/PacosonSWJTU/article/details/121306370中的章節 【4.5.1】;
總結
以上是生活随笔為你收集整理的spring-kafka整合:KafkaTemplate-kafka模板类介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring-kafka整合:Defau
- 下一篇: springboot:BeanPostP