python连接mysql代码_python连接mysql
import datetime
import json
import traceback
from configparser import ConfigParser

import mysql.connector
import requests

# File that stores the timestamp of the newest row seen by the previous run.
STATE_FILE = "/tmp/pandora-sql-monitoring.time"


def sendmessage(url, msg, at_list=None, timeout=10):
    """Send a plain-text message to a DingTalk robot webhook.

    Args:
        url: Webhook URL of the DingTalk robot.
        msg: Text content of the message.
        at_list: Mobile numbers to @-mention, e.g. ["10086"]. Defaults to
            nobody.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled webhook cannot hang the monitor forever.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    headers = {"Content-Type": "application/json ;charset=utf-8"}
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {
            "atMobiles": [] if at_list is None else at_list,
            "isAtAll": 0,  # set to 1 to @ everyone
        },
    }
    res = requests.post(
        url, data=json.dumps(payload), headers=headers, timeout=timeout
    )
    print(str(datetime.datetime.now()) + "發送釘釘消息:" + str(res.text))


def _read_last_query_time(state_file):
    """Return the timestamp saved by the previous run, or None if there is none."""
    try:
        with open(state_file, "r") as fh:
            saved = fh.readline().strip()
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return None
    return saved or None


def query_sql(dingding_url, mysql_conn, table, duration_threshold=70000,
              at_list=None, state_file=STATE_FILE):
    """Report queries recorded in ``table`` that ran at least ``duration_threshold``.

    Rows newer than the timestamp saved by the previous run (or newer than one
    hour ago when nothing was saved) are fetched. The timestamp of the last
    fetched row is written back to ``state_file``, and one DingTalk message is
    sent per row whose ``duration`` reaches the threshold.

    Args:
        dingding_url: List of DingTalk webhook URLs; messages rotate through
            them to stay under the per-robot sending limit.
        mysql_conn: Open MySQL connection whose ``cursor()`` accepts
            ``dictionary=True``.
        table: Name of the table to read. It is inserted into the SQL text
            as-is (identifiers cannot be bound as parameters), so it must come
            from trusted configuration.
        duration_threshold: Minimum ``duration`` value (int or numeric string)
            that triggers an alarm.
        at_list: Mobile numbers to @-mention in each message.
        state_file: Path of the file holding the last seen timestamp.
    """
    # Only the table name is formatted in; the timestamp is bound by the driver.
    sql = (
        "select duration,query_sql,type,timestamp from {} "
        "where timestamp > %s order by timestamp"
    ).format(table)

    # Resume from the previous run, falling back to "one hour ago".
    since = _read_last_query_time(state_file)
    if since is None:
        one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
        since = one_hour_ago.strftime("%Y-%m-%d %H:%M:%S")
    print(sql, (since,))

    alarm_list = []
    last_query_time = None
    try:
        threshold = int(duration_threshold)
        cursor = mysql_conn.cursor(dictionary=True)
        try:
            cursor.execute(sql, (since,))
            rows = cursor.fetchall()
            mysql_conn.commit()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
        for row in rows:
            last_query_time = str(row["timestamp"])
            if int(row["duration"]) >= threshold:
                alarm_list.append(row)
    except Exception:
        # Best effort: log the failure and still report what was collected.
        traceback.print_exc()

    # Remember the newest timestamp seen so the next run starts after it.
    if last_query_time:
        with open(state_file, "w") as fh:
            fh.write(last_query_time)

    # Send one message per slow query, rotating through the robots.
    for index, item in enumerate(alarm_list):
        msg = "耗時:{}秒\n時間:{}\nsql:{}\n".format(
            item["duration"] / 1000, item["timestamp"], item["query_sql"]
        )
        sendmessage(dingding_url[index % len(dingding_url)], msg, at_list)
if __name__ == '__main__':
    # Load all settings from the config file before opening any connection,
    # so a missing option fails fast without leaving a connection behind.
    cp = ConfigParser()
    cp.read("properties.cfg")

    # MySQL settings.
    mysql_host = cp.get("mysql", "host")
    mysql_port = cp.get("mysql", "port")
    mysql_user = cp.get("mysql", "user")
    mysql_password = cp.get("mysql", "password")
    mysql_db = cp.get("mysql", "database")
    mysql_table = cp.get("mysql", "table")

    # DingTalk robots: several can be listed so messages rotate between them
    # and stay under the per-robot sending limit.
    dingding_url = [cp.get("dingding", "url1")]
    at_list = ["10086"]

    # Alarm settings.
    duration_threshold = cp.get("alarm", "duration_threshold")

    mysql_conn = mysql.connector.connect(
        host=mysql_host,
        port=int(mysql_port),
        user=mysql_user,
        passwd=mysql_password,
        database=mysql_db,
    )
    try:
        query_sql(dingding_url, mysql_conn, mysql_table, duration_threshold, at_list)
    finally:
        # Always release the database connection, even if the check fails.
        mysql_conn.close()
總結
以上是生活随笔為你收集整理的python连接mysql代码_python连接mysql的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nodejs 面向对象 私有变量_Jav
- 下一篇: 双线性内插怎么缩小_汗蒸桶怎么选择才是最