logging.to_files: true
在監(jiān)控端讀取redis數據,并通過正則處理到mysql數據庫。
vi /data/mysql_slowLog.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import json
import pymysql
import re
import time
import threading
# redis connect info
redisHost = 'xxx'
redisPort = 2402
redisDB = '0'
redisKey = 'mysql_slowlog'
# mysql connect info
mysqlHost = 'xxx'
mysqlPort = 2001
# mysqlPort = 23306
mysqlUser = ''
mysqlPasswd = ''
# mysqlPasswd = 'open'
mysqlDB = ''
mysqlTablePrefix = 'mysql_slowlog_'
collectStep = 60
def time_log():
return '[' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ']'
def gather_log(redisConn):
data_list = []
logList = []
keyState = redisConn.exists(redisKey)
if keyState:
logLen = redisConn.llen(redisKey)
if logLen > 0:
redisKeyNew = redisKey + '-bak'
redisConn.renamenx(redisKey, redisKeyNew)
logList = redisConn.lrange(redisKeyNew,0,logLen)
redisConn.delete(redisKeyNew)
else:
pass
else:
pass
if len(logList) > 0:
for item in logList:
data_dict = {}
slowLogJson = json.loads(item)
#print(slowLogJson['message'])
data_dict['hostname'] = slowLogJson['beat']['hostname']
#print(slowLogJson['beat']['hostname'])
data_dict['ip'] = slowLogJson['fields']['ip']
#print(slowLogJson['fields']['ip'])
data_dict['port'] = slowLogJson['fields']['port']
#print(slowLogJson['fields']['port'])
logContent = slowLogJson['message']
#Regex
timeRe = r'# Time: (.*)\n# User@Host:'
userRe = r'# User@Host:.*\[(.*?)\]\s+@ '
hostRe = r'# User@Host: .*\[(.*?)\] Id:'
schemaRe = r'# Schema:\s+(.*?)\s+Last_errno:'
queryRe = r'# Query_time:\s+(.*?)\s+Lock_time:'
locklRe = r'# Query_time:.*?Lock_time:\s+(.*?)\s+Rows_sent:'
rowsRe = r'# Query_time:.*?Lock_time:.*?Rows_sent:\s+(\d+)\s+Rows_examined:'
bytesRe = r'# Bytes_sent:\s+(\d+)'
timestampRe = r'SET\s+timestamp=(.*?);'
commandRe = r'SET\s+timestamp=.*?;\n(.*?)(?=$)'
if re.findall(timeRe, logContent):
data_dict['sys_time'] = u'20' + re.findall(timeRe, logContent)[0]
data_dict['sys_time'] = data_dict['sys_time'][:4] + '-' + data_dict['sys_time'][4:6] + '-' + data_dict['sys_time'][6:]
data_dict['cli_user'] = re.findall(userRe, logContent)[0]
data_dict['cli_ip'] = re.findall(hostRe,logContent)[0]
data_dict['schema'] = re.findall(schemaRe,logContent)[0]
data_dict['query_time'] = re.findall(queryRe,logContent)[0]
data_dict['lock_time'] = re.findall(locklRe,logContent)[0]
data_dict['rows_sent'] = re.findall(rowsRe,logContent)[0]
data_dict['bytes_sent'] = re.findall(bytesRe,logContent)[0]
data_dict['timestamp'] = re.findall(timestampRe,logContent)[0]
data_dict['command'] = re.findall(commandRe,logContent,re.M)[0]
data_list.append(data_dict)
else:
pass
#print('Not slowlog data')
else:
pass
#print('No data')
return data_list
def send_data(data,mysql_pool):
mysqlTableDate = time.strftime('%Y%m', time.localtime(time.time()))
mysqlTable = mysqlTablePrefix + mysqlTableDate
cursor = mysql_pool.cursor()
data_list = []
createTableSql = "create table mysql_slowlog_000000 (`id` int(11) NOT NULL AUTO_INCREMENT," \
"hostname varchar(64) NOT NULL," \
"ip varchar(20) NOT NULL," \
"port int(11) NOT NULL," \
"sys_time datetime NOT NULL," \
"cli_user varchar(32) NOT NULL," \
"cli_ip varchar(32) NOT NULL," \
"`schema` varchar(32) NOT NULL," \
"query_time float(6,3) NOT NULL," \
"lock_time float(6,3) NOT NULL," \
"rows_sent int(11) NOT NULL," \
"bytes_sent int(11) NOT NULL," \
"`timestamp` varchar(40) NOT NULL," \
"command varchar(2048) DEFAULT NULL," \
"PRIMARY KEY (`id`)," \
"KEY `idx_slowlog_000000_user` (`cli_user`)," \
"KEY `idx_slowlog_000000_query_time` (`query_time`)," \
"KEY `idx_slowlog_000000_timestamp` (`timestamp`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8"
createTableSql = createTableSql.replace('000000',mysqlTableDate)
# Create slow log table if not exist
try:
cursor.execute("show tables like '%s'" % mysqlTable)
res = cursor.fetchone()
if not res:
cursor.execute(createTableSql)
mysql_pool.commit()
except Exception as e:
print(time_log() +'Error:', e)
mysql_pool.rollback()
mysql_pool.close()
slowLogInsertSql ="insert into %s" % mysqlTable + "(hostname," \
"ip," \
"port," \
"sys_time," \
"cli_user," \
"cli_ip," \
"`schema`," \
"query_time," \
"lock_time," \
"rows_sent," \
"bytes_sent," \
"`timestamp`," \
"command) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
if len(data) > 0:
for item in data:
row = (item['hostname'].encode('utf-8'),
item['ip'].encode('utf-8'),
item['port'],
item['sys_time'].encode('utf-8'),
item['cli_user'].encode('utf-8'),
item['cli_ip'].encode('utf-8'),
item['schema'].encode('utf-8'),
item['query_time'].encode('utf-8'),
item['lock_time'].encode('utf-8'),
item['rows_sent'].encode('utf-8'),
item['bytes_sent'].encode('utf-8'),
item['timestamp'].encode('utf-8'),
pymysql.escape_string(item['command']).encode('utf-8'))
data_list.append(row)
print(len(data_list))
# Insert slow log data
try:
cursor.executemany(slowLogInsertSql , data_list)
mysql_pool.commit()
mysql_pool.close()
except Exception as e:
print(time_log() +'Error:',e)
mysql_pool.rollback()
mysql_pool.close()
else:
print(time_log() + 'No data')
def main():
try:
redis_pool = redis.ConnectionPool(host=redisHost, port=redisPort, db=redisDB)
redisConn= redis.Redis(connection_pool=redis_pool)
except:
print(time_log() + 'Error! Can not connect to redis!')
try:
mysql_pool = pymysql.connect(host=mysqlHost, port=mysqlPort, user=mysqlUser, password=mysqlPasswd, db=mysqlDB)
except:
print(time_log() + 'Error! Can not connect to mysql!')
print(time_log())
data = gather_log(redisConn)
send_data(data,mysql_pool)
print(time_log())
# time scheduler
timeSchedule = collectStep
global timer
timer = threading.Timer(timeSchedule, main)
timer.start()
if __name__ == '__main__':
timer = threading.Timer(1, main)
timer.start()
前端使用django展示慢查詢數據,同時每周通過將響應的業(yè)務慢查詢數據發(fā)送給開發(fā)人員。
mysql錯誤日志也是同樣進行處理。