How do I collect rows into a map using groupBy?

Problem description:

Background

sqlContext.sql(s"""
  SELECT
    school_name,
    name,
    age
  FROM my_table
""")

Question

Given the above table, I would like to group by school name and collect name, age into a Map[String, Int]

For example - pseudocode:

val df = sqlContext.sql(s"""
  SELECT
    school_name,
    name,
    age
  FROM my_table
""")


-------------------------------
school_name | name      | age
-------------------------------
school A    | "michael" | 7
school A    | "emily"   | 5
school B    | "cathy"   | 10
school B    | "shaun"   | 5


df.groupBy("school_name").agg(make_map)

------------------------------------------
school_name | map
------------------------------------------
school A    | {"michael": 7, "emily": 5}
school B    | {"cathy": 10, "shaun": 5}

The following will work with Spark 2.0. You can use the map function, available since the 2.0 release, to get columns as a Map.

import org.apache.spark.sql.functions.{col, collect_list, map}
import sqlContext.implicits._ // enables the $"name" column syntax

val df1 = df.groupBy(col("school_name")).agg(collect_list(map($"name", $"age")) as "map")
df1.show(false)

This will give you the output below.

+-----------+------------------------------------+
|school_name|map                                 |
+-----------+------------------------------------+
|school B   |[Map(cathy -> 10), Map(shaun -> 5)] |
|school A   |[Map(michael -> 7), Map(emily -> 5)]|
+-----------+------------------------------------+

Each row contributes a single-entry Map, so collect_list yields an array of Maps. Now you can use a UDF to merge those individual Maps into a single Map, like below.

import org.apache.spark.sql.functions.udf

// merge the collected single-entry Maps into one Map per school
val joinMap = udf { values: Seq[Map[String, Int]] => values.flatten.toMap }

val df2 = df1.withColumn("map", joinMap(col("map")))
df2.show(false)

This gives the required output with Map[String, Int].

+-----------+-----------------------------+
|school_name|map                          |
+-----------+-----------------------------+
|school B   |Map(cathy -> 10, shaun -> 5) |
|school A   |Map(michael -> 7, emily -> 5)|
+-----------+-----------------------------+
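
Note that Seq.toMap keeps the last value for a duplicate key, so if two rows within the same school share a name, only one of the ages will survive the merge.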

If you want to convert the column value into a JSON string, Spark 2.1.0 introduced the to_json function.

import org.apache.spark.sql.functions.{to_json, struct}

val df3 = df2.withColumn("map", to_json(struct($"map")))
df3.show(false)

The to_json function will return the following output.

+-----------+-------------------------------+
|school_name|map                            |
+-----------+-------------------------------+
|school B   |{"map":{"cathy":10,"shaun":5}} |
|school A   |{"map":{"michael":7,"emily":5}}|
+-----------+-------------------------------+
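
As a side note, on Spark 2.4 and later the UDF can be avoided entirely with the built-in map_from_entries function; a sketch under the same column names as above:

import org.apache.spark.sql.functions.{collect_list, map_from_entries, struct}
import sqlContext.implicits._

// collect (name, age) structs per school and convert the array directly to a Map
val df4 = df.groupBy($"school_name")
  .agg(map_from_entries(collect_list(struct($"name", $"age"))) as "map")
df4.show(false)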