Spark Streaming XML files
I have a requirement to process XML files streamed into an S3 folder. Currently, I have implemented it as follows.
First, read the files using Spark's fileStream:
val data = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "s3://myfolder/",
  (t: org.apache.hadoop.fs.Path) => true,
  newFilesOnly = true,
  hadoopConf
).map(_._2.toString())
For each RDD, check whether any file has been read:
if (data.count() != 0)
Write the strings to a new HDFS directory:
data.coalesce(1).saveAsTextFile(sdir);
Create a DataFrame by reading from the above HDFS directory:
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
Do some processing on the DataFrame and save it as JSON:
loaddata.write.mode("append").json("s3://mybucket/somefolder")
Somehow, I feel that the above approach is very inefficient and, frankly, quite schoolboyish. Is there a better solution? Any help would be greatly appreciated.
A follow-up question: how do I manipulate fields (not columns) in a DataFrame? I have a very complex nested XML, and when I use the method described above I get a DataFrame with 9 columns and 50-odd inner struct arrays. That is fine, except that certain field names need to be trimmed. Is there a way to achieve this without exploding the DataFrame, given that I need to construct the same structure again?
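For illustration, the kind of transformation I have been considering is to cast a struct column to a StructType with renamed fields (a sketch only, with hypothetical column and field names; I am not sure this is the right approach):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

// Hypothetical example: strip dashes from the field names inside a
// "Details" column (an array of structs), without exploding the DataFrame.
// The cast is positional, so field order and types must stay the same.
val detailsType = loaddata.schema("Details").dataType.asInstanceOf[ArrayType]
val detailsStruct = detailsType.elementType.asInstanceOf[StructType]

val renamedStruct = StructType(detailsStruct.fields.map { f =>
  f.copy(name = f.name.replace("-", ""))
})

val cleaned = loaddata.withColumn(
  "Details",
  col("Details").cast(ArrayType(renamedStruct, detailsType.containsNull))
)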
If you use Spark 2.0, you may be able to make it work with Structured Streaming:
val inputDF = spark.readStream.format("com.databricks.spark.xml")
.option("rowTag", "Trans")
.load(path)
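If that works as a streaming source in your environment, the output side could then look roughly like this (a sketch; the paths and checkpoint location are placeholders):

val query = inputDF.writeStream
  .format("json")
  .option("path", "s3://mybucket/somefolder")
  .option("checkpointLocation", "s3://mybucket/checkpoints")
  .start()

query.awaitTermination()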