Multiprocessing pool with an iterator

Problem description:

I would like to use a multiprocessing pool with an iterator, in order to execute a function in a worker process, feeding it N elements from the iterator at a time until the iterator is exhausted.

import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", field_names=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)
with arcpy.da.SearchCursor(r"c:\temp.gdb\test", field_names=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a worker process
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]


pool.join()
pool.close()

My question is: is this script the right way to do it? Is there a better way?

Probably something is wrong with that script, because I got the following AssertionError at pool.join():

Traceback (most recent call last):
  File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
    pool.join()
  File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

If I have to guess what's primarily wrong with your code, I'd say it's in how you pass input_rows to your process function insert() - the way multiprocessing.Pool.apply_async() works is to unpack the arguments passed to it, so your insert() function actually retrieves 100 arguments instead of one argument holding a list of 100 elements. This causes an immediate error before your process function even gets the chance to start. If you change your call to pool.apply_async(insert, [input_rows]) it might start working... You would also be defeating the purpose of iterators, though - you might just as well convert your whole input iterator into a list and feed slices of 100 to multiprocessing.Pool.map() and be done with it. (The AssertionError in your traceback, by the way, comes from calling pool.join() before pool.close() - join() expects the pool to have been closed or terminated first, so those two calls need to be swapped.)
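To make the argument-unpacking point concrete, here is a minimal sketch (not your arcpy code - it uses a stand-in insert() that just counts the rows it receives) showing the difference between the two calls:

from multiprocessing import Pool

def insert(rows):
    # stand-in worker: expects ONE positional argument - an iterable of rows
    return len(rows)

if __name__ == "__main__":
    input_rows = [("a", 1), ("b", 2), ("c", 3)]
    pool = Pool(2)

    # Wrong: the list gets unpacked, so the worker tries insert(("a", 1), ("b", 2), ("c", 3))
    # and the resulting TypeError only surfaces when the result is fetched:
    #     pool.apply_async(insert, input_rows).get()  # raises TypeError (3 arguments instead of 1)

    # Right: wrap the list so it arrives as a single positional argument
    result = pool.apply_async(insert, [input_rows])
    print(result.get())  # -> 3

    pool.close()
    pool.join()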

But you asked if there is a 'better' way to do it. While 'better' is a relative category, in an ideal world multiprocessing.Pool comes with handy imap() (and imap_unordered()) methods intended to consume iterables and spread them over the pool in a lazy fashion (so no running over the whole iterator before processing starts), so all you need to do is build your iterator slices and pass them to it for processing, i.e.:

import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", field_names=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", field_names=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(By the way, your code would also drop the last slice whenever the size of s_cursor is not a multiple of 100.)
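If you do keep the original batching approach, the fix for that is simply to flush whatever is left over once the loop ends - a minimal sketch of the pattern (with the arcpy parts replaced by stand-ins) looks like this:

from multiprocessing import Pool

def insert(rows):
    return len(rows)  # stand-in for the real insert(), just counts the rows it got

if __name__ == "__main__":
    pool = Pool(4)
    input_rows, results = [], []
    for row in range(250):  # stand-in for the rows coming out of s_cursor
        input_rows.append(row)
        if len(input_rows) == 100:  # batch full, ship it off
            results.append(pool.apply_async(insert, [input_rows]))  # note the wrapping list
            input_rows = []
    if input_rows:  # flush the final, partial batch (rows 200-249 here)
        results.append(pool.apply_async(insert, [input_rows]))
    print(sum(r.get() for r in results))  # -> 250, every row accounted for
    pool.close()  # close() must come before join()
    pool.join()

Tracking the batch with len(input_rows) also removes the need for a separate count variable.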

But... it would be wonderful if it actually worked as advertised. While a lot of this has been fixed over the years, imap_unordered() will still pull a large sample of your iterator (far larger than the actual number of pool processes) when building its own internal iterator, so if that's a concern you'll have to get down and dirty yourself - and you're on the right track: apply_async() is the way to go when you want to control how your pool is fed, you just need to make sure you feed it properly:

if __name__ == "__main__":
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", field_names=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)])) 
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

And now your s_cursor iterator will be consumed only when the next 100 rows are needed - that is, whenever one of your insert() process functions exits (cleanly or not).

UPDATE - The previously posted code had a bug in how the queue was drained at the end if a captured result is desired; this version should do the job nicely. We can easily test it with some mock functions:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))

Now we can intertwine them like so:

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

And the result is (of course, it will differ from system to system):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

This proves that our generator/iterator is asked for data only when there is a free slot in the pool to do the work, ensuring minimal memory usage (and/or minimal I/O pounding, if that's what your iterators ultimately do). You won't get it much more streamlined than this. The only additional, albeit marginal, speed-ups you can obtain are to reduce the wait time (at the cost of the main process eating more resources) and to increase the allowed queue size, which is locked to the number of processes in the code above (at the expense of memory) - if you use while queue and (len(queue) >= pool._processes + 3 or not count_iterator): it will load 3 more iterator slices, reducing latency whenever a process finishes and a slot in the pool frees up.
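For illustration, here is the same feeding loop with a hypothetical BACKLOG constant (not part of the code above) making that trade-off explicit; it assumes iterator_slice(), get_counter() and test_process() from the earlier snippets are defined in the same module:

import multiprocessing

BACKLOG = 3  # hypothetical knob: extra slices allowed to wait beyond the worker count

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    count_iterator = iterator_slice(get_counter(30), 7)
    queue = []
    max_queued = pool._processes + BACKLOG  # larger backlog -> less latency, more memory
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free slot in the backlog or until all remaining workers finish
        while queue and (len(queue) >= max_queued or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # not done yet, put it back
                queue.append(process)
            # else: process.get() would return the result here
    pool.close()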