卡夫卡如何分区_通过分区在卡夫卡实现订单担保人
卡夫卡如何分區
Kafka最重要的功能之一是實現消息的負載平衡,并保證分布式集群中的排序,否則在傳統隊列中是不可能的。
首先讓我們嘗試了解問題陳述
讓我們假設我們有一個主題,其中發送消息,并且有一個消費者正在使用這些消息。
如果只有一個使用者,它將按消息在隊列中的順序或發送的順序接收消息。
現在,為了獲得更高的性能,我們需要更快地處理消息,因此我們引入了消費者應用程序的多個實例。
如果消息包含任何狀態,則將導致問題。
讓我們嘗試通過一個例子來理解這一點:
如果對于特定的消息ID,我們有3個事件:
第一:創建
第二:更新 第三:刪除 我們要求僅在消息的“創建”事件之后才處理消息的“更新”或“刪除”事件。 現在,如果兩個單獨的實例幾乎同時獲得同一消息的“ CREATE”和“ UPDATE”,則即使另一個實例完成“ CREATE”消息之前,帶有“ UPDATE”消息的實例仍有機會嘗試對其進行處理。 。 這可能是一個問題,因為使用者將嘗試更新尚未創建的消息,并且將引發異常,并且此“更新”可能會丟失。
可能的解決方案
我想到的第一個解決方案是數據庫上的樂觀鎖,這可以防止這種情況,但是隨后需要適應異常情況。 這不是一個非常簡單的方法,可能涉及更多的鎖定和要處理的并發問題。
另一個更簡單的解決方案是,如果特定ID的消息/事件總是轉到特定實例,因此它們將是有序的。 在這種情況下,CREATE將始終在UPDATE之前執行,因為這是發送它們的原始順序。
這就是卡夫卡派上用場的地方。
Kafka在主題中具有“分區”的概念,該概念既可以提供訂購保證,又可以在整個消費者流程中提供負載平衡。
每個分區都是有序的,不可變的消息序列,這些消息連續地附加到提交日志中。 分區中的每個消息均分配有一個順序ID號,稱為偏移量,該ID唯一地標識分區中的每個消息。
因此,一個主題將具有多個分區,每個分區保持各自的偏移量。
現在,要確保將具有特定id的事件始終轉到特定實例,可以執行以下操作:如果我們將每個使用者與特定分區綁定在一起,然后確保具有特定id的所有事件和消息始終轉到特定實例,特定分區,因此它們始終由同一使用者實例使用。
為了實現此分區,Kafka客戶端API為我們提供了兩種方法:
1)定義用于分區的鍵,該鍵將用作默認分區邏輯的鍵。
2)編寫一個Partitioning類來定義我們自己的分區邏輯。
讓我們探索第一個:
默認分區邏輯
默認的分區策略是hash(key)%numPartitions 。 如果鍵為null,則選擇一個隨機分區。 所以,如果我們要為分區鍵是一個特定屬性,我們需要將它傳遞在ProducerRecord構造而從發送消息Producer 。
讓我們來看一個例子:
注意:要運行此示例,我們需要具備以下條件:
1.運行Zookeeper(在localhost:2181)
2.運行Kafka(位于localhost:9092) 3.創建一個帶有3個分區的名為“ TRADING-INFO”的主題。(為簡單起見,我們可以只有一個代理。) 要完成以上三個步驟,請遵循此處的文檔。
假設我們正在發送有關“ TRADING-INFO”主題的交易信息,該信息由消費者消費。
1.貿易艙
(注意:我在這里使用過Lombok )
@Data @Builder public class Trade {private String id;private String securityId;private String fundShortName;private String value; }2. Kafka客戶端依賴
為了制作一個Kafka Producer,我們需要包含Kafka依賴項:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.0.0</version></dependency>卡夫卡制片人
public class Producer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i = idStart; i <= idEnd; i++) {Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();try {String s = new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println("Sending to " + topic + "msg : " + s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props = new Properties();String KAFKA_SERVER_IP = "localhost:9092";props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}消費者
public class TConsumer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";final String CONSUMER_GROUP_ID = "consumer-group";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);consumerRecords.forEach(e -> {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", consumerGroupId);props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());return props;} }由于我們有3個分區,因此我們將運行3個Consumer實例。
現在,當我們使用不同的線程運行生產者時,生成具有3種“安全類型”消息的消息,這是我們的關鍵。 我們將看到,特定的實例總是迎合特定的“安全類型”,因此將能夠按順序處理消息。
產出
消費者1:
{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"} {"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"} {"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"} {"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"} {"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}消費者2:
{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"} {"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"} {"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"} {"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"} {"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}消費者3:
{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"} {"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"} {"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"} {"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"} {"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}因此,這里的3種類型的“ securityIds”生成了不同的哈希值,因此被分配到了不同的分區中,從而確保一種交易總是去往特定的實例。
現在,如果我們不想使用默認的分區邏輯并且我們的場景更加復雜,我們將需要實現自己的Partitioner,在下一個博客中,我將解釋如何使用它以及它如何工作。
翻譯自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html
卡夫卡如何分區
總結
以上是生活随笔為你收集整理的卡夫卡如何分区_通过分区在卡夫卡实现订单担保人的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux安装界面中不包含()(linu
- 下一篇: linux温度命令(linux温度)