How to rename columns by group/partition in a Spark DataFrame?

Problem description:

I have some sensor data that is stored in a table by channel name, rather than sensor name (this is to avoid having very wide tables, since many sensors are only used on a few devices - a job for sparse columns, I know, but I am simply a user of the data). Something like this:

from functools import reduce

import numpy as np
import pandas as pd

np.random.seed(0)

data_df = pd.DataFrame({
    'id': ['a']*5 + ['b']*5 + ['c']*5,
    'chan1': range(15),
    'chan2': np.random.uniform(0, 10, size=15),
    'chan3': np.random.uniform(0, 100, size=15)
})

There is a second table that tells us how to map channel names to sensor names according to the particular ID of the device:

sensor_channel_df = pd.DataFrame([
    {'id': 'a', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'a', 'channel': 'chan2', 'sensor': 'torque'},
    {'id': 'a', 'channel': 'chan3', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'b', 'channel': 'chan2', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan3', 'sensor': 'speed'},
    {'id': 'c', 'channel': 'chan1', 'sensor': 'temp'},
    {'id': 'c', 'channel': 'chan2', 'sensor': 'weight'},
    {'id': 'c', 'channel': 'chan3', 'sensor': 'acceleration'},
])

I can create a renaming dictionary like so:

channel_rename_dict = sensor_channel_df.groupby('id')\
                                       .apply(lambda grp: dict(zip(grp['channel'], grp['sensor'])))\
                                       .to_dict()

Then rename all the columns with a further groupby/apply:

data_df.groupby('id')\
       .apply(lambda group: group.rename(columns=channel_rename_dict[group.name]))\
       .reset_index(level=0, drop=True)

We get a result like this:

    acceleration id      speed       temp    torque    weight
0            NaN  a        NaN   8.712930  5.488135  0.000000
1            NaN  a        NaN   2.021840  7.151894  1.000000
2            NaN  a        NaN  83.261985  6.027634  2.000000
3            NaN  a        NaN  77.815675  5.448832  3.000000
4            NaN  a        NaN  87.001215  4.236548  4.000000
5            NaN  b  97.861834   6.458941       NaN  5.000000
6            NaN  b  79.915856   4.375872       NaN  6.000000
7            NaN  b  46.147936   8.917730       NaN  7.000000
8            NaN  b  78.052918   9.636628       NaN  8.000000
9            NaN  b  11.827443   3.834415       NaN  9.000000
10     63.992102  c        NaN  10.000000       NaN  7.917250
11     14.335329  c        NaN  11.000000       NaN  5.288949
12     94.466892  c        NaN  12.000000       NaN  5.680446
13     52.184832  c        NaN  13.000000       NaN  9.255966
14     41.466194  c        NaN  14.000000       NaN  0.710361

This is all fine (though I would be unsurprised to learn that there is a better way of doing it in pandas), and I used it to demonstrate the logic of this process to some colleagues.
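One possibly tidier pandas route (just a sketch of an alternative to the groupby/apply above) is to melt the data to long form, join the channel-to-sensor mapping, and pivot back on the sensor names:

# Sketch of an alternative: reshape to long form, attach sensor names, pivot back.
long_df = data_df.reset_index().melt(
    id_vars=['index', 'id'], var_name='channel', value_name='value')
# Attach the sensor name for each (id, channel) pair.
merged = long_df.merge(sensor_channel_df, on=['id', 'channel'])
# Pivot back to one column per sensor; sensors unused by an id become NaN.
wide_df = merged.pivot_table(
    index=['index', 'id'], columns='sensor', values='value').reset_index(level='id')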

However, for the project architecture, it was decided that we would be using Spark. Is there a way I can achieve this same behavior in Spark DataFrames?

My initial thought was to first cache the full data_df, then break up the dataframe on id with filter. E.g., assuming data_df is now a Spark DataFrame:

data_df.cache()
unique_ids = data_df.select('id').distinct().rdd.map(lambda row: row[0]).collect()
split_dfs = {id: data_df.filter(data_df['id'] == id) for id in unique_ids}

Then, if we have the column rename dictionary as before, we can perform something along the lines of:

dfs_paired_with_rename_tuple_lists = [
    (split_dfs[id], list(channel_rename_dict[id].items()))
    for id in unique_ids
]

new_dfs = [
    reduce(lambda df_i, rename_tuple: df_i.withColumnRenamed(*rename_tuple), rename_tuple_list, df)
    for df, rename_tuple_list in dfs_paired_with_rename_tuple_lists
]

I could then perform a reduce with a union() on this list of Spark DataFrames after ensuring they have common columns.
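A minimal sketch of that last step, assuming the sensor columns missing from each sub-frame are filled with nulls so that a positional union() lines up:

from functools import reduce
from pyspark.sql.functions import lit

# Union of all column names across the renamed sub-frames.
all_cols = sorted(set(c for df_ in new_dfs for c in df_.columns))

def with_all_columns(df_):
    # Add any sensor columns this sub-frame lacks as null doubles,
    # then fix a common column order so union() matches by position.
    for c in all_cols:
        if c not in df_.columns:
            df_ = df_.withColumn(c, lit(None).cast('double'))
    return df_.select(all_cols)

combined = reduce(lambda a, b: a.union(b),
                  [with_all_columns(df_) for df_ in new_dfs])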

My feeling is that this would be extraordinarily slow, and that there is likely a much better way to go about this.

First, let's redefine the mapping to group by id and return a MapType Column (toolz is convenient here, but can be replaced with itertools.chain)*:

from toolz import concat, interleave
from pyspark.sql.functions import col, create_map, lit, struct

# Create literal column from id to sensor -> channel map
channel_map = create_map(*concat((lit(k), v) for k, v in sensor_channel_df
    .groupby("id")
    # Create map Column from literal label to channel
    .apply(lambda grp: create_map(*interleave([
        map(lit, grp["sensor"]),
        map(col, grp["channel"])])))
    .to_dict()
    .items()))

Next, get a list of sensors:

sensors = sorted(sensor_channel_df["sensor"].unique().tolist())

and combine the data columns:

df = spark.createDataFrame(data_df)
data_cols = struct(*[c for c in df.columns if c != "id"])

The components defined above can be combined:

cols = [channel_map[col("id")][sensor].alias(sensor) for sensor in sensors]

df.select(["id"] + cols)

+---+------------------+------------------+------------------+------------------+------------------+
| id|      acceleration|             speed|              temp|            torque|            weight|
+---+------------------+------------------+------------------+------------------+------------------+
|  a|              null|              null| 8.712929970154072|5.4881350392732475|               0.0|
|  a|              null|              null| 2.021839744032572| 7.151893663724195|               1.0|
|  a|              null|              null|  83.2619845547938| 6.027633760716439|               2.0|
|  a|              null|              null| 77.81567509498505| 5.448831829968968|               3.0|
|  a|              null|              null| 87.00121482468191| 4.236547993389047|               4.0|
|  b|              null|  97.8618342232764| 6.458941130666561|              null|               5.0|
|  b|              null| 79.91585642167236| 4.375872112626925|              null|               6.0|
|  b|              null|46.147936225293186| 8.917730007820797|              null|               7.0|
|  b|              null| 78.05291762864555| 9.636627605010293|              null|               8.0|
|  b|              null|11.827442586893323|3.8344151882577773|              null|               9.0|
|  c| 63.99210213275238|              null|              10.0|              null| 7.917250380826646|
|  c| 14.33532874090464|              null|              11.0|              null| 5.288949197529044|
|  c| 94.46689170495839|              null|              12.0|              null| 5.680445610939323|
|  c|52.184832175007166|              null|              13.0|              null|  9.25596638292661|
|  c| 41.46619399905236|              null|              14.0|              null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+

It is also possible, although less efficient, to use a udf:

from toolz import unique
from pyspark.sql.types import *
from pyspark.sql.functions import udf

channel_dict = (sensor_channel_df
    .groupby("id")
    .apply(lambda grp: dict(zip(grp["sensor"], grp["channel"])))
    .to_dict())

def remap(d):
    fields = sorted(unique(concat(_.keys() for _ in d.values())))
    schema = StructType([StructField(f, DoubleType()) for f in fields])
    def _(row, id):
        return tuple(float(row[d[id].get(f)]) if d[id].get(f) is not None 
                     else None for f in fields)
    return udf(_, schema)

(df
    .withColumn("vals", remap(channel_dict)(data_cols, "id"))
    .select("id", "vals.*"))

+---+------------------+------------------+------------------+------------------+------------------+
| id|      acceleration|             speed|              temp|            torque|            weight|
+---+------------------+------------------+------------------+------------------+------------------+
|  a|              null|              null| 8.712929970154072|5.4881350392732475|               0.0|
|  a|              null|              null| 2.021839744032572| 7.151893663724195|               1.0|
|  a|              null|              null|  83.2619845547938| 6.027633760716439|               2.0|
|  a|              null|              null| 77.81567509498505| 5.448831829968968|               3.0|
|  a|              null|              null| 87.00121482468191| 4.236547993389047|               4.0|
|  b|              null|  97.8618342232764| 6.458941130666561|              null|               5.0|
|  b|              null| 79.91585642167236| 4.375872112626925|              null|               6.0|
|  b|              null|46.147936225293186| 8.917730007820797|              null|               7.0|
|  b|              null| 78.05291762864555| 9.636627605010293|              null|               8.0|
|  b|              null|11.827442586893323|3.8344151882577773|              null|               9.0|
|  c| 63.99210213275238|              null|              10.0|              null| 7.917250380826646|
|  c| 14.33532874090464|              null|              11.0|              null| 5.288949197529044|
|  c| 94.46689170495839|              null|              12.0|              null| 5.680445610939323|
|  c|52.184832175007166|              null|              13.0|              null|  9.25596638292661|
|  c| 41.46619399905236|              null|              14.0|              null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+

In Spark 2.3 or later you can apply your current code with a vectorized UDF.
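A minimal sketch of that, assuming Spark 2.3+ and reusing sensors and the channel_rename_dict built earlier; a grouped-map pandas_udf receives one pandas sub-frame per id, so the original rename logic carries over almost unchanged:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Output schema: id plus one double column per sensor name.
out_schema = StructType(
    [StructField("id", StringType())] +
    [StructField(s, DoubleType()) for s in sensors])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def rename_channels(pdf):
    # pdf holds the rows for a single id; rename its channel columns
    # to sensor names and align to the full sensor list.
    group_id = pdf["id"].iloc[0]
    out = pdf.rename(columns=channel_rename_dict[group_id])
    out = out.reindex(columns=["id"] + sensors)
    out[sensors] = out[sensors].astype(float)
    return out

df.groupby("id").apply(rename_channels).show()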

* To understand what is going on here, let's take a look at a single group, as processed by apply:

grp = sensor_channel_df.groupby("id").get_group("a")

First we convert the sensor column to a sequence of Spark literal Columns (think of these as constant values):

keys = list(map(lit, grp["sensor"]))
keys

[Column<b'weight'>, Column<b'torque'>, Column<b'temp'>]

and the channel column to a sequence of Spark Columns (think of these as pointers to the data):

values = list(map(col, grp["channel"]))
values

[Column<b'chan1'>, Column<b'chan2'>, Column<b'chan3'>]

When evaluated in the context of a DataFrame, the former results in constant output:

df_ = df.drop_duplicates(subset=["id"])

df_.select(keys).show()

+------+------+----+
|weight|torque|temp|
+------+------+----+
|weight|torque|temp|
|weight|torque|temp|
|weight|torque|temp|
+------+------+----+

while the latter repeats the data:

df_.select(values).show(3)

+-----+------------------+-----------------+
|chan1|             chan2|            chan3|
+-----+------------------+-----------------+
|   10| 7.917250380826646|63.99210213275238|
|    5| 6.458941130666561| 97.8618342232764|
|    0|5.4881350392732475|8.712929970154072|
+-----+------------------+-----------------+

Next we interleave these two and combine them into a MapType column:

mapping = create_map(*interleave([keys, values]))
mapping 

Column<b'map(weight, chan1, torque, chan2, temp, chan3)'>

This gives us a mapping from a metric name to the data column (think of a Python dict), and when evaluated:

df_.select(mapping).show(3, False)

+---------------------------------------------------------------------------+
|map(weight, chan1, torque, chan2, temp, chan3)                             |
+---------------------------------------------------------------------------+
|Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238)|
|Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764)  |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)|
+---------------------------------------------------------------------------+

Finally, the outer comprehension repeats this for all groups, so channel_map is a single Column:

Column<b'map(a, map(weight, chan1, torque, chan2, temp, chan3), b, map(weight, chan1, temp, chan2, speed, chan3), c, map(temp, chan1, weight, chan2, acceleration, chan3))'>

Evaluating it gives the following structure:

df_.select(channel_map.alias("channel_map")).show(3, False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Map(a -> Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238), b -> Map(weight -> 10.0, temp -> 7.917250380826646, speed -> 63.99210213275238), c -> Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238))|
|Map(a -> Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764), b -> Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764), c -> Map(temp -> 5.0, weight -> 6.458941130666561, acceleration -> 97.8618342232764))      |
|Map(a -> Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072), b -> Map(weight -> 0.0, temp -> 5.4881350392732475, speed -> 8.712929970154072), c -> Map(temp -> 0.0, weight -> 5.4881350392732475, acceleration -> 8.712929970154072))|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Finally we use the id column to select the map of interest:

df_.select(channel_map[col("id")].alias("data_mapping")).show(3, False)

+---------------------------------------------------------------------------------+
|data_mapping                                                                     |
+---------------------------------------------------------------------------------+
|Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238)|
|Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764)         |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)      |
+---------------------------------------------------------------------------------+

and the column names to extract values from the map:

df_.select(channel_map[col("id")]["weight"].alias("weight")).show(3, False)

+-----------------+
|weight           |
+-----------------+
|7.917250380826646|
|5.0              |
|0.0              |
+-----------------+

At the end of the day, this is just a bunch of simple transformations on data structures containing symbolic expressions.