Log Analysis with Flume + Kafka + Spark Streaming
I recently needed to build a real-time log analysis application using the Flume + Kafka + Spark Streaming stack, so I first put together a test demo. This post walks through that demo; it does not analyze the underlying architecture.
A quick overview: Flume is a distributed, highly reliable and available system for aggregating large volumes of log data; Kafka is a high-throughput distributed publish-subscribe system; Spark Streaming is a real-time computation framework built on top of Spark. In this demo everything runs single-machine in pseudo-distributed mode. The Flume source is exec, the agent is named producer, and the sink is Kafka.
Everything needed to run the demo can be downloaded directly from the official sites.
My environment: Flume 1.6 + kafka_2.10 + Spark 1.2.0
Flume configuration:
In Flume's conf directory, create a configuration file roomy.conf as follows:
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
producer.sources.s.type = exec
# tail the log file to monitor (Flume properties do not support inline comments,
# so the comment goes on its own line)
producer.sources.s.command = tail -F -n+1 /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log
producer.sources.s.channels = c

#channel section (the original post omitted this; a channel must be defined
#or the agent will fail to start)
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
# replace with the address of your own Kafka broker
producer.sinks.r.metadata.broker.list=192.168.1.102:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.channel = c
From the Flume directory, run:
bin/flume-ng agent --conf conf --conf-file conf/roomy.conf --name producer -Dflume.root.logger=INFO,console
That completes the Flume side.
In the Kafka directory, run:
bin/zookeeper-server-start.sh config/zookeeper.properties
to start ZooKeeper.
Then run:
bin/kafka-server-start.sh config/server.properties
to start Kafka. No extra configuration is needed here: with Kafka's default settings (auto.create.topics.enable=true), the test topic used below is created automatically the first time it is written to.
Finally, write the Spark Streaming test demo.
Create a new SBT project with the following build.sbt:
name := "sk"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
libraryDependencies += "log4j" % "log4j" % "1.2.17"
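If dependency downloads from the default repositories are slow, one common workaround is to add a geographically closer Maven mirror as an extra resolver in build.sbt. This is just a sketch: the Aliyun mirror URL below is an assumption, substitute whichever mirror works for you.

```scala
// Hypothetical example: prepend a closer Maven mirror so sbt tries it first.
// The Aliyun URL is an assumption; replace it with your preferred mirror.
resolvers += "aliyun-public" at "https://maven.aliyun.com/repository/public"
```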
Note that, because of the GFW, downloading these dependencies can be painfully slow. Next comes the test program:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by roomy on 16/3/23.
 */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    val topic = "test"
    val topicSet = topic.split(" ").toSet

    // create a direct Kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet
    )

    // each Kafka message is a (key, value) pair; we only need the value
    val lines = messages.map(_._2)
    lines.print()

    val words: DStream[String] = lines.flatMap(_.split(" "))
    words.count().print()

    // start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
The interval at which data is collected and analyzed can be set through the StreamingContext constructor (Seconds(20) above).
The program watches /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log for changes and, once every 20 seconds, prints the total number of newly added lines to the console.
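The per-batch transformation above (split each line into words, then count) can be tried out on a plain Scala collection, independent of Spark and Kafka. A minimal sketch, using made-up sample lines in the same format the log generator below produces:

```scala
object BatchCountSketch {
  // The same lines -> words -> count pipeline as the DStream version,
  // applied to one in-memory batch of (hypothetical) log lines.
  def wordCount(batch: Seq[String]): Long =
    batch.flatMap(_.split(" ")).size.toLong

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      "this is a test information produced by roomy no:Thread-0",
      "this is a test information produced by roomy no:Thread-1"
    )
    // Each line splits into 9 words, so this batch yields 18.
    println(wordCount(batch))
  }
}
```

In the streaming job the same logic runs once per 20-second batch, on whatever lines Flume shipped through Kafka during that window.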
When the log file has no new entries, the output looks like this:
Run the following test program to generate some log entries:
import org.apache.log4j.Logger;

/**
 * Created by roomy on 16/3/23.
 * Generates some log output for testing.
 */
public class LogGenerator implements Runnable {

    private static Logger logger = Logger.getLogger(LogGenerator.class);
    private int no;

    public LogGenerator(int no) {
        this.no = no;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Thread(new LogGenerator(i)).start();
        }
    }

    @Override
    public void run() {
        while (true) {
            logger.debug("this is a test information produced by roomy no:" + Thread.currentThread().getName());
            try {
                // note the parentheses: (int) Math.random() * 100 would always be 0
                Thread.sleep((int) (Math.random() * 100));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The console output looks like this:
Spark Streaming's output operation prints the first ten elements of each batch:
And the total number of log lines produced during this 20-second window:
References:
https://flume.apache.org/FlumeUserGuide.html
http://kafka.apache.org/documentation.html
Learning Spark (Chinese edition: Spark快速大数据分析)