33、shuffle性能优化

一、shuffle性能优化

1、没有开启consolidation机制的性能低下的原理剖析

33、shuffle性能优化


2、开启consolidation机制之后对磁盘io性能的提升的原理

33、shuffle性能优化


spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false;

总结,开启了consolidation机制之后,shuffle map端,写磁盘的数量,大大减少;

比如节点100个shuffle map task ,10个cpu core,总共1000个result task,那么每个节点的磁盘文件总数,是10 * 1000 = 1万个;

此外,result task拉取的时候,磁盘io也变少了,每个result task,只要从每个节点上,拉取cpu core数量的磁盘文件即可;

比如,每个节点上,有100个shuffle map task,那么就要从100个文件中fetch,拉取,现在只需要从10个文件中fetch,拉取;

map端的bucket缓存,也可以适当提高大小,这样,溢出到磁盘的次数就变少了;

spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k;

每次只能拉取指定缓存大小的数据量,拉取完聚合处理,然后再次拉取,这个缓存是每个reduce task都有自己的,如果内存够大的话,那么可以适当加大,
那么拉取的次数就变少了,spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m;

执行reduce task的executor中,有一部分内存用来汇聚各个reduce task 拉取的数据,放入map,进行聚合,spark.shuffle.memoryFraction:用于reduce端聚合的内存比例,
默认0.2,超过比例就会溢出到磁盘上;

reduce task 拉取数据的时候,可能会遇到map task哪里的executor的jvm正在full gc,此时就会出现正常工作线程停止,那么可能等待一段时间后,full gc还没完成,
就导致文件没有拉取到,spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次;

很有可能,gc没有调优好,导致每次gc都1分钟,那么拉取的最大时间,默认是3 * 5 = 15s,就会导致频繁的很多文件拉取失败,就会给你报shuffle output file lost,
然后,DAGScheduler会重试task和stage,最后甚至可能导致Application挂掉,spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s;


3、调优参数总结

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false
spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m
spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k
spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次
spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s
spark.shuffle.memoryFraction:用于reduce端聚合的内存比例,默认0.2,超过比例就会溢出到磁盘上