消息队列之Kafka
背景:Dubbo遠程調用的性能問題
Dubbo調用普遍存在于我們的微服務項目中, 這些Dubbo調用全部是同步的操作, 這里的"同步"指:消費者A調用生產者B之后,A的線程會進入阻塞狀態,等待生產者B運行結束返回之后,A才能運行之后的代碼, Dubbo消費者發送調用后進入阻塞狀態,這個狀態表示該線程仍占用內存資源,但是什么動作都不做, 如果生產者運行耗時較久,消費者就一直等待,如果消費者利用這個時間,那么可以處理更多請求,業務整體效率會提升
實際情況下,Dubbo有些必要的返回值必須等待,但是不必要等待的服務返回值,我們可以不等待去做別的事情, 這種情況下我們就要使用消息隊列
什么是消息隊列
消息隊列(Message Queue)簡稱MQ,也稱:"消息中間件",? 消息隊列是采用"異步"的方式來傳遞數據完成業務操作流程的業務處理方式(要求兩個微服務項目并不需要同時完成請求)
消息隊列的特征
-  
利用異步的特性, 提高服務器的運行效率, 減少因為遠程調用出現的線程等待\阻塞時間
 -  
削峰填谷:在并發峰值超過當前系統處理能力時,我們將沒處理的信息保存在消息隊列中,在后面出現的較閑的時間中去處理,直到所有數據依次處理完成,能夠防止在并發峰值時短時間大量請求而導致的系統不穩定
 -  
消息隊列的延時:因為是異步執行,請求的發起者并不知道消息何時能處理完,如果業務不能接受這種延遲,就不要使用消息隊列
 
常見消息隊列軟件
-  
Kafka:性能好\功能弱:適合大數據量,高并發的情況,大數據領域使用較多
 -  
RabbitMQ:功能強\性能一般:適合發送業務需求復雜的消息隊列,java業務中使用較多
 -  
RocketMQ:阿里的
 -  
ActiveMQ:前幾年流行的,老項目可能用到
 
消息隊列的事務處理
當接收消息隊列中信息的模塊運行發送異常時,怎么完成事務的回滾? ?
在處理消息隊列異常時,經常會設置一個"死信隊列",將無法處理的異常信息發送到這個隊列中, 死信隊列沒有任何處理者,通常情況下會有專人周期性的處理死信隊列的消息
什么是Kafka
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。Kafka最初是由LinkedIn開發,并隨后于2011年初開源。 ?
Kafka Cluster(Kafka集群)
Producer:消息的發送方,也就是消息的來源,Kafka中的生產者
order就是消息的發送方,在Dubbo中order是消費者,這個身份變化了
Consumer:消息的接收方,也是消息的目標,Kafka中的消費者
stock就是消息的接收方,在Dubbo中stock是生產者,這個身份變化了
Topic:話題或主題的意思,消息的收發雙方要依據同一個話題名稱,才不會將信息錯發給別人
Record:消息記錄,就是生產者和消費者傳遞的信息內容,保存在指定的Topic中
Kafka的特征與優勢
Kafka作為消息隊列,它和其他同類產品相比,突出的特點就是性能強大
Kafka將消息隊列中的信息保存在硬盤中
Kafka對硬盤的讀取規則進行優化后,效率能夠接近內存
硬盤的優化規則主要依靠"順序讀寫,零拷貝,日志壓縮等技術"
Kafka處理隊列中數據的默認設置:
-  
Kafka隊列信息能夠一直向硬盤中保存(理論上沒有大小限制)
 -  
Kafka默認隊列中的信息保存7天,可以配置這個時間,縮短這個時間可以減少Kafka的磁盤消耗
 
