转:Kafka事务使用和编程示例/实例
Kafka事務使用和編程示例/實例_JobShow裁員加班實況-微信小程序-CSDN博客一、概述? Kafka事務特性是指一系列的生產者生產消息和消費者提交偏移量的操作在一個事務中,或者說是一個原子操作,生產消息和提交偏移量同時成功或者失敗。注意:kafka事務和DB事務。在理解消息的事務時,一直處于一個錯誤理解是,把操作db的業務邏輯跟操作消息當成是一個事務,如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:讀取消息或生產消息 kafkaOperation(); // 2.db操作 dbOperation()https://blog.csdn.net/u010002184/article/details/113933973
一、概述
? Kafka事務特性是指一系列的生產者生產消息和消費者提交偏移量的操作在一個事務中,或者說是一個原子操作,生產消息和提交偏移量同時成功或者失敗。
-
注意:kafka事務和DB事務。
在理解消息的事務時,一直處于一個錯誤理解是,把操作db的業務邏輯跟操作消息當成是一個事務,如下所示:
void kakfa_in_tranction(){// 1.kafa的操作:讀取消息或生產消息kafkaOperation();// 2.db操作dbOperation(); }操作DB數據庫的數據源是DB,消息數據源是kfaka,這是完全不同兩個數據。一種數據源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務。kafka事務指kafka一系列 生產、消費消息等操作組成一個原子操作,db事務是指操作數據庫的一系列增刪改操作組成一個原子操作。
二、事務的使用
Kafka中的事務特性主要用于以下兩種場景:
-
生產者發送多條消息可以封裝在一個事務中,形成一個原子操作。多條消息要么都發送成功,要么都發送失敗。
-
read-process-write模式:將消息消費和生產封裝在一個事務中,形成一個原子操作。在一個**流式處理**的應用中,常常一個服務需要從上游接收消息,然后經過處理后送達到下游,這就對應著消息的消費和生成。
當事務中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset并沒有區別。因此單純的消費消息并不是Kafka引入事務機制的原因,單純的消費消息也沒有必要存在于一個事務中。
三、事務相關的API
1, api
/*** 初始化事務*/public void initTransactions();/*** 開啟事務*/public void beginTransaction() throws ProducerFencedException ;/*** 在事務內提交已經消費的偏移量*/public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) ;/*** 提交事務*/public void commitTransaction() throws ProducerFencedException;/*** 丟棄事務*/public void abortTransaction() throws ProducerFencedException ;2,事務配置
2,1 生產者
需要設置transactional.id屬性。
設置了transactional.id屬性后,enable.idempotence屬性會自動設置為true。
2.2 消費者
?需要設置isolation.level = read_committed,這樣Consumer只會讀取已經提交了事務的消息。另外,需要設置enable.auto.commit = false來關閉自動提交Offset功能。
四、事務使用示例
1, 需求
在Kafka的topic:ods_user中有一些用戶數據,數據格式如下:
?
姓名,性別,出生日期
張三,1,1980-10-09
李四,0,1985-11-01
?
我們需要編寫程序,將用戶的性別轉換為男、女(1-男,0-女),轉換后將數據寫入到topic:dwd_user中。要求使用事務保障,要么消費了數據同時寫入數據到 topic,提交offset。要么全部失敗。
2,控制臺模擬數據
# 創建名為ods_user和dwd_user的主題 bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic ods_user --partitions 3 --replication-factor 2 bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic dwd_user --partitions 3 --replication-factor 2 # 生產數據到 ods_user bin/kafka-console-producer.sh --broker-list node-1:9092 --topic ods_user # 從dwd_user消費數據 bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic dwd_user --from-beginning3, 詳細代碼
public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事務producer.initTransactions();while(true) {try {// 1. 開啟事務producer.beginTransaction();// 2. 定義Map結構,用于保存分區對應的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 進行轉換處理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生產消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事務producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事務producer.commitTransaction();} catch (Exception e) {// 8. 放棄事務producer.abortTransaction();}}}// 1. 創建消費者public static Consumer<String, String> createConsumer() {// 1. 創建Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 創建Kafka消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 訂閱要消費的主題consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 創建生產者public static Producer<String, String> createProduceer() {// 1. 創建生產者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 創建生產者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}4,異常模擬
// 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 4. 進行轉換處理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女"; String message = fields[0] + "," + fields[1] + "," + fields[2];// 模擬異常 int i = 1/0;// 5. 生產消息到dwd_user producer.send(new ProducerRecord<>("dwd_user", message));?我們發現,可以消費到消息,但如果中間出現異常的話,offset是不會被提交的,除非消費、生產消息都成功,才會提交事務。
轉自:
kafka事務使用和編程示例_青眼酷白龍的博客-CSDN博客_kafka事務使用
-------------------------------------------
一、事務場景
二、幾個關鍵概念和推導
1.因為producer發送消息可能是分布式事務,所以引入了常用的2PC,所以有事務協調者(Transaction Coordinator)。Transaction Coordinator和之前為了解決腦裂和驚群問題引入的Group Coordinator在選舉和failover上面類似。
2.事務管理中事務日志是必不可少的,kafka使用一個內部topic來保存事務日志,這個設計和之前使用內部topic保存位點的設計保持一致。事務日志是Transaction Coordinator管理的狀態的持久化,因為不需要回溯事務的歷史狀態,所以事務日志只用保存最近的事務狀態。
3.因為事務存在commit和abort兩種操作,而客戶端又有read committed和read uncommitted兩種隔離級別,所以消息隊列必須能標識事務狀態,這個被稱作Control Message。
4.producer掛掉重啟或者漂移到其它機器需要能關聯的之前的未完成事務所以需要有一個唯一標識符來進行關聯,這個就是TransactionalId,一個producer掛了,另一個有相同TransactionalId的producer能夠接著處理這個事務未完成的狀態。注意不要把TransactionalId和數據庫事務中常見的transaction id搞混了,kafka目前沒有引入全局序,所以也沒有transaction id,這個TransactionalId是用戶提前配置的。
5. TransactionalId能關聯producer,也需要避免兩個使用相同TransactionalId的producer同時存在,所以引入了producer epoch來保證對應一個TransactionalId只有一個活躍的producer epoch
三、事務語義
2.1.? 多分區原子寫入
事務能夠保證Kafka topic下每個分區的原子寫入。事務中所有的消息都將被成功寫入或者丟棄。例如,處理過程中發生了異常并導致事務終止,這種情況下,事務中的消息都不會被Consumer讀取。現在我們來看下Kafka是如何實現原子的“讀取-處理-寫入”過程的。
首先,我們來考慮一下原子“讀取-處理-寫入”周期是什么意思。簡而言之,這意味著如果某個應用程序在某個topic tp0的偏移量X處讀取到了消息A,并且在對消息A進行了一些處理(如B = F(A))之后將消息B寫入topic tp1,則只有當消息A和B被認為被成功地消費并一起發布,或者完全不發布時,整個讀取過程寫入操作是原子的。
現在,只有當消息A的偏移量X被標記為消耗時,消息A才被認為是從topic tp0消耗的,消費到的數據偏移量(record offset)將被標記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個名為__consumer_offsets topic的內部Kafka topic來記錄offset commit。消息僅在其offset被提交給__consumer_offsets topic時才被認為成功消費。
由于offset commit只是對Kafkatopic的另一次寫入,并且由于消息僅在提交偏移量時被視為成功消費,所以跨多個主題和分區的原子寫入也啟用原子“讀取-處理-寫入”循環:提交偏移量X到offset topic和消息B到tp1的寫入將是單個事務的一部分,所以整個步驟都是原子的。
kafka系列九、kafka事務原理、事務API和使用場景 - 小人物的奮斗 - 博客園
總結
以上是生活随笔為你收集整理的转:Kafka事务使用和编程示例/实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 荒漠屠夫天赋符文(荒漠屠夫天赋符文推荐)
- 下一篇: 联通联网设置(联通宽带联网设置)