How to write output as a fixed-width file from Spark to HDFS?

Problem description:

I have to read a CSV file from HDFS, pad every column to a fixed width, and then store the result back to HDFS as a fixed-width file only, not in any other format such as CSV or Parquet.

Suppose the input read from HDFS as CSV looks like this:

Name, age, phonenumber
A, 25,9900999999
B, 26,7654890234
C, 27,5643217897

Then I need to pad each column to a fixed width: for example, the first column to width 15, the second to 3, and the third to 10.

The output in HDFS should look like this:

Name      age   phonenumber           
A         25    9900999999
B         26    7654890234
C         27    5643217897

I then need to write that fixed-width data to HDFS in fixed-width file format.

If inferSchema was used, you need to cast all columns to string. Zip the widths with df.columns so that the padding can be applied dynamically. Check this out:

scala> val df = Seq(("A", 25,9900999999L),("B", 26,7654890234L),("C", 27,5643217897L)).toDF("Name","age","phonenumber")
df: org.apache.spark.sql.DataFrame = [Name: string, age: int ... 1 more field]

scala> df.show(false)
+----+---+-----------+
|Name|age|phonenumber|
+----+---+-----------+
|A   |25 |9900999999 |
|B   |26 |7654890234 |
|C   |27 |5643217897 |
+----+---+-----------+


scala> val widths = Array(5,3,10)
widths: Array[Int] = Array(5, 3, 10)

scala> df.columns.zip(widths)
res235: Array[(String, Int)] = Array((Name,5), (age,3), (phonenumber,10))

scala> df.columns.zip(widths).foldLeft(df){ (acc,x) => acc.withColumn(x._1,rpad( trim(col(x._1).cast("string")),x._2," ")) }.show(false)
+-----+---+-----------+
|Name |age|phonenumber|
+-----+---+-----------+
|A    |25 |9900999999 |
|B    |26 |7654890234 |
|C    |27 |5643217897 |
+-----+---+-----------+

To verify the padding, use a visible pad character such as "-":

scala> df.columns.zip(widths).foldLeft(df){ (acc,x) => acc.withColumn(x._1,rpad( trim(col(x._1).cast("string")),x._2,"-")) }.show(false)
+-----+---+-----------+
|Name |age|phonenumber|
+-----+---+-----------+
|A----|25-|9900999999 |
|B----|26-|7654890234 |
|C----|27-|5643217897 |
+-----+---+-----------+
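
The transcript above pads the columns but stops before writing anything to HDFS. A minimal sketch of that last step follows, assuming the padded columns can be concatenated into one string column and saved with the plain text writer. The object name FixedWidthWriter, the helper toFixedWidth, and the default output path are hypothetical names used only for illustration, and replacing nulls with an empty string is an assumption rather than something the question asked for.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, concat, lit, rpad, trim}

// Hypothetical helper object, not part of Spark.
object FixedWidthWriter {

  // Cast each column to string, trim it, pad it to its width, and
  // concatenate everything into a single string column named "value".
  def toFixedWidth(df: DataFrame, widths: Seq[Int]): DataFrame = {
    require(df.columns.length == widths.length, "expected exactly one width per column")
    val padded = df.columns.toSeq.zip(widths).map { case (name, width) =>
      // Assumption: a null becomes an empty string so that concat does not
      // turn the whole line into null. rpad also shortens longer values to `width`.
      rpad(coalesce(trim(col(name).cast("string")), lit("")), width, " ")
    }
    df.select(concat(padded: _*).as("value"))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical output location; pass a real path as the first argument.
    val outputPath = args.headOption.getOrElse("hdfs:///tmp/fixed_width_out")

    val spark = SparkSession.builder().appName("fixed-width-sketch").getOrCreate()
    import spark.implicits._

    // Same sample rows as in the transcript above.
    val df = Seq(("A", 25, 9900999999L), ("B", 26, 7654890234L), ("C", 27, 5643217897L))
      .toDF("Name", "age", "phonenumber")

    // The text writer accepts exactly one string column and writes one line per row.
    toFixedWidth(df, Seq(5, 3, 10)).write.text(outputPath)

    spark.stop()
  }
}
```

The text writer does not emit a header line, so if the column names are needed in the file they would have to be added as an ordinary padded row. Because rpad shortens values longer than the requested width, widths should be chosen to fit the longest expected value.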

