logstash grok 多项匹配

黎东
L、先森
2018-10-22 0 6723

业务场景:新版本日志需要添加字段,需要兼容新旧日志匹配

值得留意的是即使你的日志是能正常匹配的,Grok还是会按照顺序许匹配送进来的日志,当碰到第一个匹配成功的日志就break掉这个循环。这就要我们自己去判断一下,怎么放是最合适的了,不然的话会一个一个往下进行尝试,毕竟是多种不同的格式。
一种常用的优化方案是使用分层匹配来对这个Grok进行优化


第一种:

filter {
    grok {
	match => {
	    "message"=>[
		"%{DATA:hostname}\|%{DATA:tag}\|%{DATA:types}\|%{DATA:uid}\|%{GREEDYDATA:msg}",
		"%{DATA:hostname}\|%{DATA:tag}\|%{GREEDYDATA:msg}"]
	    }
    }

第二种:

filter {
    grok {
         match => [
            "message" , "%{DATA:hostname}\|%{DATA:tag}\|%{DATA:types}\|%{DATA:uid}\|%{GREEDYDATA:msg}",
            "message" , "%{DATA:hostname}\|%{DATA:tag}\|%{GREEDYDATA:msg}"
         ]
        remove_field => ['type','_id','input_type','tags','message','beat','offset']
    }

第三种:太多使用DATA和GREEDYDAYA会导致性能cpu负载严重。建议多使用正则匹配,或者ruby代码块

filter {
    ruby {
        code =>'
        arr = event["message"].split("|")
        if arr.length == 5
            event["hostname"] = arr[0]
            event["tag"] = arr[1]
            event["types"] = arr[2]
            event["uid"] = arr[3]
            event["msg"] = arr[4]
        elsif arr.length == 3
            event["hostname"] = arr[0]
            event["tag"] = arr[1]
            event["msg"] = arr[2]
        end'
       remove_field => ['type','_id','input_type','tags','message','beat','offset']
    }


大数据