Logstash (1)
感觉它比flume要稍微强大一点. 最近研究了一下, 将一些小心得记录在这里.
话说最好的教程还是官网: https://www.elastic.co/guide/index.html
关于安装
logstash是用JRuby语言开发的,所以要安装JDK.然后解压安装包即可
logstash基本上由三部分组成,input、output以及用户需要才添加的filter,因此标准的配置文件格式如下:
input {...}
filter {...}
output {...}
input 类似于flume 的source, output 类似flume 的sink. filter 类似interceptor
在每个部分中,也可以指定多个访问方式,例如想要指定两个日志来源文件,则可以这样写:
input {
file { path =>"/var/log/messages" type =>"syslog"}
file { path =>"/var/log/apache/access.log" type =>"apache"}
}
类似的,如果在filter中添加了多种处理规则,则按照它的顺序一一处理,但是有一些插件并不是线程安全的。
比如在filter中指定了两个一样的的插件,这两个任务并不能保证准确的按顺序执行,因此官方也推荐避免在filter中重复使用插件。
下面是一个非常简单的从本地文件夹读入, 写入到kafka 中的例子:
logstash.conf
input { file { path => "/var/service_logs/*.log" discover_interval => 5 start_position => "beginning" } } output { kafka { topic_id => "servicelogs" codec => plain { format => "%{message}" } bootstrap_servers => "192.168.75.71:9092,192.168.75.72:9092,192.168.75.73:9092" } }
bootstrap_servers 给定kafka broker 的 地址. (注意不是zookeeper,而是直接给定broker 地址)
启动logstash 时, 只需要把写好的conf 文件作为参数给启动脚本即可. 如果不指定启动配置文件, 默认从标准输入作为input,标准输出作为output
bin/logstash -f logstash.conf
简单的测试
1. 作为准备, 启动Zookeeper和kafka集群 :
zookeeper/bin/zkServer.sh start
kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties
2. 创建topic: servicelogs
bin/kafka-topics.sh --create --zookeeper amie01:2181 --replication-factor 3 --partitions 3 --topic servicelogs
kafka-topics.sh --describe --zookeeper amie01:2181 --topic servicelogs查看一下topic 详细信息 :
Topic:servicelogs PartitionCount:3 ReplicationFactor:3 Configs:
Topic: servicelogs Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: servicelogs Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: servicelogs Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
3. 启动logstash :
bin/logstash -f logstash.conf
可以写个简单的脚本模拟数据: while true ; do echo `date` >> /var/service_logs/a.log ; sleep 0.05; done
然后为了查看kafka的servicelogs 是否已经被logstash output 了消息, 可以用kafka的console consume 测试一下.
测试结果是完成可以work的~~