Reading multiple .gz files and determining which line belongs to which file
I am reading multiple .gz files to process using Google Dataflow. The final destination of the data is BigQuery. The BigQuery table has a dedicated column for each column of the csv files within the .gz files. There is one additional column in the BQ table, file_name, which gives the name of the file that the record belongs to. I am reading the files using TextIO.Read and applying a ParDo transformation to the result. Within the DoFn, is there a way to identify the name of the file that the incoming string belongs to?
My code is as below:
PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
    .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));
PCollection<TableRow> formattedResults = logs.apply(
    ParDo.named("Format").of(new DoFn<String, TableRow>() { ... }));
Update 1:
I am now trying as below:
PCollection<String> fileNamesCollection // this is a collection of file names
GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception {
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
            c.output(KV.of(c.element(), line));
        }
    }
}));
But when I run this program I get an error that channelFactory is not serializable. Is there any channel factory that implements the Serializable interface and can be used here?
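The error occurs because the anonymous DoFn captures channelFactory from the enclosing scope as a field, and Dataflow must Java-serialize every DoFn to ship it to the workers. The failure can be reproduced with plain Java serialization and no Dataflow dependency at all; ChannelFactoryLike and CapturingFn below are illustrative stand-ins for GcsIOChannelFactory and the anonymous DoFn, not real SDK classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Main {
    // Stand-in for GcsIOChannelFactory: does NOT implement Serializable.
    static class ChannelFactoryLike {}

    // Stand-in for the anonymous DoFn: Serializable itself, but it captures
    // a non-serializable object as an instance field.
    static class CapturingFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final ChannelFactoryLike factory = new ChannelFactoryLike();
    }

    // Returns true only if the whole object graph can be Java-serialized.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException lands here
            return false;
        }
    }

    public static void main(String[] args) {
        // Serialization walks into the captured field and fails there.
        System.out.println("serializable? " + isJavaSerializable(new CapturingFn()));
    }
}
```

This is why constructing the factory inside processElement (as in Update 2 below) works: the factory is then created on the worker and never needs to be serialized.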
Update 2: I am finally able to execute the program and successfully submit the job. Thanks to jkff for the assistance. Below is my final code; I am pasting it here so that it will be helpful for others too.
ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setDefaultWorkerLogLevel(Level.WARN);
String jobName = "unique_job_name";
options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);
Pipeline pipeline = Pipeline.create(options);
List<String> filesToProcess = new ArrayList<String>();
for (String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921, Log_20160922 etc.
    filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath + fileName));
}
// at this point filesToProcess will have all log file names, e.g. Log_2016092101.gz, Log_2016092102.gz, ........., Log_2016092201.gz, Log_2016092223.gz
PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // I have to create _options here because Options and GcsIOChannelFactory are not serializable
        ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
            c.output(KV.of(c.element(), line));
        }
        br.close();
        gzip.close();
        readChannel.close();
    }
}));
// Performing reshuffling here as suggested
PCollection<KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());
PCollection<TableRow> formattedResults = withFileName
    .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<String,String> kv = c.element();
            String logLine = kv.getValue();
            String logFileName = kv.getKey();
            // do further processing as you want here
        }
    }));
// Finally insert the formattedResults into the BQ table
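For the "do further processing" step above, a TableRow is essentially a map from column name to value, with file_name attached as the extra column. The column names below (timestamp, level, message) are hypothetical, and a plain Map stands in for TableRow so the sketch is self-contained and free of BigQuery dependencies:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Main {
    // Hypothetical CSV layout inside each .gz file: timestamp,level,message.
    static Map<String, Object> toRow(String fileName, String logLine) {
        String[] fields = logLine.split(",", -1);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("timestamp", fields[0]);
        row.put("level", fields[1]);
        row.put("message", fields[2]);
        row.put("file_name", fileName); // extra column identifying the source file
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row = toRow("Log_2016092101.gz",
                "2016-09-21T01:00:00,INFO,started");
        System.out.println(row.get("file_name")); // Log_2016092101.gz
    }
}
```

With the real SDK, the same puts would go onto a com.google.api.services.bigquery.model.TableRow before handing formattedResults to BigQueryIO.Write.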
Right now, the answer is no. If you need access to filenames, unfortunately, your best bet in this case is to implement filepattern expansion and file parsing yourself (as a ParDo). Here are a few things you'll need to keep in mind:
- Make sure to insert a redistribute right before the parsing ParDo, to prevent excessive fusion.
- You can use GcsIoChannelFactory to expand the filepattern (see examples in this question) and to open a ReadableByteChannel. Use Channels.newInputStream to create an InputStream, then wrap it into Java's standard GZIPInputStream and read it line-by-line - see this question for examples. Remember to close the streams.
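The stream-wrapping chain described above (channel → InputStream → GZIPInputStream → line-by-line reads) can be sketched in isolation with only the standard library; an in-memory channel stands in for the one GcsIOChannelFactory would open:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class Main {
    // The decoding chain from the answer: channel -> InputStream
    // -> GZIPInputStream -> BufferedReader, closed via try-with-resources.
    static List<String> readGzipLines(ReadableByteChannel channel) throws Exception {
        List<String> lines = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Channels.newInputStream(channel)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Build an in-memory .gz payload standing in for a GCS object.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("a,1\nb,2\n".getBytes(StandardCharsets.UTF_8));
        }
        ReadableByteChannel channel =
                Channels.newChannel(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readGzipLines(channel)); // [a,1, b,2]
    }
}
```

Using try-with-resources here also covers the "remember to close the streams" point: closing the BufferedReader closes the wrapped GZIPInputStream and channel.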
Alternatively, you may consider writing your own file-based source. However, in this particular case (.gz files) I would recommend against it, because that API is primarily intended for files that can be read with random access from any offset.