python连接es数据库_Python Elasticsearch API操作ES集群
環(huán)境Centos 7.4
Python 2.7
Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
Elasitcsearch6.3.2
知識(shí)點(diǎn)調(diào)用Python Elasticsearh API
Python Mysqldb使用
DSL查詢與聚合
Python 列表操作
代碼
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#minyt 2018.9.1
#獲取24小時(shí)內(nèi)出現(xiàn)的模塊次數(shù)
# 該程序通過elasticsearch python client 獲取相關(guān)精簡數(shù)據(jù),可以計(jì)算請求數(shù)、超時(shí)數(shù)、錯(cuò)誤數(shù)、正確率、錯(cuò)誤率等等
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers
#定義elasticsearch集群索引名
index_name = "logstash-nginxlog-*"
#實(shí)例化Elasticsearch類,并設(shè)置超時(shí)間為180秒,默認(rèn)是10秒的,如果數(shù)據(jù)量很大,時(shí)間設(shè)置更長一些
es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180)
#DSL(領(lǐng)域特定語言)查詢語法,查詢top50 sname的排列次數(shù)
data_sname = {
"aggs": {
"2": {
"terms": {
"field": "apistatus.sname.keyword",
"size": 100,
"order": {
"_count": "desc"
}
}
}
},
"size": 0,
"_source": {
"excludes": []
},
"stored_fields": [
"*"
],
"script_fields": {},
"docvalue_fields": [
"@timestamp"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
},
{
"range": {
"@timestamp": {
"gte" : "now-24h/h",
"lt" : "now/h"
}
}
}
],
"filter": [],
"should": [],
"must_not": []
}
}
}
#按照DSL(特定領(lǐng)域語言)語法查詢獲取數(shù)據(jù)
def get_original_data():
try:
#根據(jù)上面條件搜索數(shù)據(jù)
res = es.search(
index=index_name,
size=0,
body=data_sname
)
return res
except:
print "get original data failure"
#初始化數(shù)據(jù)庫
def init_mysql():
# 打開數(shù)據(jù)庫連接
db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )
# 使用cursor()方法獲取操作游標(biāo)
cursor = db.cursor()
# SQL 更新語句
sql = "update appname set count=0"
try:
# 執(zhí)行SQL語句
cursor.execute(sql)
# 提交到數(shù)據(jù)庫執(zhí)行
db.commit()
except:
# 發(fā)生錯(cuò)誤時(shí)回滾
db.rollback()
# 關(guān)閉數(shù)據(jù)庫連接
db.close()
def updata_mysql(sname_count,sname_list):
# 打開數(shù)據(jù)庫連接
db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )
# 使用cursor()方法獲取操作游標(biāo)
cursor = db.cursor()
# SQL 更新語句
sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list)
try:
# 執(zhí)行SQL語句
cursor.execute(sql)
# 提交到數(shù)據(jù)庫執(zhí)行
db.commit()
except:
# 發(fā)生錯(cuò)誤時(shí)回滾
db.rollback()
# 關(guān)閉數(shù)據(jù)庫連接
db.close()
#根據(jù)Index數(shù)據(jù)結(jié)構(gòu)通過Elasticsearch Python Client上傳數(shù)據(jù)到新的Index
def import_process_data():
try:
#列表形式顯示結(jié)果
res = get_original_data()
#print res
res_list = res.get('aggregations').get('2').get('buckets')
#print res_list
#初始化數(shù)據(jù)庫
init_mysql()
#獲取24小時(shí)內(nèi)出現(xiàn)的SNAME
for value in res_list:
sname_list = value.get('key')
sname_count = value.get('doc_count')
print sname_list,sname_count
#更新sname_status值
updata_mysql(sname_count,sname_list)
except Exception, e:
print repr(e)
if __name__ == "__main__":
import_process_data()
總結(jié)
關(guān)鍵是DSL語法的編寫涉及查詢與聚合可以通過kibana的visualize或者devtool先測試出正確語法,然后結(jié)合python對(duì)列表、字典、除法、字符串等操作即可。下面匯總下各個(gè)算法:超長請求
http_host.keyword: http://api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*錯(cuò)誤
錯(cuò)誤請求
apistatus.status.keyword:*錯(cuò)誤 AND (http_host.keyword: http://api.mydomain.com OR http_host.keyword: http://api.yourdomain.com )
請求健康度
域名與request_time聚合,域名請求時(shí)間小于3秒的次數(shù)除以總請求次數(shù)對(duì)應(yīng)各個(gè)域名健康度
請求正確率
域名與http狀態(tài)碼聚合,域名http狀態(tài)碼為200的次數(shù)除以域名總請求數(shù)對(duì)應(yīng)各個(gè)域名的請求正確率
更多精彩內(nèi)容,請滑至頂部點(diǎn)擊右上角關(guān)注小宅哦~
來源:51CTO 作者 minminmsn
總結(jié)
以上是生活随笔為你收集整理的python连接es数据库_Python Elasticsearch API操作ES集群的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据传输完整性_生产系统数据完整性事件常
- 下一篇: 生鲜配送小程序源码_生鲜果蔬配送小程序开