根据spark数据帧scala中的列值过滤行
我有一个数据框(火花):
I have a dataframe(spark):
id value
3 0
3 1
3 0
4 1
4 0
4 0
我想创建一个新的数据框:
I want to create a new dataframe:
3 0
3 1
4 1
需要为每个 id 删除 1(value) 之后的所有行.我尝试在 spark dateframe(Scala) 中使用窗口函数.但是找不到解决办法,看来是我走错方向了.
Need to remove all the rows after 1(value) for each id.I tried with window functions in spark dateframe(Scala). But couldn't able to find a solution.Seems to be I am going in a wrong direction.
我正在 Scala 中寻找解决方案.谢谢
I am looking for a solution in Scala.Thanks
使用 monotonically_increasing_id 输出
Output using monotonically_increasing_id
scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]
scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]
scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
如果我们在原始数据框中进行排序转换,该解决方案将不起作用.那个时候 monotonically_increasing_id() 是基于原始 DF 而不是排序的 DF 生成的.我之前错过了这个要求.
The solution wont work if we did a sorted transformation in the original dataframe. That time the monotonically_increasing_id() is generated based on original DF rather that sorted DF.I have missed that requirement before.
欢迎所有建议.
一种方法是使用 monotonically_increasing_id()
和一个自连接:
One way is to use monotonically_increasing_id()
and a self-join:
val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 3| 0|
| 4| 1|
| 4| 0|
| 4| 0|
+---+-----+
现在我们生成一个名为 idx
的列,其中 Long
增加:
Now we generate a column named idx
with an increasing Long
:
val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()
现在我们得到每个id
的min(idx)
,其中value = 1
:
Now we get the min(idx)
for each id
where value = 1
:
val minIdx = dataWithIndex
.filter($"value" === 1)
.groupBy($"id")
.agg(min($"idx"))
.toDF("r_id", "min_idx")
现在我们将min(idx)
加入到原来的DataFrame
中:
Now we join the min(idx)
back to the original DataFrame
:
dataWithIndex.join(
minIdx,
($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
注意: monotonically_increasing_id()
根据行的分区生成其值.每次重新评估 dataWithIndex
时,此值可能会更改.在我上面的代码中,由于延迟评估,只有当我调用最终的 monotonically_increasing_id()
.
Note: monotonically_increasing_id()
generates its value based on the partition of the row. This value may change each time dataWithIndex
is re-evaluated. In my code above, because of lazy evaluation, it's only when I call the final show
that monotonically_increasing_id()
is evaluated.
如果您想强制该值保持不变,例如,您可以使用 show
逐步评估上述内容,请取消注释上面的这一行:
If you want to force the value to stay the same, for example so you can use show
to evaluate the above step-by-step, uncomment this line above:
// dataWithIndex.cache()