kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统
前言
由于kafka強依賴于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java編寫的,需運行在jvm上,所以首先應具備java環境。
(ps:默認您的centos系統可聯網,本教程就不教配置ip什么的了)
(ps2:沒有wget的先裝一下:yum install wget)
(ps3:人啊,就是要條理。東邊放一點,西邊放一點,過段時間就不知道自己裝在哪里了。本教程所有下載均放在/usr/local目錄下)
(ps4:kafka可能有內置zookeeper,感覺可以越過zookeeper教程,但是這里也配置出來了。我沒試過)
一、配置jdk
因為oracle 公司不允許直接通過wget 下載官網上的jdk包。所以你直接wget以下地址下載下來的是一個只有5k的網頁文件而已,并不是需要的jdk包。(壟斷地位就是任性)。
(請通過java -version判斷是否自帶jdk,我的沒帶)
1、官網下載
下面是jdk8的官方下載地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html2、上傳解壓
這里通過xftp上傳到服務器指定位置:/usr/local
對壓縮文件進行解壓:
tar -zxvf jdk-8u221-linux-x64.tar.gz對解壓后的文件夾進行改名:
mv jdk1.8.0_221 jdk1.83、配置環境變量
vim /etc/profile#java environmentexport JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
操作之后的界面如下:
運行命令使環境生效
source /etc/profile二、搭建zookeeper集群
1、下載zookeeper
創建zookeeper目錄,在該目錄下進行下載:
mkdir /usr/local/zookeeper這一步如果出現連接被拒絕時可多試幾次,我就是第二次請求才成功的。
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz等待下載完成之后解壓:
tar -zxvf zookeeper-3.4.6.tar.gz重命名為zookeeper1
mv zookeeper-3.4.6 zookeeper1cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3
2、創建data、logs文件夾
在zookeeper1目錄下創建
在data目錄下新建myid文件。內容為1
3、修改zoo.cfg文件
cd /usr/local/zookeeper/zookeeper1/conf/cp zoo_sample.cfg zoo.cfg
進行過上面兩步之后,有zoo.cfg文件了,現在修改內容為:
dataDir=/usr/local/zookeeper/zookeeper1/datadataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890
4、搭建zookeeper2
首先,復制改名。
cd /usr/local/zookeeper/cp -r zookeeper1 zookeeper2
然后修改具體的某些配置:
vim zookeeper2/conf/zoo.cfg將下圖三個地方1改成2
vim zookeeper2/data/myid同時將myid中的值改成2
5、搭建zookeeper3
同上,復制改名
cp -r zookeeper1 zookeeper3vim zookeeper3/conf/zoo.cfg修改為3
vim zookeeper3/data/myid修改為3
6、測試zookeeper集群
cd /usr/local/zookeeper/zookeeper1/bin/由于啟動所需代碼比較多,這里簡單寫了一個啟動腳本:
vim startstart的內容如下
cd /usr/local/zookeeper/zookeeper1/bin/./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg
下面是連接腳本:
vim loginlogin內容如下:
./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183腳本編寫完成,接下來啟動:
sh startsh login
啟動集群成功,如下圖:
這里zookeeper就告一段落了,由于zookeeper占用著輸入窗口,這里可以在xshell右鍵標簽,新建ssh渠道。然后就可以在新窗口繼續操作kafka了!
三、搭建kafka集群
1、下載kafka
首先創建kafka目錄:
mkdir /usr/local/kafka然后在該目錄下載
cd /usr/local/kafka/wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
下載成功之后解壓:
tar -zxvf kafka_2.11-1.1.0.tgz2、修改集群配置
首先進入conf目錄下:
cd /usr/local/kafka/kafka_2.11-1.1.0/config修改server.properties
修改內容:
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092
復制兩份server.properties
cp server.properties server2.propertiescp server.properties server3.properties
修改server2.properties
vim server2.properties修改主要內容為:
broker.id=1log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093
如上,修改server3.properties
修改內容為:
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094
3、啟動kafka
這里還是在bin目錄編寫一個腳本:
cd ../bin/vim start
腳本內容為:
./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
通過jps命令可以查看到,共啟動了3個kafka。
4、創建Topic
cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
kafka打印了幾條日志
在啟動的zookeeper中可以通過命令查詢到這條topic!
查看kafka狀態
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic可以看到此時有三個節點 1 , 2 , 0
Leader 是1 ,
因為分區只有一個 所以在0上面,
Replicas:主從備份是 1,2,0,
ISR(in-sync):現在存活的信息也是 1,2,0
5、啟動生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic由于不能按刪除,不能按左右鍵去調整,所以語句有些亂啊。em…
6、啟動消費者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic可以看出,啟動消費者之后就會自動消費。
在生產者又造了一條。
消費者自動捕獲成功!
四、集成springboot
先貼一張kafka兼容性目錄:
不滿足的話啟動springboot的時候會拋異常的!!!ps:該走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中間那列暫時不用管)
回歸正題,搞了兩個小時,終于搞好了,想哭…
遇到的問題基本就是jar版本不匹配。
上面的步驟我也都會相應的去修改,爭取大家按照本教程一遍過!!!
1、pom文件
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.1.1.RELEASEversion>
<relativePath/>
parent>
<groupId>com.gzkygroupId>
<artifactId>studyartifactId>
<version>0.0.1-SNAPSHOTversion>
<name>studyname>
<description>Demo project for Spring Bootdescription>
<properties>
<java.version>1.8java.version>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
<exclusions>
<exclusion>
<groupId>org.junit.vintagegroupId>
<artifactId>junit-vintage-engineartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-redisartifactId>
<version>1.3.8.RELEASEversion>
dependency>
<dependency>
<groupId>redis.clientsgroupId>
<artifactId>jedisartifactId>
dependency>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>2.2.0.RELEASEversion>
dependency>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-maven-pluginartifactId>
plugin>
plugins>
build>
project>
pom文件中,重點是下面這兩個版本。
<parent><groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.1.1.RELEASEversion>
<relativePath/>
parent>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>2.2.0.RELEASEversion>
dependency>
2、application.yml
spring:redis:
cluster:
#設置key的生存時間,當key過期時,它會被自動刪除;
expire-seconds: 120
#設置命令的執行時間,如果超過這個時間,則報錯;
command-timeout: 5000
#設置redis集群的節點信息,其中namenode為域名解析,通過解析域名來獲取相應的地址;
nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
kafka:
# 指定kafka 代理地址,可以多個
bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默認消費者group id
group-id: test-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
server:
port: 8085
servlet:
#context-path: /redis
context-path: /kafka
沒有配置Redis的可以把Redis部分刪掉,也就是下圖:
想學習配置Redis集群的可以參考:《Redis集群redis-cluster的搭建及集成springboot》
3、生產者
package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* kafka生產者工具類
*
* @author biws
* @date 2019/12/17
**/
@Component
public class KfkaProducer {
private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);
@Autowired
private KafkaTemplate kafkaTemplate;/**
* 生產數據
* @param str 具體數據
*/public void send(String str) {
logger.info("生產數據:" + str);
kafkaTemplate.send("testTopic", str);
}
}
4、消費者
package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* kafka消費者監聽消息
*
* @author biws
* @date 2019/12/17
**/
@Component
public class KafkaConsumerListener {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@KafkaListener(topics = "testTopic")
public void onMessage(String str){
//insert(str);//這里為插入數據庫代碼
logger.info("監聽到:" + str);
System.out.println("監聽到:" + str);
}
}
5、對外接口
package com.gzky.study.controller;import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* kafka對外接口
*
* @author biws
* @date 2019/12/17
**/
@RestController
public class KafkaController {
@Autowired
KfkaProducer kfkaProducer;
/**
* 生產消息
* @param str
* @return
*/
@RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)
@ResponseBody
public boolean sendTopic(@RequestParam String str){
kfkaProducer.send(str);
return true;
}
}
6、postman測試
這里首先應該在服務器啟動監聽器(kafka根目錄),下面命令必須是具體的服務器ip,不能是localhost,是我踩過的坑:
推薦此處重啟一下集群
關閉kafka命令:
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties &
此處應該jps看一下,等待所有的kafka都關閉(關不掉的kill掉),再重新啟動kafka:
./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
等待kafka啟動成功后,啟動消費者監聽端口:
cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic
曾經我亂輸的測試信息全部被監聽過來了!
啟動springboot服務
然后用postman生產消息:
然后享受成果,服務器端監聽成功。
項目中也監聽成功!
點個在看?你最好看
往期推薦騰訊強推Redis成長手冊!原理+應用+集群+拓展+源碼五飛
阿里要求其程序員必須精通的并發編程筆記:原理+模式+應用
支付寶阿牛整合Netty+Redis+ZK「終極」高并發手冊
餓了么架構師發布“絕版”Java并發實現原理:JDK源碼剖析
不吹不擂,10年架構師公開分享SQL工作筆記,5字真言總結到位
由淺入深吃透容器云+微服務+K8S+MQ+阿里云內部實施手冊
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: httplib java_httplib
- 下一篇: resnet50结构_无需额外数据、Tr