Spark Scala: update a DataFrame column's values from another DataFrame

Problem description:

a =

+------------+------------+------+
|        Name| Nationality|Salary|
+------------+------------+------+
|    A. Abbas|        Iraq|   €2K|
| A. Abdallah|      France|   €1K|
|A. Abdennour|     Tunisia|  €31K|
+------------+------------+------+

b =

+------------+------------+
|        Name|Salary      |
+------------+------------+
|    A. Abbas|€4K         |
| A. Abdallah|€1K         |
|A. Abdennour|€33K        |
+------------+------------+

The expected updatedDF should look like this:

+------------+------------+------+
|        Name| Nationality|Salary|
+------------+------------+------+
|    A. Abbas|        Iraq|   €4K|
| A. Abdallah|      France|   €1K|
|A. Abdennour|     Tunisia|  €33K|
+------------+------------+------+

I tried the following in Spark Scala:

    val updatedDF = a.join(b, Seq("Name"), "inner")
    updatedDF.show()

But I get duplication in my output after the join. How can I merge the two data frames without duplication?
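
For reference, a join on `Seq("Name")` deduplicates the Name column but keeps the Salary column from both a and b, so the result contains two Salary columns (and, if Name is not unique, duplicated rows as well). A minimal sketch of that behaviour, assuming the DataFrames hold the sample data above:

    // Sketch: the usingColumns join dedupes Name but keeps both Salary columns.
    val joined = a.join(b, Seq("Name"), "inner")
    joined.printSchema()
    // root
    //  |-- Name: string (nullable = true)
    //  |-- Nationality: string (nullable = true)
    //  |-- Salary: string (nullable = true)   <- Salary from a
    //  |-- Salary: string (nullable = true)   <- updated Salary from b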

If you have duplication, that means the Name column is not unique. I suggest appending an index column to be used in the join, and then dropping it:

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Add a 0-based row index to a...
    val aIndexed = addColumnIndex(a)
    println("1- a count: " + aIndexed.count())

    // ...and the same to b
    val bIndexed = addColumnIndex(b)
    println("2- b count: " + bIndexed.count())

    // Appends an "index" column via zipWithIndex, so the i-th row of each
    // DataFrame receives the same index value.
    def addColumnIndex(df: DataFrame): DataFrame = {
        spark.createDataFrame(
            df.rdd.zipWithIndex.map {
                case (row, index) => Row.fromSeq(row.toSeq :+ index)
            },
            StructType(df.schema.fields :+ StructField("index", LongType, nullable = false)))
    }

    // Join on both index and Name, keep b's Salary, then drop the helper column
    val ab = aIndexed.join(bIndexed, Seq("index", "Name"), "inner")
        .drop(aIndexed.col("Salary"))
        .drop("index")

    println("3- ab count: " + ab.count())