foreach with doParallel does not work for large numbers of iterations in R

Problem description:

I'm running the following code (extracted from doParallel's vignette) on a PC (OS: Linux) with 4 physical and 8 logical cores.

Running the code with iter=1e+6 or less, everything is fine, and I can see from CPU usage that all cores are employed for the computation. However, with a larger number of iterations (e.g. iter=4e+6), parallel computing seems not to work: when I monitor CPU usage, just one core is involved in the computations (100% usage).

Example 1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]

Do you have any idea what could be the reason? Could memory be the cause?

I googled around and found THIS question relevant to mine, but the point is that I don't get any kind of error, and the OP seemingly came up with a solution by providing the necessary packages inside the foreach loop. However, as can be seen, no packages are used inside my loop.
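For reference, the fix in that linked question was passing foreach's .packages argument so each worker loads the required libraries up front. A minimal sketch of that pattern (the 'stats' name here is purely illustrative, since glm() comes from base R's stats package, which is loaded by default):

```r
library(doParallel)
library(foreach)

registerDoParallel(cores = 2)

# .packages loads the named packages on every worker before the
# loop body runs ('stats' is illustrative: it is loaded by default)
r <- foreach(i = 1:4, .combine = c, .packages = "stats") %dopar% {
  mean(rnorm(10))
}

length(r)  # 4 results, one per iteration
```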

UPDATE1

My problem is still not solved. Based on my experiments, I don't think memory is the reason. I have 8 GB of memory on the system on which I run the following simple parallel iteration (over all 8 logical cores):

Example 2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]

I have no problem running this code, but when I monitor CPU usage, only one core (out of 8) is at 100%.

UPDATE2

As for Example 2, @SteveWeston (thanks for pointing this out) stated in the comments: "The example in your update is suffering from having tiny tasks. Only the master has any real work to do, which consists of sending tasks and processing results. That's fundamentally different than the problem with the original example which did use multiple cores on a smaller number of iterations."
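To illustrate the comment (my own sketch, not from the thread): restructuring Example 2 so that each task covers a block of iterations, instead of a single one, gives the workers real work per dispatch:

```r
library(doParallel)
library(foreach)

registerDoParallel(cores = 2)

iter <- 1e6
chunks <- 8  # 8 tasks instead of 1e6 tiny ones

r <- foreach(k = 1:chunks, .combine = c) %dopar% {
  # each task handles iter/chunks iterations at once
  lo <- (k - 1) * (iter / chunks) + 1
  hi <- k * (iter / chunks)
  lo:hi
}

length(r)  # 1e6 values, produced by only 8 task dispatches
```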

However, Example 1 still remains unsolved. When I run it and monitor the processes with htop, here is what happens in more detail:

Let's name the 8 created processes p1 through p8. The status (column S in htop) for p1 is R, meaning that it's running, and it remains unchanged. However, for p2 up to p8, after some minutes the status changes to D (i.e. uninterruptible sleep) and, after a few more minutes, changes again to Z (i.e. terminated but not reaped by its parent). Do you have any idea why this happens?

I think you're running low on memory. Here's a modified version of that example that should work better when you have many tasks. It uses doSNOW rather than doParallel because doSNOW allows you to process the results with the combine function as they're returned by the workers. This example writes those results to a file in order to use less memory; it then reads the results back into memory at the end using a .final function, but you can skip that if you don't have enough memory.

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }
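Some back-of-the-envelope arithmetic (my own figures, not from the answer) shows why the file-backed combine helps: the bare result is modest, but rbind-combining millions of small named coefficient vectors allocates far more than this in intermediate copies and attributes:

```r
niter <- 15e6

# final result: niter rows x 2 coefficients, 8 bytes per double
bare_mb <- niter * 2 * 8 / 2^20
round(bare_mb)  # ~229 MB for the bare matrix alone
```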

I included a progress bar since this example takes several hours to execute.
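If tcltk is unavailable (e.g. on a headless server), a console progress bar from base R's utils package should work as a drop-in replacement; this sketch keeps the same (n, tag) callback signature used above:

```r
mkprogress <- function(total) {
  # style = 3 draws a full-width console bar with a percentage
  pb <- txtProgressBar(max = total, style = 3)
  function(n, tag) setTxtProgressBar(pb, n)
}

# used the same way as before (100 here is an illustrative task count)
opts <- list(progress = mkprogress(100))
```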

Note that this example also uses the idiv function from the iterators package to increase the amount of work in each task. This technique is called chunking, and it often improves parallel performance. However, using idiv messes up the task indices, since the variable i is now a per-task index rather than a global index. For a global index, you can write a custom iterator that wraps idiv:

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIteration'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

The values emitted by this iterator are lists, each containing a starting index and a count. Here's a simple foreach loop that uses this custom iterator:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }

Of course, if the tasks are compute-intensive enough, you may not need chunking and can use a simple foreach loop as in the original example.