Logstash configuration for shipping Kafka data to Elasticsearch
# Below is the main Logstash configuration for shipping Kafka data to Elasticsearch.
# The right value for consumer_threads depends heavily on the Kafka topic's partition count and on the size of the Logstash cluster.
# Each Kafka partition can be consumed by only one consumer: if consumer_threads exceeds the number of partitions, the extra threads sit idle;
# if consumer_threads is smaller than the number of partitions, some threads will each consume several partitions.
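# A hypothetical sizing example (the partition and instance counts are assumptions, not
# measurements from the cluster above): if topic clusterTest has 12 partitions and
# 3 Logstash instances share group_id "b2c.k2e.consumer", then consumer_threads => 4
# on each instance gives 3 x 4 = 12 consumer threads, exactly one per partition.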
input {
    kafka {
        # Legacy (ZooKeeper-based) kafka input: connect via ZooKeeper rather than brokers.
        zk_connect => "A:2181,B:2181,C:2181"
        # Consumer group shared by all Logstash instances reading this topic.
        group_id => "b2c.k2e.consumer"
        topic_id => "clusterTest"
        # false: resume from committed offsets instead of re-reading the topic from the start.
        reset_beginning => false
        consumer_threads => 4
        # false: do not add Kafka metadata (topic, partition, offset) to each event.
        decorate_events => false
        queue_size => 1000
        # Merge every line that does not start with {"date" into the previous event,
        # so multi-line log entries (e.g. stack traces) become a single event.
        codec => multiline {
            pattern => "^{\"date\""
            negate => "true"
            what => "previous"
        }
    }
}
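The multiline codec deserves a note: with negate => "true" and what => "previous", every consumed line that does not match ^{"date" is appended to the previous event. As a purely illustrative example, assuming the producer writes one log line per Kafka message and messages arrive in order, these two lines:

{"date":"2016-05-10 10:00:00","level":"ERROR","msg":"boom"}
java.lang.NullPointerException

would be merged into a single event whose message field contains both, because the second line does not begin with {"date".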
filter {
    # Strip tabs and form feeds that would otherwise break the JSON parsing below.
    mutate {
        gsub => [
            "message", "\t", "",
            "message", "\f", ""
        ]
    }
    # timestamp and @timestamp are different fields.
    # Shift @timestamp by +8 hours (UTC+8) into a separate timestamp field.
    ruby {
        code => "event['timestamp'] = LogStash::Timestamp.new(event['@timestamp'] + 8*60*60)"
    }
    # Record the shifted time as the event's arrival time.
    ruby {
        add_tag => [ "@arriveTime" ]
        code => "event['@arriveTime'] = event['timestamp']"
    }
    mutate {
        remove_field => ["timestamp"]
        rename => { "message" => "temp" }
    }
    # Parse the raw JSON in temp into top-level fields, then drop temp.
    json {
        source => "temp"
        remove_field => ["temp"]
    }
}
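To make the filter chain concrete, here is a hypothetical walk-through (all field values below are invented for illustration). Suppose an event arrives with @timestamp 2016-05-10T02:00:00.000Z and message {"date":"2016-05-10 10:00:00","level":"INFO"}:
1. the first ruby filter stores @timestamp shifted by 8*60*60 seconds into timestamp, which now prints as 2016-05-10T10:00:00.000Z, the UTC+8 wall-clock time;
2. the second ruby filter copies that value into @arriveTime and tags the event, after which mutate removes timestamp and renames message to temp;
3. the json filter parses temp, so date and level become top-level fields and temp is dropped.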
output {
    elasticsearch {
        hosts => ["D:9200","E:9201","F:9202","G:9206","H:9203","I:9204","J:9205"]
        # One index per day, named from the event's @timestamp.
        index => "clusterindex-%{+YYYY-MM-dd}"
        # Send bulk requests of up to 2000 events, with 10 output workers.
        flush_size => 2000
        workers => 10
    }
}
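Two sizing knobs in this (older) elasticsearch output are worth calling out: flush_size => 2000 caps how many events go into a single bulk request, and workers => 10 runs ten output workers in parallel. The index name is derived from each event's @timestamp, so, for example, an event stamped 2016-05-10 is written to the index clusterindex-2016-05-10, giving one index per day.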