SpringBoot实战(十四)之整合KafKa
?本人今天上午參考了不少博文,發現不少博文不是特別好,不是因為依賴沖突問題就是因為版本問題。
于是我結合相關的博文和案例,自己改寫了下并參考了下,于是就有了這篇文章。希望能夠給大家幫助,少走一些彎路。
?
一、KafKa的介紹
1.主要功能
根據官網的介紹,ApacheKafka?是一個分布式流媒體平臺,它主要有3種功能:
a.發布和訂閱消息流,這個功能類似于消息隊列,這也是kafka歸類為消息隊列框架的原因。
b.以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流。
c.可以再消息發布的時候進行處理。
?
2.使用場景
a.在系統或應用程序之間構建可靠的用于傳輸實時數據的管道,消息隊列功能。
b.構建實時的流數據處理程序來變換或處理數據流,數據處理功能。
?
3.詳細介紹
?Kafka目前主要作為一個分布式的發布訂閱式的消息系統使用,下面簡單介紹一下kafka的基本機制
消息傳輸過程:
?
Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。
?
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息
?
Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
?
二、安裝
安裝包下載地址:http://kafka.apache.org/downloads
找到0.11.0.1版本,如圖:
1.下載
wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
?
2.解壓
tar -xzvf kafka_2.11-0.11.0.1.tgz
配置說明:
?? consumer.properites 消費者配置,這個配置文件用于配置開啟的消費者,此處我們使用默認的即可。
?? producer.properties 生產者配置,這個配置文件用于配置開啟的生產者,此處我們使用默認的即可。
server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,目前僅介紹幾個最基礎的配置。
?????? a.broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,并且集群中的每一個kafka服務器的id都應是唯一的,我們這里采用默認配置即可。
?????? b.listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,
例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并確保服務器的9092端口能夠訪問。
c.zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由于本次使用的是kafka高版本中自帶zookeeper,
使用默認配置即可,zookeeper.connect=localhost:2181。
?
3.運行
首先運行zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
運行成功,顯示如圖:
?
然后運行kafka
bin/kafka-server-start.sh config/server.properties
?運行成功,顯示如圖:
?
三、整合KafKa
1.新建Maven項目導入Maven依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.test</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><!-- 指定編譯版本 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins><finalName>${project.artifactId}</finalName></build></project>
?
2.編寫消息實體
package com.springboot.kafka.bean;import java.util.Date;import lombok.Data;@Data
public class Message {private Long id; //idprivate String msg; //消息private Date sendTime; //時間戳} ?有了lombok,每次編寫實體不必要使用快捷鍵生成seter或geter方法了,代碼看起來更加簡潔了。
?
3.編寫消息發送者(可以理解為生產者,最好聯系詳細介紹中的圖)
package com.springboot.kafka.producer;import java.util.Date; import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component;import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component @Slf4j public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//發送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));} }
?
4.編寫消息接收者(可以理解為消費者)
package com.springboot.kafka.producer;import java.util.Date; import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component;import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component @Slf4j public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//發送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));} }
?
5.編寫啟動類
package com.springboot.kafka;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext;import com.springboot.kafka.producer.KafkaSender;@SpringBootApplication public class KafkaApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);KafkaSender sender = context.getBean(KafkaSender.class);for (int i = 0; i < 3; i++) {//調用消息發送類中的消息發送方法 sender.send();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}} }
?
?
6.編寫application.properties配置文件
#============== kafka =================== # \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A spring.kafka.bootstrap-servers=192.168.126.143:9092#=============== provider =======================spring.kafka.producer.retries=0 # \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer ======================= # \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
?
7.運行結果
?
示例代碼地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo
如果按照上述流程沒有達到預計的效果可以git clone到本地。
?
轉載于:https://www.cnblogs.com/youcong/p/10216573.html
總結
以上是生活随笔為你收集整理的SpringBoot实战(十四)之整合KafKa的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个带有水含义的好听的名字
- 下一篇: 现在佳能单反相机5d3机身多少钱