使用所需的键和值更新Map类型的spark数据框的列

问题描述:

我有一个以下的spark数据帧,其中所有列(主键列emp_id除外)都由一个映射(键'from'和'to'可以具有空值)组成.我想评估每列的从"和到"(emp_id除外),然后向映射(名称为"change")添加一个新键,其值为a)如果'from'值为null且'to'不为null则'insert'b)如果至"值为空且来自"不为空,则删除"b)如果从"和到"不为空,则为更新".从"值不同于至"值

I have a following spark dataframe where all the columns (except for primary key column emp_id) consist of a map (with keys 'from' and 'to' which can have null values). I want to evaluate 'from' and 'to' of each column(except emp_id) and add a new key to the map(named 'change') which has a value of a) 'insert' if 'from' value is null and 'to' is not null b) 'delete' if 'to' value is null and 'from' is not null b) 'update' if 'from' and 'to' are not null & 'from' value is different from 'to' value

注意:具有空值的列将保持不变.

Note: columns which have null value will remain untouched.

我们如何在Scala中实现这一目标.

How can we achieve this in Scala.

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

预期:

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                 |[from -> Norman, to -> Nate, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |

实现此目标的一种方法是使用 UDF ,这不是一个很好的解决方案,但我想不出其他解决方案.

One way to achieve this is by using UDF, which is not a great solution but I can't think of other solutions.

尽量不要使用 UDF

val updateMap = udf((input: Map[String, String]) => {
  if (input == null || input.isEmpty)
    Map.empty[String, String]
  else if (input("from") == null && input("to") != null)
    input + ("change" -> "insert")
  else if (input("from") != null && input("to") == null)
    input + ("change" -> "delete")
  else if (!(input("from").equals(input("to"))))
    input + ("change" -> "update")
  else
    Map.empty[String, String]

})

val result = df.columns.tail.foldLeft(df) { (acc, name) =>
  acc.withColumn(name, updateMap(col(name)))
}

确保您的列为 Map [String,String]

希望这会有所帮助!