Spark:将 RDD 元素拆分成块
我在 Scala 中编写了一个相对简单的 Spark 作业,它从 S3 读取一些数据,执行一些转换和聚合,最后将结果存储到存储库中.
I've written a relatively simple Spark job in Scala which reads some data from S3, performs some transformations and aggregations and finally stores the results into a repository.
在最后阶段,我有我的域模型的 RDD,我想将它们分组为元素块,以便我可以在我的存储库中进行一些批量插入.
At the final stage, I have an RDD of my domain model and I would like to group them into chunks of elements so that I can do some mass insertions in my repository.
我使用了 RDDFunctions.sliding
方法来实现这一点,它几乎可以正常工作.这是我的代码的简化版本:
I used the RDDFunctions.sliding
method to achieve that and it's working almost fine. Here is a simplified version of my code:
val processedElements: RDD[DomainModel] = _
RDDFunctions.fromRDD(processedElements)
.sliding(500, 500)
.foreach { elementsChunk =>
Await.ready(repository.bulkInsert(elementsChunk), 1.minute)
}
问题是,例如,如果我有 1020 个元素,那么我的存储库中最终只有 1000 个元素.如果窗口大小大于剩余元素的数量,则滑动会忽略任何其他元素.
The problem is that if for example I have 1020 elements, only 1000 elements end up in my repository. It looks like sliding ignores any additional elements if the window size is larger than the amount of remaining elements.
有什么办法可以解决这个问题吗?如果没有,是否有其他方法可以在不使用 RDDFunctions.sliding
的情况下实现相同的行为?
Is there any way to resolve this? If not, is there any other way to achieve the same behaviour without using RDDFunctions.sliding
?
难道你不能只使用 foreachPartition
和手动批量管理吗?
Couldn't you just use foreachPartition
and manual batch management?
fromRDD.foreachPartition(items: Iterator[DomainModel] => {
val batch = new ArrayBuffer[DomainModel](BATCH_SIZE)
while (items.hasNext) {
if (batch.size >= BATCH_SIZE) {
bulkInsert(batch)
batch.clear()
}
batch += items.next
}
if (!batch.isEmpty) {
bulkInsert(batch)
}
})