日志系统 Paths that should be crawled and fetched. Glob based paths. - /var/log/*.log Paths that should be crawled and fetched. Glob based paths. - /var/log/*.log ----------------------------- Logstash output -------------------------------- The Logstash hosts The filter part of this file is commented out to indicate that it is optional.

ELK+FileBeat+Log4Net搭建日志系统

来源:https://www.zybuluo.com/muyanfeixiang/note/608470

标签(空格分隔): ELK Log4Net

项目中之前都是采用数据库来记录日志,虽然记录还算挺方便,但是每次都要到数据库来查询,如果日志在单独的数据库还好,只是有点麻烦。如果记录的日志数据库和生产正式库在一起,不仅会影响生产库的正常使用,也会带来安全隐患。
项目早期没有统一规划,也是时间仓促,没有做好日志的规划,所有日志都记录到数据库中。的确也遇到了性能问题,因此在了解ELK的基础上,使用其作为日志采集、处理和检索的几本框架。

大体框架
日志数据流如下,应用将日志落地在本地文件,部署在每台服务器上的FileBeat负责收集日志,然后将日志发送给LogStash;LogStash将日志进行处理之后,比如parse等;然后将处理后的Json对象传递给ElasticSearch,进行落地并进行索引处理;最后通过Kibana来提供web界面,来查看日志等。因为ES是基于Lucene的,所以Kibana支持Lucene查询语法。

image_1b4g890586hv1ag91ks3abv1i62m.png-24.8kB

对于日志数据流特别大的情况,LogStash会造成拥堵,这个时候可以使用消息队列来进行缓冲。同时,日志一旦进过LogStash之后,会不方面一些流处理程序来读取。这个时候使用kafka就比较好了,因为kafka是将消息持久化在本地,流处理应用可以从消息的offset初始的地方来读取。加入kafka的后的流程如下:
image_1b4g8qjmjegcnf614pqe61kbf13.png-33.1kB

配置过程
Log4Net配置
首先,最基本的引用Log4Net程序集,补多少。
其次,要在项目的AssemblyInfo.cs添加如下代码,这样配置才能给启作用。
[assembly: log4net.Config.XmlConfigurator(Watch = true)]
最后,也是最重要的就是在web.config(或app.config)中配置了。
在configSections中添加如下代码


添加log4net节点

针对两种不同类型的应用日志,分别使用两种pattern,也分别记录到Debug.lg和Error.log文件中。

日志类型为Debug,Info,Warn的日志,使用[%date] [%thread] %-5level Log4NetTest %logger %method [%message%exception]%n模式,分别记录下时间,线程,日志等级,应用名称,日志记录类属性,日志记录方法,日志信息和异常
日志类型为Error和Fatal的日志,使用[%date] [%thread] %-5level Log4NetTest %l [%message%n%exception]%n,分别是时间,线程,日志等级,应用名称,出错位置(包含具体文件,以及所在行,需要PDB文件才能到行),日志信息和异常
分两类的主要原因就在于是否记录了出错位置,这个是比较耗性能的操作,所以对于常规的Debug,Info,Warn可以不记录位置。
除此之外,没有记录host,因为FileBeat在采集日志时候会自动记录hostname。这样log4net的基本配置就完成了。

FileBeat配置
只需简单的配置就即可使用,当然也可以配置的很复杂。配置文件filebeat.yml
一个input_type代表一个输入源,可选值只有log和stdin。
paths是日志采集路径,可以使用通配符。
document_type是可以用来表示日志类型。输入到logstash中对应[type],可以据此使用不同grok语法来parse,这里分为errorLog和debugLog。

multiline.pattern: '^['
multiline.negate: true
multiline.match: after
上面这三个使用了将换行的日志或异常信息聚合为一个事件,因为默认FileBeat是按行来读取日志,然后传输给LogStash,如果没这个设置,就会造成日志分割为多个事件。
output.logstash就是输出的目标了。
然后直接运行filebeat.exe即可启动。当然也可以以服务方式启动。

filebeat.prospectors:

  • input_type: log

    paths:

    - /var/log/*.log

    • D:WebLog**.Error.log
      document_type: errorLog
      multiline.pattern: '^['
      multiline.negate: true
      multiline.match: after
  • input_type: log

    Paths that should be crawled and fetched. Glob based paths.

    paths:

    - /var/log/*.log

    • D:WebLog**.Debug.log
      document_type: debugLog

----------------------------- Logstash output --------------------------------

output.logstash:

The Logstash hosts

hosts: ["localhost:5044"]
LogStash配置
在config文件夹添加配置文件first-pipeline.conf。

input: 指定输入来源
filter:是指定如何对日志进行处理。这里[type]就是来自filebeat中document_type。然后就是grok语法了。
overwrite:是将原有message覆盖掉。如果将原有message完全match出来的话,是可以这样做的,可以节省空间。
output:就是输出地址了。
运行
bin/logstash -f first-pipeline.conf --config.test_and_exit 测试配置文件
bin/logstash -f first-pipeline.conf --config.reload.automatic 自动加载配置文件的修改
input {
beats {
port => "5044"
}
}

The filter part of this file is commented out to indicate that it is

optional.

filter {
if [type] == "debugLog" {
grok {
match => {
"message" => "[(?d{4}-d{2}-d{2}sd{2}:d{2}:d{2},d{3})]s+[(?.)]s+(?w)s+(?S)s+(?S)s+(?S)s+[(?.)]s"
}
overwrite => ["message"]
}
}
if [type] == "errorLog" {
grok {
match => {
"message" => "[(?d{4}-d{2}-d{2}sd{2}:d{2}:d{2},d{3})]s+[(?.
)]s+(?w)s+(?S)s+(?S)s+[(?.)]s*"
}
overwrite => ["message"]
}
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
ElasticSearch配置
默认不需要配置,监听9200端口。直接运行即可

Kibana配置
elasticsearch.url: "http://localhost:9200"
默认连接es地址,如果本机测试无需修改。正式环境中连接到对应服务器就好。
server.port: 5601
监听端口5601,可修改到合适的端口。

然后直接运行就可启动。
初次进入要指定index pattern。一般默认使用如下配置即可。
image_1b4gde7o71j909sp4hda1rs2p1g.png-84.2kB