PySpark将数组内的结构字段转换为字符串
问题描述:
我有一个像这样架构的数据框:
I have a dataframe with schema like this:
|-- order: string (nullable = true)
|-- travel: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- place: struct (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- address: string (nullable = true)
| | | |-- latitude: double (nullable = true)
| | | |-- longitude: double (nullable = true)
| | |-- distance_in_kms: float (nullable = true)
| | |-- estimated_time: struct (nullable = true)
| | | |-- seconds: long (nullable = true)
| | | |-- nanos: integer (nullable = true)
我想获取 estimated_time
中的秒数并将其转换为字符串并与 s
连接,然后将 estimated_time
替换为新的字符串值.例如,{ "seconds": "988", "nanos": "102" }
将转换为 988s
,因此架构将更改为
I want to get the seconds in estimated_time
and convert it into a string and concatenate it with s
, and then replace estimated_time
with the new string value. For example, { "seconds": "988", "nanos": "102" }
will be converted to 988s
, so the schema will change to
|-- order: string (nullable = true)
|-- travel: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- place: struct (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- address: string (nullable = true)
| | | |-- latitude: double (nullable = true)
| | | |-- longitude: double (nullable = true)
| | |-- distance_in_kms: float (nullable = true)
| | |-- estimated_time: string (nullable = true)
如何在 PySpark 中执行此操作?
How can I do this in PySpark?
更具体的例子,我想转换这个DF(在JSON中可视化)
More concrete example, I want to transform this DF (visualized in JSON)
{
"order": "c-331",
"travel": [
{
"place": {
"name": "A place",
"address": "The address",
"latitude": 0.0,
"longitude": 0.0
},
"distance_in_kms": 1.0,
"estimated_time": {
"seconds": 988,
"nanos": 102
}
}
]
}
进入
{
"order": "c-331",
"travel": [
{
"place": {
"name": "A place",
"address": "The address",
"latitude": 0.0,
"longitude": 0.0
},
"distance_in_kms": 1.0,
"estimated_time": "988s"
}
]
}
答
您可以使用以下 pyspark 函数执行此操作:
You can do this with the following pyspark functions:
- withColumn 允许您创建一个新列.我们将使用它来提取estimated_time"
- concat 连接字符串列
- lit 创建一个给定字符串的列
- withColumn lets you create a new column. We will use this to extract "estimated_time"
- concat concatenates string columns
- lit creates a column of a given string
请看下面的例子:
from pyspark.sql import functions as F
j = '{"order":"c-331","travel":[{"place":{"name":"A place","address":"The address","latitude":0.0,"longitude":0.0},"distance_in_kms":1.0,"estimated_time":{"seconds":988,"nanos":102}}]}'
df = spark.read.json(sc.parallelize([j]))
#the following command creates a new column called estimated_time2 which contains the values of travel.estimated_time.seconds concatenated with a 's'
bla = df.withColumn('estimated_time2', F.concat(df.travel.estimated_time.seconds[0].cast("string"), F.lit("s")))
#unfortunately it is currently not possible to use withColumn to add a new member to a struct. Therefore the following command replaces 'travel.estimated_time' with the before created column estimated_time2
bla = bla.select("order"
, F.array(
F.struct(
bla.travel.distance_in_kms[0].alias("distance_in_kms")
,bla.travel.place[0].alias("place")
, bla.estimated_time2.alias('estimated_time')
)).alias("travel"))
bla.show(truncate=False)
bla.printSchema()
这就是输出:
+-----+------------------------------------------+
|order|travel |
+-----+------------------------------------------+
|c-331|[[1.0,[The address,0.0,0.0,A place],988s]]|
+-----+------------------------------------------+
root
|-- order: string (nullable = true)
|-- travel: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- distance_in_kms: double (nullable = true)
| | |-- place: struct (nullable = true)
| | | |-- address: string (nullable = true)
| | | |-- latitude: double (nullable = true)
| | | |-- longitude: double (nullable = true)
| | | |-- name: string (nullable = true)
| | |-- estimated_time: string (nullable = true)