Processing MySQL Slow Query Logs with Logstash + Elasticsearch
I ran into a requirement: querying the slow query logs for certain services. It turned out the slow-query data provided by the DBA platform couldn't cover the actual business scenario (the reported fields were incomplete), so I had no choice but to roll up my sleeves and do it myself.
I referenced this article, but made quite a few changes to fit my own needs.
Let's get started.
1. Locate the log file
First confirm that slow query logging is enabled, then find the location of the log file:
> show variables like '%slow%';
+---------------------+-------------------------------------+
| Variable_name       | Value                               |
+---------------------+-------------------------------------+
| log_slow_queries    | ON                                  |
| slow_launch_time    | 2                                   |
| slow_query_log      | ON                                  |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log |
+---------------------+-------------------------------------+
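If slow_query_log shows OFF, it can usually be enabled at runtime without a restart. A minimal sketch (the 1-second threshold is illustrative, not from the original setup):

> SET GLOBAL slow_query_log = 'ON';
> SET GLOBAL long_query_time = 1;  -- log statements slower than 1 second (illustrative value)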
2. The slow query log
The format is basically as follows; if yours differs slightly, small adjustments to the patterns below will be needed:
# Time: 160524 5:12:29
# User@Host: user_a[xxxx] @ [10.166.140.109]
# Query_time: 1.711086 Lock_time: 0.000040 Rows_sent: 385489 Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;
3. Collecting with Logstash
Collection is essentially multi-line parsing with the multiline codec.
There are, however, several problems to deal with.
First, strip out the useless information.
Second, a slow SQL statement tends to show up repeatedly, so its execution count becomes an important metric. What we need to do is de-noise it: strip the parameters (quoted strings plus numbers), keep only the core SQL, and compute a hash over that, so queries can later aggregate on this field. This uses the mutate and checksum filters:
# calculate unique hash
mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
}
mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
}
checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
}
The resulting MD5 ends up in the logstash_checksum field.
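To make the de-noising concrete, here is a minimal Python sketch of the same normalization, reusing the two gsub regexes above (it shows why two statements that differ only in parameters collapse to one fingerprint; the digest itself is illustrative and need not byte-match what the checksum filter emits):

import hashlib
import re

def sql_fingerprint(sql):
    # same two substitutions as the gsub rules: quoted strings first, then numbers
    normalized = re.sub(r"'.+?'", "", sql)
    normalized = re.sub(r"-?\d*\.{0,1}\d+", "", normalized)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

# both statements normalize to the same core SQL, hence the same hash
print(sql_fingerprint("SELECT * FROM t WHERE id = 42 AND name = 'ken'"))
print(sql_fingerprint("SELECT * FROM t WHERE id = 7 AND name = 'luna'"))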
Third, some SQL statements are huge; certain badly written ones can run to hundreds of MB or even GB, which will OOM the collector process outright. So during processing, anything over 100 KB is dropped.
Fourth, with the default multiline handling, a statement can sit on the collector unreported until the next statement arrives to flush it out; if nothing ever follows, the last entry never reaches the engine. So the config sets an automatic flush after 5 seconds.
The complete Logstash config file (you may need small adjustments for your own log format).
Note the custom pattern used inside: ALLWORD [\s\S]*
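ALLWORD is not a stock grok pattern, so it has to be defined before the grok filters below can resolve it. A minimal sketch (the file path is an assumption): put the definition in a patterns file,

# e.g. /etc/logstash/patterns/extra  (path assumed)
ALLWORD [\s\S]*

and add patterns_dir => ["/etc/logstash/patterns"] to each grok block that references it.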
input {
    file {
        path => ["/data/mysqllog/20000/slow-query.log"]
        sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
        type => "mysql-slow-log"
        add_field => ["env", "PRODUCT"]
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => previous
            max_bytes => "100kib"
            auto_flush_interval => 5
        }
    }
}
filter {
    if ("multiline_codec_max_bytes_reached" in [tags]) {
        drop {}
    }
    grok {
        # User@Host: logstash[logstash] @ localhost [127.0.0.1]
        # User@Host: logstash[logstash] @ [127.0.0.1]
        match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
    }
    grok {
        # Query_time: 102.413328 Lock_time: 0.000167 Rows_sent: 0 Rows_examined: 1970
        match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}" ]
    }
    # Capture the time the query happened
    grok {
        match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
    }
    # drop events that failed the groks above (e.g. broken multiline joins)
    if ("_grokparsefailure" in [tags]) {
        drop {}
    }
    date {
        match => [ "timestamp", "UNIX" ]
    }
    mutate {
        gsub => [
            "sql", "\nSET timestamp=\d+?;\n", "",
            "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
            "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
            "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
            "sql", "\nTcp port:.+$", "",
            "sql", "\nTime .+$", ""
        ]
    }
    # calculate unique hash
    mutate {
        add_field => {"sql_for_hash" => "%{sql}"}
    }
    mutate {
        gsub => [
            "sql_for_hash", "'.+?'", "",
            "sql_for_hash", "-?\d*\.{0,1}\d+", ""
        ]
    }
    checksum {
        algorithm => "md5"
        keys => ["sql_for_hash"]
    }
    # Drop the captured timestamp since it has been moved to the time of the event;
    # also drop the raw message and the intermediate hash input
    mutate {
        remove_field => ["timestamp", "message", "sql_for_hash"]
    }
}
output {
    #stdout {
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout {
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
            hosts => ["192.168.1.1:9200"]
            index => "logstash-slowlog"
        }
    }
}
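For reference, starting the collector might look like the following (the config filename is an assumption, and depending on your Logstash version the checksum filter ships as a separate plugin that has to be installed first; the plugin command is bin/plugin on 2.x and bin/logstash-plugin on later releases):

bin/plugin install logstash-filter-checksum
bin/logstash -f mysql-slow.conf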
What a collected event looks like:
{
    "@timestamp" => "2016-05-23T21:12:59.000Z",
    "@version" => "1",
    "tags" => [
        [0] "multiline"
    ],
    "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",
    "host" => "Luna-mac-2.local",
    "type" => "mysql-slow",
    "env" => "PRODUCT",
    "user" => "dba_bak_all_sel",
    "ip" => "10.166.140.109",
    "duration" => 28.812601,
    "lock_wait" => 0.000132,
    "results" => 749414,
    "scanned" => 749414,
    "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",
    "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"
}
4. Writing the query
For the query, we need to aggregate by logstash_checksum, rank the groups by occurrence count in descending order, and display one concrete SQL statement per logstash_checksum.
Elasticsearch's Top hits Aggregation solves this query requirement perfectly.
The query body:
body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": {
                    "user": "test"
                }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            {
                                "@timestamp": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": {
                            "include": [
                                "user", "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size": 1
                    }
                }
            }
        }
    }
}
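A minimal sketch of running this query and walking the buckets with the official elasticsearch Python client (host and index are taken from the output config above; assumes the query dict above is bound to body):

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.1:9200"])
resp = es.search(index="logstash-slowlog", body=body)

# one bucket per distinct logstash_checksum, ordered by doc_count descending
for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    newest = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    print(bucket["doc_count"], bucket["key"], newest["sql"])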
A few reference links related to this approach: Terms Aggregation / Elasticsearch filter document group by field
5. Rendering the page
The Python backend uses the sqlparse package to format the SQL (line breaks / indentation / keyword casing) before passing it to the frontend:
>>> import sqlparse
>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
(SELECT id
FROM bar);
Then, on the page, use JS for syntax highlighting: code-prettify
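On the frontend this can be as simple as the following sketch (the loader URL is an assumption; check the code-prettify README for the current one):

<script src="https://cdn.jsdelivr.net/gh/google/code-prettify@master/loader/run_prettify.js"></script>
<pre class="prettyprint lang-sql">
SELECT *
FROM foo
WHERE id IN
    (SELECT id
     FROM bar);
</pre>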