使用开源工具ELK可视化 Azure NSG日志
國內的Azure最近上線了網絡觀察程序服務,可以幫助用戶監控和分析VNET虛擬網絡。其中一個很重要的功能就是可以記錄NSG的安全訪問日志了。但是如果用戶設置了NSG流日志,并下載日志想要分析一下的話,會發現日志其實并不是很友好,NSG流日志是以json格式記錄的,可以看到的內容大致如下圖所示:日志會記錄NSG規則名,系統時間,源地址,目的地址,源端口,目的端口,協議類型,流量方向和處理規則。
不過所有的記錄都連在一起,如果要查找某個具體的安全訪問記錄,非常困難。
?
不過我們可以使用多個多種開源工具將相關日志數據可視化。最近正好有客戶希望可以使用比較流行的ELK套件來分析NSG安全記錄。在參考了微軟和?Elastic.co的官方文檔,以及同事的文章后,終于實現了使用ELK套件獲取并分析Azure.cn的NSG日志,特此記錄一下過程和注意事項。
簡單介紹一下ELK,ELK是 Elasticsearch、Logstash、Kibana 三個開源軟件的組合。在實時數據檢索和分析場合,三者通常是配合共用,而且又都先后歸于 Elastic.co 公司名下,故此簡稱ELK。它具有如下幾個優點:
- 處理方式靈活。Elasticsearch 是實時全文索引,不需要預先編程才能使用;
- 配置簡易上手。Elasticsearch 全部采用 JSON 接口,Logstash 是 Ruby DSL 設計,都是目前業界最通用的配置語法設計;
- 檢索性能高效。雖然每次查詢都是實時計算,但是優秀的設計和實現基本可以達到全天數據查詢的秒級響應;
- 集群線性擴展。不管是 Elasticsearch 集群還是 Logstash 集群都是可以線性擴展的;
- 前端操作炫麗。Kibana 界面上,只需要點擊鼠標,就可以完成搜索、聚合功能,生成炫麗的儀表板。
?
要部署ELK,我們現在Azure上部署一臺Centos 7.X的虛機,并把這臺虛機的NSG作為日志來源。
通過以下步驟開啟NSG流日志:
?在Azure的服務里找到“網絡觀察程序”服務。
?
在網絡觀察程序里,選中NSG流日志:
?
在右側用資源組過濾,找到需要分析的NSG。
選中NSG,開啟NSG流日志,指定日志存放的存儲賬號和日志保留時間。
?
完成了Azure NSG流日志的設置,我們就可以開始安裝部署ELK,來獲取數據并進行分析了。
?
1、通過SSH登錄到剛才建立的Centos虛機。先安裝Java運行環境:
?yum install java-1.8.0-openjdk
?
2、下載可信簽名證書:
?rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
?
3、創建 Elasticsearch 的yum repo文件
?vi /etc/yum.repos.d/elasticsearch.repo
?輸入以下內容
?[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
?
4、安裝Elasticsearch包
?yum install elasticsearch
?
5、修改配置參數,允許外部訪問
?vi /etc/elasticsearch/ elasticsearch.yml
?
找到network.host參數,設置為0.0.0.0
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 0.0.0.0
#
# Set a custom port for HTTP:
#
http.port: 9200
#
?
6、啟動Elasticsearch并設置系統啟動
?systemctl enable elasticsearch.service
?systemctl start elasticsearch.service?
?
7、創建 Logstash的yum repo文件
?vi /etc/yum.repos.d/logstash.repo
?
輸入以下內容
?[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
?
8、安裝Logstash包
?yum install logstash
?
9、安裝讀取Azure Blob存儲的插件
?/usr/share/logstash/bin/logstash-plugin install logstash-input-azureblob
?
10、設置存儲讀取位置為Azure.cn,默認是讀取WW的Azure
?vi /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-azureblob-0.9.12-java/lib/logstash/inputs/azureblob.rb
?
找到以下內容
config :endpoint, :validate => :string, :default => 'core.windows.net'
?將endpoint修改為中國Azure的endpoint:core.chinacloudapi.cn
config :endpoint, :validate => :string, :default => 'core.chinacloudapi.cn'
?
11、創建Logstash的配置文件
?在Azure門戶中中找到所使用存儲賬號的訪問密鑰
?
?
vi /etc/logstash/conf.d/logstash.conf
輸入以下內容
?input {
azureblob
? {
????? storage_account_name => "此處更改為NSG流日志所使用的存儲賬戶"
????? storage_access_key => "此處更改為存儲賬戶的訪問密鑰"
????? container => "insights-logs-networksecuritygroupflowevent"
????? codec => "json"
????? # Refer https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-read-nsg-flow-logs
????? # Typical numbers could be 21/9 or 12/2 depends on the nsg log file types
????? file_head_bytes => 12
????? file_tail_bytes => 2
????? # Enable / tweak these settings when event is too big for codec to handle.
????? # break_json_down_policy => "with_head_tail"
????? # break_json_batch_count => 2
? }
}
?
filter {
? split { field => "[records]" }
? split { field => "[records][properties][flows]"}
? split { field => "[records][properties][flows][flows]"}
? split { field => "[records][properties][flows][flows][flowTuples]"}
?
mutate{
split => { "[records][resourceId]" => "/"}
add_field => {"Subscription" => "%{[records][resourceId][2]}"
? ????????????"ResourceGroup" => "%{[records][resourceId][4]}"
????????????? "NetworkSecurityGroup" => "%{[records][resourceId][8]}"}
convert => {"Subscription" => "string"}
convert => {"ResourceGroup" => "string"}
convert => {"NetworkSecurityGroup" => "string"}
split => { "[records][properties][flows][flows][flowTuples]" => ","}
add_field => {
??????????? "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
??????????? "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
??????????? "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
??????????? "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
??????????? "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
???????? ???"protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
??????????? "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
??????????? "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
???????????? }
convert => {"unixtimestamp" => "integer"}
convert => {"srcPort" => "integer"}
convert => {"destPort" => "integer"}???????
}
?
date{
?match => ["unixtimestamp" , "UNIX"]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
?hosts => "localhost"
?index => "nsg-flow-logs"
}
}
?
12、啟動Logstash并設置系統啟動
?systemctl enable logstash.service
?systemctl start logstash.service
?
13、創建 Kibana的yum repo文件
?vi /etc/yum.repos.d/kibana.repo
?
輸入以下內容
?[kibana-4.5]
name=Kibana repository for 4.5.x packages
baseurl=http://packages.elastic.co/kibana/4.5/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
?
14、安裝Kibana包
?yum install kibana
?
15、設置Kibana參數,允許外部訪問
?vi /etc/kibana/kibana.yml
?
找到以下server.host參數,修改為0.0.0.0
?# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"
?
16、啟動Kibana并設置系統啟動
?systemctl enable kibana.service
?systemctl start kibana.service
?
17、檢查ELK服務狀態
?systemctl status elasticsearch
?
systemctl status logstash
?
systemctl status kibana
?
?
18、設置虛機的NSG策略,允許外部訪問kibana門戶
?
?
?19、登錄Kibana門戶
?http://hospip:5601
?
?
?20、設置state:storeInSessionStorage參數
?在“Management”選項卡下的 Advanced Setting里找到state:storeInSessionStorage參數,并enable
?
?
?21、在“Management”選項卡下,通過“Index Patterns” 創建索引。
?
?
?22、微軟已經很貼心的為大家準備了示例的Dashboard,可以直接導入,以后大家還可以按照自己的需要設計
在此處下載儀表板文件,在此處下載可視化效果文件。
在 Kibana 的“Management”選項卡下,通過“Saved Objects”導入文件。?然后,可從“Dashboard”選項卡打開并加載示例Dashboard。
?
?
?進入Dashboard就可以查看當前設置的一些示例報表
?
在Dashboard的右上角可以設置日志分析的時間段,可以幫助用戶迅速定位到需要分析的時間點
?
?
?
最后感謝康老師的支持。他的文檔給了我很大幫助。
康老師的博客地址:https://www.azure.cn/blog/2017/12/27/AZURE-NSG-FLOW-LOG-Analysis/
Azure官方配置文檔說明:https://docs.microsoft.com/zh-cn/azure/network-watcher/network-watcher-visualize-nsg-flow-logs-open-source-tools
如果需要搭建ELK集群服務,可以參考此文檔:https://docs.azure.cn/zh-cn/articles/training/open-source-azure-virtual-machines-create-elk-cluster
ELK官方文檔:https://www.elastic.co/guide/index.html
Azure存儲插件的說明:https://github.com/Azure/azure-diagnostics-tools/tree/master/Logstash/logstash-input-azureblob
?
當然用戶也可以使用PowerBI服務來分析NSG的流日志,這樣就可以不用自己安裝和部署了。文檔位置:https://docs.microsoft.com/zh-cn/azure/network-watcher/network-watcher-visualize-nsg-flow-logs-power-bi
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的使用开源工具ELK可视化 Azure NSG日志的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 软件测试计划时要记住什么
- 下一篇: 1000万的现金和1000万房产,五年之
