ELK+kafka集群实战
ELK+Kafka集群
前言
前言
業務層可以直接寫入到kafka隊列中,不用擔心elasticsearch的寫入效率問題。
圖示
Kafka
Apache kafka是消息中間件的一種,是一種分布式的,基于發布/訂閱的消息系統。能實現一個為處理實時數據提供一個統一、高吞吐、低延遲的平臺,且擁有分布式的,可劃分的,冗余備份的持久性的日志服務等特點。
術語
1、kafka是一個消息隊列服務器。kafka服務稱為broker(中間人), 消息發送者稱為producer(生產者), 消息接收者稱為consumer(消費者);通常我們部署多個broker以提供高可用性的消息服務集群.典型的是3個broker;消息以topic的形式發送到broker,消費者訂閱topic,實現按需取用的消費模式;創建topic需要指定replication-factor(復制數目, 通常=broker數目);每個topic可能有多個分區(partition), 每個分區的消息內容不會重復
2、kafka-broker-中間人
3、webserver/logstash-producer[pr??du:s??]-消息生產者/消息發送者
Producer:
kafka集群中的任何一個broker都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/“partitions leader列表"等信息;
當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;
消息由producer直接通過socket發送到broker,中間不會經過任何"路由層”,事實上,消息被路由到哪個partition上由producer客戶端決定;比如可以采用"random"“key-hash”"輪詢"等,如果一個topic中有多個partitions,那么在producer端實現"消息均衡分發"是必要的。
在producer端的配置文件中,開發者可以指定partition路由的方式。
Producer消息發送的應答機制設置發送數據是否需要服務端的反饋,有三個值0,1,-1
0:producer不會等待broker發送ack
1:當leader接收到消息之后發送ack
-1:當所有的follower都同步消息成功后發送ack
4、elasticsearch-consumer-消費者
5、logs-topic-話題
6、replication-facter-復制數目-中間人存儲消息的副本數=broker數目
7、一個topic有多個分區partition
partition:
(1)、Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
(2)、在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻);一個Topic中的每個partions,只會被一個consumer消費,不過一個consumer可以同時消費多個partitions中的消息。
實戰
拓撲
說明
說明
1、使用一臺Nginx代理訪問kibana的請求;
2、兩臺es組成es集群,并且在兩臺es上面都安裝kibana;( 以下對elasticsearch簡稱es )
3、中間三臺服務器就是我的kafka(zookeeper)集群啦; 上面寫的 消費者/生產者 這是kafka(zookeeper)中的概念;
4、最后面的就是一大堆的生產服務器啦,上面使用的是logstash,
當然除了logstash也可以使用其他的工具來收集你的應用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……
角色
1、nginx-proxy(略):192.168.0.109
2、es1:192.168.0.110
3、es2:192.168.0.111
4、kafka1:192.168.0.112
5、kafka2:192.168.0.113
6、kafka3:192.168.0.115
7、webserver:192.168.0.114
軟件說明
1、elasticsearch - 1.7.3.tar.gz
2、Logstash - 2.0.0.tar.gz
3、kibana - 4.1.2 - linux - x64 . tar . gz(略):
以上軟件都可以從官網下載 : https : //www.elastic.co/downloads
4、java - 1.8.0 , nginx 采用 yum 安裝
步驟
1、ES集群安裝配置;
2、Logstash客戶端配置(直接寫入數據到ES集群,寫入系統messages日志);
3、Kafka(zookeeper)集群配置;(Logstash寫入數據到Kafka消息系統);
4、Kibana部署;
5、Nginx負載均衡Kibana請求;
演示
1、ES集群安裝配置
es1:
(1)、安裝java-1.8.0以及依賴包(每臺服務器都安裝JAVA)
(2)、獲取es軟件包
# wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz # tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local/ # ln -sv /usr/local/elasticsearch-1.7.3/ /usr/local/elasticsearch/(3)、修改配置文件
# vim /usr/local/elasticsearch/config/elasticsearch.ymlcluster.name: es-cluster #組播的名稱地址 node.name: "es-node1" #節點名稱,不能和其他節點重復 node.master: true #節點能否被選舉為master node.data: true #節點是否存儲數據 index.number_of_shards: 5 #索引分片的個數 index.number_of_replicas: 1 #分片的副本個數 path.conf: /usr/local/elasticsearch/config #配置文件的路徑 path.data: /data/es/data #數據目錄路徑 path.work: /data/es/worker #工作目錄路徑 path.logs: /usr/local/elasticsearch/logs #日志文件路徑 path.plugins: /data/es/plugins #插件路徑 bootstrap.mlockall: true #內存不向swap交換 http.enabled: true #啟用http(4)、創建相關目錄
# mkdir -p /data/es/{data,worker,plugins} #注釋:data:放數據的文件 worker:工作臨時文件 plugins:插件 日志文件會自己建立(5)、獲取es服務管理腳本
#為了方便配置文件 # git clone https://github.com/elastic/elasticsearch-servicewrapper.git# mv elasticsearch-servicewrapper/service/ /usr/local/elasticsearch/bin/#在/etc/init.d/目錄下,自動安裝上es的管理腳本啦 # /usr/local/elasticsearch/bin/service/elasticsearch install(6)、啟動es,并檢查服務是否正常
# systemctl start elasticsearch # systemctl enable elasticsearch # ss -nptl |grep -E "9200|9300" LISTEN 0 50 :::9200 :::* users:(("java",pid=10020,fd=104)) LISTEN 0 50 :::9300 :::* users:(("java",pid=10020,fd=66))訪問192.168.88.153:9200
es2:
(1)、安裝java-1.8.0以及依賴包
(2)、把目錄復制到es2
es1:
es2:
# ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch # /usr/local/elasticsearch/bin/service/elasticsearch install # vim /usr/local/elasticsearch/config/elasticsearch.yml 40 node.name: "es-node2" #節點名稱,不能和其他節點重復 # systemctl start elasticsearch # systemctl enable elasticsearch(3)、安裝es的管理插件
(1)、說明:es官方提供一個用于管理es的插件,可清晰直觀看到es集群的狀態,以及對集群的操作管理,安裝方法如下:
(2)、提示:
安裝好之后,訪問方式為: http://192.168.0.110:9200/_plugin/head,
由于集群中現在暫時沒有數據,所以顯示為空,
至此,es集群的部署完成。
2、Logstash客戶端安裝配置(在webserver1上安裝logstassh,用于采集日志)
(1)、downloads 軟件包
(2)、Logstash 向es集群寫數據
編寫配置文件;
檢查配置文件是否有語法錯誤:
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf --configtest --verbose輸出提示: Configuration OK 語法正確(3)、啟動logstash
啟動;
輸出提示;
查看es集群的head插件:
查看es集群head插件數據瀏覽:
(3)、演示如何收集系統日志(測試)
編寫配置文件;
啟動logstash:
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash2.conf查看es集群head插件:
查看es集群head插件數據瀏覽:
3、Kafka集群安裝配置
(1)、提示:在搭建kafka集群時,需要提前安裝zookeeper集群,當然kafka已經自帶zookeeper程序只需要解壓并且安裝配置就行了
(2)、Kafka1
獲取軟件包:
配置zookeeper集群:
# vim /usr/local/kafka/config/zookeeper.propertiesdataDir=/data/zookeeper clientPort=2181 tickTime=2000 #注釋:tickTime : 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 initLimit=20 #注釋:initLimit:LF初始通信時限 #集群中的follower服務器(F)與leader服務器(L)之間 初始連接 時能容忍的最多心跳數(tickTime的數量)。 #此配置表示,允許 follower (相對于 leader 而言的“客戶端”)連接 并同步到 leader 的初始化連接時間,它以 tickTime 的倍數來表示。當超過設置倍數的 tickTime 時間,則連接失敗 syncLimit=10 #注釋:syncLimit:LF同步通信時限 #集群中的follower服務器(F)與leader服務器(L)之間 請求和應答 之間能容忍的最多心跳數(tickTime的數量)。 #此配置表示, leader 與 follower 之間發送消息,請求 和 應答 時間長度。如果 follower 在設置的時間內不能與leader 進行通信,那么此 follower 將被丟棄。 server.2=192.168.0.112:2888:3888 #注釋: #2888 端口:表示的是這個服務器與集群中的 Leader 服務器交換信息的端口; #3888 端口:表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader ,而這個端口就是用來執行選舉時服務器相互通信的端口。 server.3=192.168.0.113:2888:3888 server.4=192.168.0.115:2888:3888 maxClientCnxns=0 #注釋: #maxClientCnxns選項,如果不設置或者設置為0,則每個ip連接zookeeper時的連接數沒有限制創建zookeeper所需要的目錄:
# mkdir /data/zookeeper創建myid文件:
#在/data/zookeeper目錄下創建myid文件,里面的內容為數字,用于標識主機,如果這個文件沒有的話,zookeeper是沒法啟動的 # echo 2 > /data/zookeeper/myid #另外兩臺,分別是3,4。以上就是zookeeper集群的配置,下面等我配置好kafka之后直接復制到其他兩個節點即可Kafka配置:
# vim /usr/local/kafka/config/server.properties broker.id=2 #注釋:唯一,填數字,本文中分別為 2 / 3 / 4 prot=9092 #注釋:這個 broker 監聽的端口 host.name=192.168.0.112 #注釋:唯一,填服務器 IP log.dir=/data/kafka-logs #注釋:該目錄可以不用提前創建,在啟動時自己會創建 zookeeper.connect=192.168.0.112:2181,192.168.0.113:2181,192.168.0.115:2181 #注釋:這個就是 zookeeper 的 ip 及端口 num.partitions=16 #注釋:需要配置較大 分片影響讀寫速度 log.dirs=/data/kafka-logs #注釋:數據目錄也要單獨配置磁盤較大的地方 log.retention.hours=168 #注釋:時間按需求保留過期時間 避免磁盤滿 #日志保存的時間,可以選擇hours,minutes和ms 168(7day)(3)、Kafka3&Kafka3
將Kafka(zookeeper)的程序目錄全部拷貝到其他兩個節點:
修改兩個節點配置(這里基本相同,但是注意不同位置):
( 1 ) zookeeper 的配置 #kafka2和kafka3 # mkdir /data/zookeeper -p # echo "x" > /data/zookeeper/myid #x為3或4 ( 2 ) kafka 的配置 #kafka2和kafka3 # vim /usr/local/kafka/config/server.properties broker.id=x #x為3或4 host.name=192.168.x.xx #x為Kafka2和Kafka3的ip啟動,先啟動zookeeper集群,才能啟動Kafka
8.啟動,先啟動zookeeper集群,才能啟動kafka #提示:按照順序來,kafka1 –> kafka2 –>kafka3 #zookeeper啟動命令: # /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & # /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & # /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #注釋:/usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper停止的命令 #如果zookeeper有問題 nohup的日志文件會非常大,把磁盤占滿, #這個zookeeper服務可以通過自己些服務腳本來管理服務的啟動與關閉。 #后面兩臺執行相同操作,在啟動過程當中會出現以下報錯信息: #WARN Cannot open channel to 3 at election address / 192.168.0.113 : 3888 #由于zookeeper集群在啟動的時候,每個結點都試圖去連接集群中的其它結點,先啟動的肯定連不上后面還沒啟動的,所以上面日志前面部分的異常是可以忽略的。通過后面部分可以看到,集群在選出一個Leader后,最后穩定了。 #其他節點也可能會出現類似的情況,屬于正常。zookeeper服務檢查:
# netstat -nlpt | grep -E "2181|2888|3888" #輸出提示: tcp6 0 0 192.168.0.112:3888 :::* LISTEN 1722/java tcp6 0 0 :::2181 :::* LISTEN 1722/java # netstat -nlpt | grep -E "2181|2888|3888" #輸出提示 tcp6 0 0 192.168.0.113:3888 :::* LISTEN 1713/java tcp6 0 0 :::2181 :::* LISTEN 1713/java tcp6 0 0 192.168.0.113:2888 :::* LISTEN 1713/java #如果哪臺是Leader,那么它就擁有2888這個端口 # netstat -nlpt | grep -E "2181|2888|3888" #輸出提示 tcp6 0 0 192.168.0.115:3888 :::* LISTEN 1271/java tcp6 0 0 :::2181 :::* LISTEN 1271/java驗證、啟動Kafka:
#啟動(在三臺Kafka服務器上): # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #注釋:/usr/local/kafka/bin/kafka-server-stop.sh #kafka停止的命令 #跟zookeeper服務一樣,如果kafka有問題 nohup的日志文件會非常大,把磁盤占滿,這個kafka服務同樣可以通過自己些服務腳本來管理服務的啟動與關閉。 #測試: #下面我們將webserver1上面的logstash的輸出改到kafka上面 (1)修改webserver1上面的logstash配置 # vim /usr/local/logstash/etc/logstash3.conf input { file { type => "system-message" path => "/var/log/messages" start_position => "beginning" } } output { kafka { bootstrap_servers => "192.168.0.112:9092,192.168.0.113:9092,192.168.0.115:9092" topic_id => "system-messages" #這個將作為主題的名稱,將會自動創建 compression_type => "snappy" #壓縮類型 } }(2)配置檢測 # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf --configtest --verbose #輸出提示: Configuration OK(3)啟動Logstash # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf (4)驗證數據是否寫入到kafka,檢查是否生成一個system-messages的主題 # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.0.113:2181 #輸出信息 summer system - messages #可以看到這個主題已經生成了(5)查看system-messages主題的詳情 # /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.0.113:2181 --topic system-messages輸出信息:
可以看出,這個主題生成了16個分區,每個分區都有對應自己的Leader
擴展提示:
我想要有10個分區,3個副本如何辦?還是跟我們上面一樣命令行來創建主題就行,
當然對于logstash輸出的我們也可以提前先定義主題,然后啟動logstash 直接往定義好的主題寫數據就行啦,命令如下:
Kafka集群部署logstash:
1、kafka 1&2&3安裝logstash: # wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz # tar -xf logstash-2.0.0.tar.gz -C /usr/local # cd /usr/local/ # ln -sv logstash-2.0.0 logstash # mkdir /usr/local/logstash/{logs,etc}2、三臺kafak編寫logstash配置文件 # vim /usr/local/logstash/etc/logstash.conf #2181后面不能有空格 #decorate_events => true:此屬性會將當前topic、offset、group、partition等信息也帶到message中 input { kafka { zk_connect => "192.168.0.112:2181,192.168.0.113:2181,192.168.0.115:2181" #消費者們 topic_id => "system-messages" codec => plain reset_beginning => false consumer_threads => 5 decorate_events => true } } output { elasticsearch { hosts => ["192.168.0.110:9200","192.168.0.111:9200"] index => "test-system-messages-%{+YYYY-MM}" #區分之前實驗,新名字“test-system-messages-%{+YYYY-MM}” } }3、webserver1上寫入測試內容 # echo "00000">/var/log/messages # echo "我將通過kafka集群達到es集群1234" >> /var/log/messages # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash3.conf 4、三臺kafka啟動logstash(注意順序1>2>3) # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf # /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/logstash.conf查看es管理界面:
總結;
4、擴展(Nginx負載均衡Kibana的請求)
(1)、在nginx-proxy上面yum安裝nginx
(2)、編寫配置問佳佳es.conf
# vim /etc/nginx/conf.d/es.conf upstream es { server 192.168.0.110:5601 max_fails=3 fail_timeout=30s; server 192.168.0.111:5601 max_fails=3 fail_timeout=30s; }server {listen 80;server_name localhost;location / {proxy_pass http://es/;index index.html index.htm;#authauth_basic "ELK Private";auth_basic_user_file /etc/nginx/.htpasswd;} }(3)、創建認證
3.創建認證 # htpasswd -cm /etc/nginx/.htpasswd elk New password: Re-type new password: Adding password for user elk-user # /etc/init.d/nginx restart Stopping nginx: [ OK ] Starting nginx: [ OK ](4)、訪問
總結
消息系統主要功能
1、解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束
2、冗余
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3、擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4、靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。
如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5、可恢復性
系統的一部分組件失效時,不會影響到整個系統。
消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6、順序保證
在大多使用場景下,數據處理的順序都很重要。
大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7、緩沖
有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8、異步通信
很多時候,用戶不想也不需要立即處理消息。
消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
Redis與Kafka
我們都知道Redis是以key的hash方式來分散對列存儲數據的,且Redis作為集群使用時,對應的應用對應一個Redis,在某種程度上會造成數據的傾斜性,從而導致數據的丟失。
而從之前我們部署Kafka集群來看,kafka的一個topic(主題),可以有多個partition(副本),而且是均勻的分布在Kafka集群上,這就不會出現redis那樣的數據傾斜性。Kafka同時也具備Redis的冗余機制,像Redis集群如果有一臺機器宕掉是很有可能造成數據丟失,而Kafka因為是均勻的分布在集群主機上,即使宕掉一臺機器,是不會影響使用。同時Kafka作為一個訂閱消息系統,還具備每秒百萬級別的高吞吐量,持久性的、分布式的特點等。
總結
以上是生活随笔為你收集整理的ELK+kafka集群实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mondrian + JPivot环境配
- 下一篇: GDAL环境配置