Update Map-type columns of a Spark DataFrame with the required keys and values

Problem description:

I have the following Spark DataFrame, in which every column except the primary key column emp_id holds a map with the keys 'from' and 'to', either of which can have a null value. For each column except emp_id, I want to evaluate 'from' and 'to' and add a new key named 'change' to the map, with the value:
a) 'insert' if 'from' is null and 'to' is not null
b) 'delete' if 'to' is null and 'from' is not null
c) 'update' if neither 'from' nor 'to' is null and 'from' differs from 'to'

Note: columns whose value is null remain untouched.
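The rules above can be written down as a small pure Scala function, independent of Spark. This is only a sketch: the name changeType is hypothetical, and it assumes a null 'from' or 'to' is modeled as None.

```scala
// Hypothetical helper (not part of the original post): classifies a
// (from, to) pair according to the insert / delete / update rules.
// A null map value is represented here as None.
def changeType(from: Option[String], to: Option[String]): Option[String] =
  (from, to) match {
    case (None, Some(_))              => Some("insert")
    case (Some(_), None)              => Some("delete")
    case (Some(f), Some(t)) if f != t => Some("update")
    case _                            => None // both null, or values equal
  }
```

A result of None means no 'change' key should be added for that pair.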

How can we achieve this in Scala?

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                   |emp_site                          |
|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

Expected:

|emp_id|emp_city                               |emp_name                                      |emp_phone                              |emp_sal                                     |emp_site                                            |
|1     |null                                   |[from -> Will, to -> Watson, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                                   |[from -> Norman, to -> Nate, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |

One way to achieve this is with a UDF, which is not a great solution, but I can't think of another one.

Try to avoid a UDF where you can. With a UDF, it looks like this:

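import org.apache.spark.sql.functions.{col, udf}

// Adds a "change" key describing how "from" and "to" differ.
val updateMap = udf((input: Map[String, String]) => {
  if (input == null || input.isEmpty)
    input // null (or empty) columns remain untouched
  else {
    // getOrElse avoids a NoSuchElementException if a key is missing
    val from = input.getOrElse("from", null)
    val to = input.getOrElse("to", null)
    if (from == null && to != null)
      input + ("change" -> "insert")
    else if (from != null && to == null)
      input + ("change" -> "delete")
    else if (from != null && to != null && from != to)
      input + ("change" -> "update")
    else
      input // both null, or values equal: nothing to record
  }
})

// Apply the UDF to every column except the first one (emp_id).
val result = df.columns.tail.foldLeft(df) { (acc, name) =>
  acc.withColumn(name, updateMap(col(name)))
}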
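Since the question asks for an approach without a UDF, here is a minimal sketch of the same logic using only built-in column functions. It assumes Spark 2.4 or later (for map_concat), that every non-key column is a map<string,string>, and that no map already contains a 'change' key (on Spark 3.x, map_concat raises an error on duplicate keys by default). The names withChange and withChangeColumns are hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, map, map_concat, when}

// Hypothetical helper: returns the map column with a "change" entry added
// when one of the three rules applies, and the original column otherwise.
def withChange(c: Column): Column = {
  val from = c.getItem("from")
  val to = c.getItem("to")
  // With no otherwise(), this expression is null when no rule matches
  // (including when the whole column value is null).
  val change =
    when(from.isNull && to.isNotNull, lit("insert"))
      .when(from.isNotNull && to.isNull, lit("delete"))
      .when(from.isNotNull && to.isNotNull && from =!= to, lit("update"))
  when(c.isNotNull && change.isNotNull,
       map_concat(c, map(lit("change"), change)))
    .otherwise(c) // null columns and unchanged pairs stay as they are
}

// Apply withChange to every column except the key column (assumed emp_id).
def withChangeColumns(df: DataFrame, key: String = "emp_id"): DataFrame =
  df.columns.filterNot(_ == key).foldLeft(df) { (acc, name) =>
    acc.withColumn(name, withChange(col(name)))
  }
```

Usage would be val result = withChangeColumns(df). Because this stays within Catalyst expressions, Spark can optimize it, which a UDF prevents.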

Make sure your columns are of type Map[String, String].
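One way to check this before transforming anything is to inspect the schema. The sketch below uses a hypothetical helper, stringMapColumns, which lists the columns whose type is exactly a map from string to string; it works on the StructType that df.schema returns.

```scala
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

// Hypothetical helper: names of all columns typed as map<string,string>.
def stringMapColumns(schema: StructType): Seq[String] =
  schema.fields.toSeq.collect {
    case StructField(name, MapType(StringType, StringType, _), _, _) => name
  }
```

For example, stringMapColumns(df.schema) could replace df.columns.tail if the key column is not guaranteed to come first.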

Hope this helps!