How to tune Spark executor number, cores and executor memory?

Problem description:

Where do you start to tune the above mentioned params? Do we start with executor memory and get the number of executors, or do we start with cores and get the executor number? I followed the link. It gave me a high-level idea, but I am still not sure how or where to start and arrive at a final conclusion.

The following answer covers the 3 main aspects mentioned in the title - number of executors, executor memory and number of cores. There may be other parameters, like driver memory, which I have not addressed as of this answer, but I would like to add them in the near future.

Case 1 Hardware - 6 nodes, each node with 16 cores and 64 GB RAM

Each executor is a JVM instance, so we can have multiple executors in a single node.

The first 1 core and 1 GB are needed for the OS and Hadoop daemons, so 15 cores and 63 GB RAM are available on each node.

Start with how to choose the number of cores:

Number of cores = number of concurrent tasks an executor can run

So we might think that more concurrent tasks per executor will give better performance. But research shows that any application with more than 5 concurrent tasks leads to a bad show. So stick to 5.

This number comes from an executor's ability to run concurrent tasks, not from how many cores the system has. So the number 5 stays the same even if you have double (32) the cores in the CPU.

Number of executors:

Coming back to the next step, with 5 cores per executor and 15 total available cores per node (CPU), we come to 3 executors per node.

So with 6 nodes and 3 executors per node, we get 18 executors. Out of the 18 we need 1 executor (Java process) for the YARN Application Master (AM), which leaves 17 executors.

This 17 is the number we give to Spark using --num-executors when running from the spark-submit shell command.

Memory for each executor:

From the above step, we have 3 executors per node, and the available RAM is 63 GB.

So the memory for each executor is 63/3 = 21 GB.

However, a small amount of overhead memory is also needed to determine the full memory request to YARN for each executor. The formula for that overhead is max(384 MB, 0.07 * spark.executor.memory).

Calculating that overhead: 0.07 * 21 GB (here 21 is the 63/3 calculated above) = 1.47 GB.

Since 1.47 GB > 384 MB, the overhead is 1.47 GB. Take that away from each of the 21 GB above => 21 - 1.47 ~ 19 GB.

So executor memory - 19 GB
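As a quick check of that arithmetic, here is a minimal Python sketch (plain arithmetic only, not a Spark API):

```python
# Overhead heuristic: max(384 MB, 7% of spark.executor.memory)
executor_memory_gb = 63 / 3                               # 21 GB per executor before overhead
overhead_gb = max(384 / 1024, 0.07 * executor_memory_gb)  # 1.47 GB, and 1.47 GB > 384 MB
usable_gb = executor_memory_gb - overhead_gb              # 19.53 GB, rounded down to 19 GB
print(round(overhead_gb, 2), int(usable_gb))              # prints: 1.47 19
```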

Final numbers: Executors - 17, Cores - 5, Executor Memory - 19 GB
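In spark-submit terms these would be --num-executors 17 --executor-cores 5 --executor-memory 19G. As a minimal PySpark sketch, assuming you set the same values programmatically instead (the application name here is just a placeholder):

```python
from pyspark.sql import SparkSession

# Equivalent to: spark-submit --num-executors 17 --executor-cores 5 --executor-memory 19G ...
spark = (
    SparkSession.builder
    .appName("tuned-app")                        # hypothetical application name
    .config("spark.executor.instances", "17")    # --num-executors
    .config("spark.executor.cores", "5")         # --executor-cores
    .config("spark.executor.memory", "19g")      # --executor-memory
    .getOrCreate()
)
```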

Case 2 Hardware: same 6 nodes, 32 cores, 64 GB RAM

5 is the same, for good concurrency.

Number of executors for each node = 32/5 ~ 6

So total executors = 6 * 6 nodes = 36. Then the final number is 36 - 1 (for the AM) = 35.

Executor memory: with 6 executors per node, 63/6 ~ 10 GB. Overhead is 0.07 * 10 GB = 700 MB. Rounding the overhead to 1 GB, we get 10 - 1 = 9 GB.

Final numbers: Executors - 35, Cores - 5, Executor Memory - 9 GB
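As a sanity check, here is a small Python sketch (a hypothetical helper, not part of any Spark API) that reproduces the arithmetic of Case 1 and Case 2 under the same assumptions - reserve 1 core and 1 GB per node, reserve 1 executor for the AM, and overhead = max(384 MB, 7% of executor memory):

```python
import math

def size_executors(nodes, cores_per_node, ram_gb_per_node, cores_per_executor=5):
    """Hypothetical sizing helper following the reasoning above; illustrative only."""
    usable_cores = cores_per_node - 1                 # leave 1 core for OS/Hadoop daemons
    usable_ram_gb = ram_gb_per_node - 1               # leave 1 GB for OS/Hadoop daemons
    executors_per_node = usable_cores // cores_per_executor
    num_executors = nodes * executors_per_node - 1    # reserve 1 executor for the YARN AM
    raw_mem_gb = usable_ram_gb / executors_per_node   # memory per executor before overhead
    overhead_gb = max(384 / 1024, 0.07 * raw_mem_gb)  # max(384 MB, 7% of executor memory)
    executor_memory_gb = math.floor(raw_mem_gb - overhead_gb)
    return num_executors, cores_per_executor, executor_memory_gb

print(size_executors(6, 16, 64))   # Case 1 -> (17, 5, 19)
print(size_executors(6, 32, 64))   # Case 2 -> (35, 5, 9)
```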

Case 3

The above scenarios start by accepting the number of cores as fixed and then move to the number of executors and memory.

Now, for the first case, if we think we don't need 19 GB and just 10 GB is sufficient, then the numbers are as follows:

Cores: 5
Number of executors for each node = 3

At this stage, this would lead to 21 GB, and then 19 GB, as per our first calculation. But since we thought 10 GB is OK (assume little overhead), we cannot simply switch the number of executors per node to 6 (like 63/10), because with 6 executors per node and 5 cores each that comes to 30 cores per node, when we only have 16 cores. So we also need to change the number of cores for each executor.

Calculating again,

The magic number 5 comes down to 3 (any number less than or equal to 5 works). So with 3 cores and 15 available cores, we get 5 executors per node, so (5 * 6) - 1 = 29 executors.

So memory is 63/5 ~ 12 GB. Overhead is 12 * 0.07 = 0.84 GB, so rounding to 1 GB of overhead, executor memory is 12 - 1 = 11 GB.

Final numbers: 29 executors, 3 cores, executor memory 11 GB
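The same hypothetical size_executors sketch from Case 2 above reproduces these numbers once the cores per executor are lowered to 3:

```python
print(size_executors(6, 16, 64, cores_per_executor=3))   # Case 3 -> (29, 3, 11)
```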

Dynamic allocation:

Note: if dynamic allocation is enabled, this sets the upper bound for the number of executors. It means the Spark application can consume all the resources if needed. So in a cluster where other applications are running and they also need cores to run tasks, make sure you handle this at the cluster level. I mean, you can allocate a specific number of cores to YARN based on user access. So you could create a spark_user, say, and then give that user cores (min/max). These limits are for sharing between Spark and the other applications running on YARN.

spark.dynamicAllocation.enabled - when this is set to true, we need not specify the number of executors. The reason is below:

The static parameter numbers we give at spark-submit are for the entire job duration. However, if dynamic allocation comes into the picture, there would be different stages, like:

What to start with:

The initial number of executors (spark.dynamicAllocation.initialExecutors) to start with.

How many:

Then, based on the load (pending tasks), how many executors to request. This would eventually be the number we would otherwise give at spark-submit in the static way. So once the initial executor number is set, we go to the min (spark.dynamicAllocation.minExecutors) and max (spark.dynamicAllocation.maxExecutors) numbers.

When to ask or give away:

When do we request new executors (spark.dynamicAllocation.schedulerBacklogTimeout)? When there have been pending tasks for this much duration, we request more. The number of executors requested in each round increases exponentially from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8 and so on in the subsequent rounds. At a specific point, the above max comes into the picture.
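Here is a tiny illustrative Python sketch of that ramp-up (not Spark's actual scheduler code), assuming a max of 35 executors:

```python
# Illustrative only: requests roughly double each round until the configured max is reached.
max_executors = 35               # e.g. spark.dynamicAllocation.maxExecutors
granted, to_request = 0, 1
while granted < max_executors:
    batch = min(to_request, max_executors - granted)   # never exceed the upper bound
    granted += batch
    print(f"requesting {batch} executor(s), total granted: {granted}")
    to_request *= 2
```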

When do we give away an executor (spark.dynamicAllocation.executorIdleTimeout)? If an executor has been idle for this much time, it is removed.
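As a minimal PySpark sketch of these dynamic allocation settings (the values shown are just examples; depending on the Spark version, an external shuffle service or shuffle tracking is also typically required):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dynamic-allocation-app")                                 # hypothetical name
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.initialExecutors", "2")           # what to start with
    .config("spark.dynamicAllocation.minExecutors", "2")               # lower bound
    .config("spark.dynamicAllocation.maxExecutors", "35")              # upper bound
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")   # when to ask for more
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")      # when to give one back
    .config("spark.shuffle.service.enabled", "true")                   # commonly needed with dynamic allocation
    .getOrCreate()
)
```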

Please correct me if I missed anything. The above is my understanding based on the blog I shared in the question and some online resources. Thank you.

Reference: