Spark Streaming on an S3 directory

Problem description:

So I have thousands of events being streamed through Amazon Kinesis into SQS and then dumped into an S3 directory. About every 10 minutes, a new text file is created to dump the data from Kinesis into S3. I would like to set up Spark Streaming so that it streams the new files being dumped into S3. Right now I have:

import org.apache.spark.streaming._

// ssc is an existing StreamingContext, e.g. new StreamingContext(sc, Seconds(10))
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print()
ssc.start()

However, Spark Streaming is not picking up the new files being dumped into S3. I think it has something to do with the file write requirements (see the sketch after the quoted requirements):

The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
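
For reference, my understanding of the "atomically moving or renaming" requirement is something like the sketch below, using the Hadoop FileSystem API. The paths are made up, it assumes S3 credentials are already set on the Hadoop Configuration, and I'm not sure how atomic a rename really is on S3:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write the file to a staging location outside the watched directory first,
// then rename it into the directory Spark Streaming is watching, so the file
// only appears there once it is complete. (Both paths are hypothetical.)
val hadoopConf = new Configuration() // assumes S3 credentials are configured on it
val src = new Path("s3n://bucket/staging/events-00001.txt")
val dst = new Path("s3n://bucket/directory/event_name=accepted/events-00001.txt")

val fs: FileSystem = src.getFileSystem(hadoopConf)
fs.rename(src, dst)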

Why is Spark Streaming not picking up the new files? Is it because AWS is creating the files directly in the directory rather than moving them into it? How can I make sure Spark picks up the files being dumped into S3?

In order to stream an S3 bucket, you need to provide the path to the bucket. It will stream all the data from all the files in that bucket, and whenever a new file is created in the bucket it will be streamed as well. If you append data to an existing file that has already been read, the new updates will not be read.

Here is a small piece of code that works:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)

// Make the S3 filesystem and credentials available via the Hadoop configuration.
// myAccessKey / mySecretKey are placeholders for your AWS credentials (if you hit
// credential errors with s3n:// paths, also set fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey).
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

// 60-second batch interval; watch the bucket path for newly created files.
val ssc = new StreamingContext(sc, Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()

ssc.start()
ssc.awaitTermination()

Hope it helps.