Kafka的安裝和配置
必須將我們kafka軟件的解壓位置設置在一個根目錄,文件夾名稱盡量短(例如:kafka), 然后路徑不要有空格和中文,?我們要創建一個空目錄用于保存Kafka運行過程中產生的數據,本次創建名稱為data的空目錄,下面進行Kafka啟動前的配置,先到D:\kafka\config下配置有文件zookeeper.properties,找到dataDir屬性修改如下
dataDir=D:/data修改完畢之后要Ctrl+S進行保存,否則修改無效!!!!
注意D盤和data文件夾名稱,匹配自己電腦的真實路徑和文件夾名稱
還要修改server.properties配置文件
log.dirs=D:/data啟動kafka
要想啟動Kafka必須先啟動Zookeeper ?
Zookeeper介紹
Linux服務器中安裝的各種軟件,很多都是有動物形象的
如果這些軟件在Linux中需要修改配置信息的話,就需要進入這個軟件,去修改配置,每個軟件都需要單獨修改配置的話,工作量很大
我們使用Zookeeper之后,可以創建一個新的管理各種軟件配置的文件管理系統
Linux系統中各個軟件的配置文件集中到Zookeeper中
實現在Zookeeper中,可以修改服務器系統中的各個軟件配置信息
長此以往,很多軟件就刪除了自己寫配置文件的功能,而直接從Zookeeper中獲取
Kafka就是需要將配置編寫在Zookeeper中的軟件之一
所以要先啟動zookeeper才能啟動kafka
Zookeeper啟動
進入路徑D:\kafka\bin\windows
輸入cmd進入dos命令行
D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.propertieskafka啟動
D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.propertiesKafka使用演示
啟動的zookeeper和kafka的窗口不要關閉, 我們在csmall項目中編寫一個kafka使用的演示, csmall-cart-webapi模塊, 添加依賴
<!-- Google JSON API --> <!-- 它是java對象和json格式字符串相互轉換的工具類 --> <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId> </dependency> <!-- Kafka API --> <!-- Spring整合支持Kafka的依賴 --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>修改yml文件進行配置 ?
spring:kafka:# 定義kafka的位置bootstrap-servers: localhost:9092# consumer.group-id是一個必須配置的設置,不配置的話啟動時會報錯# 意思是"話題分組",這配置的目的是為了區分不同項目的話題名稱# 本質上,這個分組名稱會在消息發送時,自動前綴在話題名稱前# 例如當前項目發送了一個話題名稱為message的消息,實際傳輸的話題名稱為csmall.messageconsumer:group-id: csmall?在SpringBoot啟動類中添加啟動Kafka的注解
@SpringBootApplication @EnableDubbo // 啟動對kafka的支持 @EnableKafka // 為了測試kafka收發消息的功能 // 我們利用SpringBoot自帶的任務調用工具,周期性的向kafka發送消息 // 明確下面的注解和kafka沒有必然的支持關系 @EnableScheduling public class CsmallCartWebapiApplication {public static void main(String[] args) {SpringApplication.run(CsmallCartWebapiApplication.class, args);}}下面我們就可以實現周期性的向kafka發送消息并接收的操作了
編寫消息的發送
cart-webapi包下創建kafka包
包中創建Producer類來發送消息
?生產者
// 這個類中要編寫代碼進行周期運行,所以要交由Spring管理 @Component public class Producer {// 直接從Spring容器中獲取能夠操作Kafka的對象// 這個對象是在添加好依賴和yml配合后,啟動SpringBoot時自動添加到Spring容器的// KafkaTemplate<[話題名稱的類型],[消息的類型]>@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;int i=1;// 實現每隔10秒鐘(10000毫秒)運行一次的方法@Scheduled(fixedRate = 10000)public void sendMessage(){// 實例化Cart對象并賦值,嘗試發送給KafkaCart cart=new Cart();cart.setId(i++);cart.setCommodityCode("PC100");cart.setUserId("UU100");cart.setPrice(RandomUtils.nextInt(90)+10);cart.setCount(RandomUtils.nextInt(10)+1);// {"id":"1","userId":"UU100",.....}// 利用gson工具,將cart對象轉換為json格式字符串方便發送Gson gson=new Gson();String json=gson.toJson(cart);System.out.println("要發送的消息為:"+json);// 執行發送kafkaTemplate.send("myCart",json);}}kafka包中創建一個叫Consumer的類來接收消息, 接收消息的類可以是本模塊的類,也可以是其它模塊的類,編寫的代碼是完全一致
// 要接收kafka的消息需要講接收消息的對象保存到Spring容器中 // 因為KafkaTemplate是spring在管理的 @Component public class Consumer {// SpringKafka接收消息依靠了框架提供的"監聽機制"// 框架中有一個線程,一直實時關注kafka的消息情況// 如果我們指定的話題名稱(myCart)接收了消息,那么這條線程就會自動調用下面的方法@KafkaListener(topics = "myCart")// 下面定義的方法就是接收到消息后運行的方法// 這個方法有參數,參數類型是固定的,參數的值就是監聽器接收到的消息內容public void received(ConsumerRecord<String,String> record){// 參數類型必須是ConsumerRecord// 泛型<[話題名稱的類型],[消息的類型]>// 我們可以將record視為從kafka中接收到的消息對象String json=record.value();// json可能的值: {"id":2,"commodityCode":"PC100","price":74,"count":8,"userId":"UU100"}Gson gson=new Gson();// gson也可以將json字符串轉換為java對象Cart cart=gson.fromJson(json,Cart.class);System.out.println("接收到了消息:"+cart);}}總結
以上是生活随笔為你收集整理的消息队列之Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: jk触发器改为四进制_关于触发器
 - 下一篇: 酷狗音乐盒forlinu酷狗音乐盒有没