Reshaping/pivoting data in Spark RDD and/or Spark DataFrames
I have some data in the following format (either RDD or Spark DataFrame):
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sqlContext = SQLContext(sc)
rdd = sc.parallelize([('X01', 41, 'US', 3),
                      ('X01', 41, 'UK', 1),
                      ('X01', 41, 'CA', 2),
                      ('X02', 72, 'US', 4),
                      ('X02', 72, 'UK', 6),
                      ('X02', 72, 'CA', 7),
                      ('X02', 72, 'XX', 8)])

# convert to a Spark DataFrame
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)
What I would like to do is to 'reshape' the data, converting certain rows in Country (specifically US, UK and CA) into columns:
ID     Age  US  UK  CA
'X01'   41   3   1   2
'X02'   72   4   6   7
Essentially, I need something along the lines of Python's pivot workflow:
categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index='ID',
                                                  columns='Country',
                                                  values='Score')
My dataset is rather large so I can't really collect() and ingest the data into memory to do the reshaping in Python itself. Is there a way to convert Python's .pivot() into an invokable function while mapping either an RDD or a Spark DataFrame? Any help would be appreciated!
Since Spark 1.6 you can use the pivot function on GroupedData and provide an aggregate expression.
pivoted = (df
    .groupBy("ID", "Age")
    .pivot(
        "Country",
        ['US', 'UK', 'CA'])  # Optional list of levels
    .sum("Score"))           # alternatively you can use .agg(expr)
pivoted.show()
## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41| 3| 1| 2|
## |X02| 72| 4| 6| 7|
## +---+---+---+---+---+
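For reference, the .agg(expr) variant mentioned in the comment above looks like this. A minimal sketch, where first() is just one possible choice of aggregate; any aggregate function from pyspark.sql.functions would do:

from pyspark.sql.functions import first

pivoted_agg = (df
    .groupBy("ID", "Age")
    .pivot("Country", ['US', 'UK', 'CA'])
    .agg(first("Score")))  # first() stands in for any aggregate expression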
The list of levels can be omitted, but providing it both boosts performance and serves as an internal filter.
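To make that concrete, here is a sketch of the same pivot without an explicit list of levels. Spark then has to compute the distinct values of Country up front, and unexpected levels such as 'XX' show up as extra columns instead of being filtered out:

# no level list: Spark first determines the distinct Country values itself
pivoted_all = df.groupBy("ID", "Age").pivot("Country").sum("Score")
pivoted_all.show()  # note the additional XX column in the result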
This method is still relatively slow, but it certainly beats passing data manually between the JVM and Python.
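If you want to mirror the isin(categories) step from the question explicitly, a hedged sketch of filtering before the pivot is below; it is equivalent here, since the level list already acts as a filter:

from pyspark.sql.functions import col

categories = ['US', 'UK', 'CA']
pivoted_filtered = (df
    .where(col("Country").isin(categories))  # drop rows like 'XX' up front
    .groupBy("ID", "Age")
    .pivot("Country", categories)
    .sum("Score"))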