通过Logstash由SQLServer向Elasticsearch同步数据

延用上篇ELK所需环境,新增logstash配置文件

需要数据库链接驱动 Microsoft JDBC driver 6.2 for SQL Server

下载地址: https://www.microsoft.com/zh-CN/download/details.aspx?id=55539

在 logstash 的 bin 文件夹下 新增文件夹 jdbcconfig 以及如下文件

通过Logstash由SQLServer向Elasticsearch同步数据

新增 Logstash 配置文件 jdbc.conf

input {
    jdbc {
     jdbc_driver_library => "D:ELK_logslogstash-6.3.2injdbcconfigmssql-jdbc-6.2.2.jre8.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://192.168.100.51:1433;DatabaseName=BTPreservation;"
            jdbc_user => "sa"
            jdbc_password => "Rl123456"
                     # schedule => 分 时 天 月 年  
                        # schedule => * 22  *  *  *     //will execute at 22:00 every day
            schedule => "* * * * *"
            jdbc_paging_enabled => true
            jdbc_page_size => 1000
            clean_run => false
            use_column_value => true
            #设置查询条件的字段
              tracking_column => FID
            record_last_run => true
            last_run_metadata_path => "D:ELK_logslogstash-6.3.2injdbcconfigFID.txt"
            #设置列名小写
              lowercase_column_names => false
            statement_filepath => "D:ELK_logslogstash-6.3.2injdbcconfigx_Loan_PreservationAdvanceList.sql"
            #索引的类型
              type => "advancelist"
    }
}

output {
    elasticsearch {
        hosts => ["192.168.100.50:9200"]
        index => "advancelist"
        document_id => "%{FID}"
    }
    stdout {
        #codec => json_lines
        #设置输出的格式
        codec => line {
            format => "FID: %{[FID]} FPersonName: %{[FPERSONNAME]} FAddTime: %{[FADDTIME]}"
        }
    }
}
  • 这里是将最后一次查询所得的最大ID 存储下来,每次执行 同步是 将此ID 设置为查询条件,获取增量数据,如果我们使用最后更新时间作为判断条件,则可以获取整张表最新数据

FID.txt 存储查询条件 ,配置好后自动生成

 

x_Loan_PreservationAdvanceList.sql 需要同步数据执行的Sql

SELECT * FROM dbo.x_Loan_PreservationAdvanceList WHERE FID > :sql_last_value

 

注意:启动时因为是同台机器运行多个logstash实例,所以需要指定不同的数据存储目录 path.Data

执行命令:

.logstash -f .jdbcconfigjdbc.conf --path.data=/jdbcconfig/

执行后成功后,在 kibana 创建 索引

通过Logstash由SQLServer向Elasticsearch同步数据

因为我们执行命令时,使用的时 cmd 窗口,当你向同步的数据库表中新增数据是,会实时在 cmd 窗口看到同步情况

后面我们也可以使用 NSSM 工具将其安装成服务运行

思考:如果我们要同步多张表该如何处理?,什么样的场景需要这样的同步。