Flume + Kafka + Spark Streaming Log Analysis

  I recently needed to build a real-time log analysis application on the Flume + Kafka + Spark Streaming stack, so I put together a quick test demo first. This post walks through that demo; it does not analyze the underlying architecture.

  Overview: Flume is a distributed, highly reliable and highly available system for aggregating large volumes of log data; Kafka is a high-throughput distributed publish-subscribe system; Spark Streaming is a real-time computation framework built on top of Spark. In this demo everything runs in single-machine pseudo-distributed mode: the Flume source is exec, the agent is named producer, and the sink writes to Kafka.

  The required software can be downloaded directly from the official websites.

  My environment: Flume 1.6 + Kafka (kafka_2.10 build) + Spark 1.2.0.

  Flume configuration:

  Edit a configuration file named roomy.conf under conf as follows:

#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
producer.sources.s.type = exec
# the log file to tail (comments must sit on their own line in Flume's properties format; a trailing # would become part of the value)
producer.sources.s.command = tail -F -n+1 /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
# replace with the address of your own Kafka broker
producer.sinks.r.metadata.broker.list=192.168.1.102:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8

  Note that the sink class org.apache.flume.plugins.KafkaSink used above comes from a separate Flume-to-Kafka plugin rather than from Flume itself (Flume 1.6 ships its own org.apache.flume.sink.kafka.KafkaSink), so the plugin jar needs to be on Flume's classpath. Then start the agent from the Flume installation directory:

bin/flume-ng agent --conf conf --conf-file conf/roomy.conf --name producer -Dflume.root.logger=INFO,console

   That completes the Flume side.

  In the Kafka directory, run:

bin/zookeeper-server-start.sh config/zookeeper.properties

  to start ZooKeeper.

  Then run:

bin/kafka-server-start.sh config/server.properties

   to start Kafka. No extra configuration is needed here.

  Finally, write the Spark Streaming test demo.

  Create a new SBT project with the following build.sbt:

name := "sk"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

   Note that, thanks to the GFW, downloading these dependencies can be painfully slow; pointing sbt at a closer Maven mirror helps, as sketched below.
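
  A minimal sketch of such a mirror entry for build.sbt; the repository name and URL below are only examples (an Aliyun Nexus mirror), so substitute whichever mirror is reachable for you:

// optional: resolve dependencies through a nearby Maven mirror before the default repositories
resolvers += "aliyun mirror" at "http://maven.aliyun.com/nexus/content/groups/public/"

  With the build in place, the test program is as follows: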

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Created by roomy on 16/3/23.
  */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val topic = "test"
    val topicSet = topic.split(" ").toSet

    //create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet
    )
    val lines = messages.map(_._2)
    lines.print()
    val words: DStream[String] = lines.flatMap(_.split("\n"))
    words.count().print()

    // start the streaming computation and block until it is stopped
    ssc.start()
    ssc.awaitTermination()
  }
}
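
  One thing to be aware of: by default the direct stream only picks up messages produced after the job starts. If you also want to replay what is already in the topic, you can add auto.offset.reset to kafkaParams (a small sketch, using the Kafka 0.8 consumer option that the direct-stream API honours):

// start from the earliest available offsets instead of only new messages
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "smallest"
)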

   The batch interval, i.e. how often collected data is processed, is set via the StreamingContext constructor.
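
  For example, to process a batch every 5 seconds instead of every 20 (the value is just an illustration; Milliseconds and Minutes helpers are available as well):

// reuse the sparkConf from the program above, but with a 5-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(5))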

   The program watches /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log for changes and, every 20 seconds, prints the number of newly added lines to the console.

   When the log is not changing, the output looks like this:

(screenshot: console output while the log is idle)

   Run the following test program to generate some log output (its log4j configuration should append to the debug.log file that Flume is tailing):

import org.apache.log4j.Logger;

/**
 * Created by roomy on 16/3/23.
 * to generate some log to test
 */
public class LogGenerator implements Runnable{

    private static Logger logger = Logger.getLogger(LogGenerator.class);
    private int no;

    public LogGenerator(int no){
        this.no=no;
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<5;i++){
            new Thread(new LogGenerator(i)).start();
        }
    }

    @Override
    public void run() {
        while (true){
            logger.debug("this is a test message produced by roomy, no: " + no + ", thread: " + Thread.currentThread().getName());
            try{
                Thread.sleep((int) (Math.random() * 100));
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

   The console output now looks like this.

  Spark Streaming's print() output operation shows the first ten elements of each batch:

(screenshot: the first ten log lines of a batch)
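
  print() shows only the first ten records of each batch by default; it also accepts a count if you want to see more (a small sketch, assuming the print(num) overload of the 1.x DStream API, applied to the lines stream in the program above):

// show up to 50 records per batch instead of the default 10
lines.print(50)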

  The total number of log lines produced during this 20-second batch is:

(screenshot: the line count for the 20-second batch)

  References:

  https://flume.apache.org/FlumeUserGuide.html

  http://kafka.apache.org/documentation.html 

  Learning Spark: Lightning-Fast Big Data Analysis (Chinese edition: Spark快速大数据分析)