mysql logstash 配置

mysql logstash 配置

[elk@dr-mysql01 mysql]$ cat logstash_mysql.conf
input {
        file {
                type => "zj_mysql"
                path => ["/data01/applog_backup/zjzc_log/zj-mysql01-slowlog.*"]
                 codec => multiline {
      pattern => "^s+#s+User@Host:"
      negate => true
      what => "previous"
    }

        }
    
       file { 
                type => "wj_mysql" 
                path => ["/data01/applog_backup/winfae_log/wj-mysql01-slowlog.*"] 
                 codec => multiline {
      pattern => "^s+#s+User@Host:"
      negate => true
      what => "previous"
    }

        } 
        

    }

filter {
  # drop sleep events
  grok {
    match => { "message" => "SELECT SLEEP" }
    add_tag => [ "sleep_drop" ]
    tag_on_failure => [] # prevent default _grokparsefailure tag on real records
  }
  if "sleep_drop" in [tags] {
    drop {}
  }
  grok {
    match => [ "message","(?m)s*# User@Host:s+S+[%{USER:user}]s+@s+[%{IP:clientip}]s+(?<id>(S+s+)*S+)s*#s+Query_time:s+%{NUMBER:Query_time}s+Lock_time: %{NUMBER:lock_time}s+Rows_sent: %{NUMBER:rows_sent}s+Rows_examined: %{NUMBER:rows_examined}s*
s*SETs+timestamp=%{NUMBER:timestamp};s*(?<query>(s*S+s*).*)s*" 
]
  }
  date {
    match => [ "timestamp", "UNIX" ]
    remove_field => [ "timestamp" ]
  }
}
output {
     if [type] == "zj_mysql" { 
        redis {
                host => "192.168.32.67"
                data_type => "list"
                key => "zj_mysql:redis"
                port=>"6379"
                password => "1234567"
        }
}
      else if [type] == "wj_mysql"{
       redis { 
                host => "192.168.32.67" 
                data_type => "list" 
                key => "wj_mysql:redis" 
                port=>"6379" 
                password => "1234567" 
        } 
}
}