Out of memory error when reading a large file in Spark 2.1.0
I want to use Spark to read a large (51 GB) XML file (on an external HDD) into a dataframe (using the spark-xml plugin), do some simple mapping/filtering, reorder it, and then write it back to disk as a CSV file.
But no matter how I tweak this, I always get java.lang.OutOfMemoryError: Java heap space.
I want to understand why increasing the number of partitions does not stop the OOM error. Shouldn't it split the task into more parts, so that each individual part is smaller and doesn't cause memory problems? (Spark can't possibly be trying to stuff everything into memory and crashing when it doesn't fit, right??)
Things I've tried:
- repartitioning/coalescing the dataframe to 5,000 and to 10,000 partitions when reading and when writing (the initial value is 1,604)
- using a smaller number of executors (6, 4, even with 2 executors I get the OOM error!)
- decreasing the size of the split files (the default looks like it's 33 MB)
- giving tons of RAM (all I have)
- increasing spark.memory.fraction to 0.8 (default is 0.6)
- decreasing spark.memory.storageFraction to 0.2 (default is 0.5)
- setting spark.default.parallelism to 30 and to 40 (default is 8 for me)
- setting spark.files.maxPartitionBytes to 64M (default is 128M) (see the sketch after this list)
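For reference, a minimal sketch of how the settings above could be passed when building the session; the app name, master URL, and exact values here are placeholders, not my actual configuration:
import org.apache.spark.sql.SparkSession

// Hypothetical session setup mirroring the settings listed above.
val spark = SparkSession.builder()
  .appName("xml-to-csv")                               // placeholder app name
  .master("local[8]")                                  // 8 cores on this machine
  .config("spark.memory.fraction", "0.8")              // default 0.6
  .config("spark.memory.storageFraction", "0.2")       // default 0.5
  .config("spark.default.parallelism", "40")           // default 8 here
  .config("spark.files.maxPartitionBytes", "67108864") // 64M; default 128M
  .getOrCreate()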
All my code is here (notice I'm not caching anything):
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.rand
import spark.implicits._ // needed for .as[Post]

val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where( df2.col("_TypeId") === "1" )
.select(
df2("_Id").as("id"),
df2("_Title").as("title"),
df2("_Body").as("body"),
df2("_Tags").as("tags") // Post has a tags field, so it must be selected too
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
Notes
- The input splits are pretty small (only 33 MB), so why can't I have 8 threads each processing one split? It really shouldn't make me run out of memory (I've…
UPDATE: I've written a shorter version of the code that just reads the file and then does foreachPartition(println). I get the same OOM error:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
P.S.: I'm using Spark v2.1.0. My machine has 8 cores and 16 GB of RAM.
I was getting this error when running spark-shell, so I increased the driver memory to a high number. Then I was able to load the XML:
spark-shell --driver-memory 6G
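If the job is submitted as a packaged application rather than run in spark-shell, the equivalent would presumably be the same flag on spark-submit (driver memory has to be set before the driver JVM starts, so setting it inside the application is too late); the class and jar names below are placeholders:
spark-submit --driver-memory 6G --class your.main.Class your-app.jar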
Source: https://github.com/lintool/warcbase/issues/246#issuecomment-249272263