检查是否已RDD元素是在另一个使用地图功能
我是新来的火花,并想知道关于闭包。结果
我有两个RDDS,一个包含ID的列表和一个值,而另一种含有所选ID的列表。结果
使用地图,我想增加元素的值,如果对方RDD包含其ID,像这样。
I'm new to Spark and was wondering about closures.
I have two RDDs, one containing a list of IDs and a values, and the other containing a list of selected IDs.
Using a map, I want to increase the value of the element, if the other RDD contains its ID, like so.
val ids = sc.parallelize(List(1,2,10,5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
vals.map( v => {
if(ids.collect().contains(v._1)){
(v._1, 1)
}
})
然而,作业挂起并无法完成。
什么是做正确的方法,
感谢您的帮助!
However the job hangs and never completes. What is the proper way to do this, Thanks for your help!
您实现尝试使用一个RDD( IDS
)用于映射另一个闭包内 - 这ISN 'T允许星火应用:在封闭中使用任何必须的序列化的(和preferably小),因为它会被序列化并发送到每个工人。
Your implementation tries to use one RDD (ids
) inside a closure used to map another - this isn't allowed in Spark applications: anything to be used in a closure must be serializable (and preferably small), since it will be serialized and sent to each worker.
在 leftOuterJoin
之间的这些RDDS应该得到你想要的东西:
a leftOuterJoin
between these RDDs should get you what you want:
val ids = sc.parallelize(List(1,2,10,5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
val result = vals
.leftOuterJoin(ids.keyBy(i => i))
.mapValues({
case (v, Some(matchingId)) => v + 1 // increase value if match found
case (v, None) => v // leave value as-is otherwise
})
的 leftOuterJoin
需要两个键 - 值RDDS,因此我们人为地提取IDS RDD使用从的关键身份的功能。然后我们地图的值的每个结果
(ID:智力,(价值:智力,matchingId:选项[INT]))
创成要么v或v + 1。
The leftOuterJoin
expects two key-value RDDs, hence we artificially extract a key from the ids
RDD using the identity function. Then we map the values of each resulting (id: Int, (value: Int, matchingId: Option[Int]))
record into either v or v+1.
通常情况下,你应该始终瞄准像使用行动收集尽量减少使用时的Spark,因为这样的行动将数据从分布式集群返回到您的驱动器应用
。
Generally, you should always aim to minimize the use of actions like collect
when using Spark, as such actions move data back from the distributed cluster into your driver application.