使用 kafka 提升你的订单接口吞吐量
今日推薦
最適合晚上睡不著看的 8 個網站,建議收藏哦
我們使用的是jdk自帶的隊列,實現了服務的吞吐量增加,但是我們知道的是,jdk的隊列時基于內存的,即當請求量很大的時候,大量的請求緩存在內存當中,對于內存的要求還是很大的,不是很適合并發量很大的業務場景。
尤其是在電商的場景,都會通過消息隊列的削峰,解耦,從而提高系統的吞吐量,保證穩定性。所以我們接下來,繼續對系統進行改進,引入kafka,進一步對于穩定性進行完善。
關于kafka的安裝,介紹,集成,請參考文末鏈接。
一、引入Kafka
引入依賴:
<!--?https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka?--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.10.RELEASE</version> </dependency>添加配置:
spring:kafka:bootstrap-servers:?172.16.3.29:9092producer:#?發生錯誤后,消息重發的次數。retries:?0#??????#當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。#??????batch-size:?16384#??????#?設置生產者內存緩沖區的大小。#??????buffer-memory:?33554432#??????#?鍵的序列化方式key-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????#?值的序列化方式value-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????# acks=0?:?生產者在成功寫入消息之前不會等待任何來自服務器的響應。#??????# acks=1 :?只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。#??????# acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。#??????acks:?1consumer:group-id:?test#??????#?自動提交的時間間隔?在spring?boot?2.X?版本中這里采用的是值的類型為Duration?需要符合特定的格式,如1S,1M,2H,5D#??????auto-commit-interval:?1S#??????#?該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:#??????#?latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)#??????# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄#??????auto-offset-reset:?earliest#??????#?是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量#??????enable-auto-commit:?false#??????#?鍵的反序列化方式key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer#??????#?值的反序列化方式value-deserializer:?org.apache.kafka.common.serialization.StringDeserializermax-poll-records:?150listener:#??????#?在偵聽器容器中運行的線程數。#??????concurrency:?5#??????#listner負責ack,每調用一次,就立即commit#??????ack-mode:?manual_immediatemissing-topics-fatal:?false二、測試服務改造
2.1 消息生產者
提供一個簡單的生產者工具類,只有簡單的發送消息一個方法,參數是 topic 和 message。
import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.kafka.core.KafkaTemplate; import?org.springframework.stereotype.Component;/***?kafka生產者**?@author?weirx*?@date?2021/02/03?14:22**/ @Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }2.2 下單接口改造
引入KafkaProducer,下單時替換http請求成kafka推送,偽代碼如下:
@Autowired private?KafkaProducer?kafkaProducer; kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));2.3 支付接口改造
在前面一篇文章當中,支付接口是作為下單接口的回調接口被調用的,其實是一個完全同步的接口,易出現問題,如阻塞,請求失敗、超時等。
所以此處我們也將其改造成kafka異步消費,消費者工具類如下所示:
@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?TradingServiceImpl?tradingService;@KafkaListener(topics?=?{"rob-necessities-trading"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();String?orderId?=?message.toString();log.info("支付開始時間***********************:{},訂單id:?{}",?LocalDateTime.now(),?orderId);tradingService.pay(Long.valueOf(orderId));log.info("支付完成時間/:{},訂單id:?{}",?LocalDateTime.now(),?orderId);}} }三、訂單服務改造
3.1 支付回調改造
前面的支付方式,我們是在訂單完成后通過http接口形式,現在改用kafka,所以我們需要提供一個kafka消息生產者,將消息同送到測試服務:
@Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }3.2 下單接口改造
接口下單命令的方式不再是等待http請求調用了,此處變成監聽kafka,提供消費者如下:
@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?OrderService?orderService;@KafkaListener(topics?=?{"rob-necessities-order"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();log.info("-----------------?record?="?+?record);log.info("------------------?message?="?+?message);JSONObject?jsonObject?=?JSONObject.parseObject(message.toString());OrderDTO?orderDTO?=?jsonObject.toJavaObject(OrderDTO.class);orderService.saveOrder(orderDTO);}} }因為我們已經使用了kafka作為并發時流量緩沖的組件,就不在需要我們前面自己添加進來的隊列了,所以改造后的下單接口如下所示:
@Autowired private?KafkaProducer?kafkaProducer;@Override public?Result?saveOrder(OrderDTO?orderDTO)?{//?下單實現Result?result?=?this.saveOrderImpl(orderDTO);String?orderId?=?JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");kafkaProducer.send("rob-necessities-trading",?orderId);return?Result.success("下單成功"); }如上所示,具體的訂單業務邏輯沒有變化。
四、測試
kafka是單節點的,在其他服務器上,因為我本地沒有內存了。
全部完成時間大概是23秒,時間上有些許增加,但是整體吞吐量跟以前絕對不是一個量級的了。
另外,我們在調用支付接口的時候也可以通過kafka的形式,但是本文不做修改了。
補充
kafka安裝教程:https://juejin.cn/post/7004625159298482206
springboot集成kafka:https://juejin.cn/post/7005438980455923726
kafka專欄:https://juejin.cn/column/6996836630841524238
本文項目代碼gitee地址:https://gitee.com/wei_rong_xin/rob-necessities.git
來源:juejin.cn/post/7068450775743070244
最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術PDF,技術書籍都可以在這里找到。
GitHub地址:https://github.com/hello-go-maker/cs-books
電子書已經更新好了,拿走不謝,記得點一個star,持續更新中...總結
以上是生活随笔為你收集整理的使用 kafka 提升你的订单接口吞吐量的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 满屏的if-else,看我怎么消灭你!
- 下一篇: 别再用 httpClient了,快试试这