Pool workers not completing all tasks
I have a relatively simple Python multiprocessing script that sets up a pool of workers which append their output to a pandas DataFrame by way of a custom manager. What I am finding is that when I call close()/join() on the pool, not all of the tasks submitted via apply_async are completed.
Here's a simplified example that submits 1000 jobs, but only about half of them complete, causing an assertion error. Have I overlooked something very simple, or is this perhaps a bug?
    from multiprocessing import Pool
    from multiprocessing.managers import BaseManager
    from pandas import DataFrame

    class DataFrameResults:
        def __init__(self):
            self.results = DataFrame(columns=("A", "B"))

        def get_count(self):
            return self.results["A"].count()

        def register_result(self, a, b):
            self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

    class MyManager(BaseManager): pass
    MyManager.register('DataFrameResults', DataFrameResults)

    def f1(results, a, b):
        results.register_result(a, b)

    def main():
        manager = MyManager()
        manager.start()
        results = manager.DataFrameResults()

        pool = Pool(processes=4)
        for i in range(0, 1000):
            pool.apply_async(f1, [results, i, i * i])
        pool.close()
        pool.join()

        print(results.get_count())
        assert results.get_count() == 1000

    if __name__ == "__main__":
        main()
The issue which you're seeing is because of this code:
self.results = self.results.append(...)
This isn't atomic: it reads self.results, builds a new frame, and assigns it back. The manager serves each proxy connection in its own thread, so one thread can be interrupted after reading self.results (or while appending) but before it assigns the new frame back, and any rows appended by other threads in the meantime are overwritten and lost.
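Here is a minimal sketch of that lost-update race, using plain threads and events to force the problematic interleaving deterministically (the names `shared`, `writer_a`, and `writer_b` are illustrative, not from the question):

```python
import threading

shared = {"results": []}  # stands in for self.results
a_read = threading.Event()
b_wrote = threading.Event()

def writer_a():
    snapshot = shared["results"]          # read self.results
    a_read.set()
    b_wrote.wait()                        # "interrupted" before assigning back
    shared["results"] = snapshot + ["A"]  # overwrites B's update

def writer_b():
    a_read.wait()                         # runs entirely inside A's window
    shared["results"] = shared["results"] + ["B"]
    b_wrote.set()

ta = threading.Thread(target=writer_a)
tb = threading.Thread(target=writer_b)
ta.start(); tb.start()
ta.join(); tb.join()

print(shared["results"])  # ['A'] -- the row appended by B is gone
```

In the question's script the interleaving happens by chance rather than by design, which is why roughly (not exactly) half the rows survive.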
The correct solution is to use the AsyncResult objects returned by apply_async to collect each worker's output, and then append all of the results in the main process.
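The same 1000-job run reworked that way might look like this (a sketch reusing the question's `f1`/`main` names; the manager and shared DataFrame disappear entirely, since only the main process touches the frame):

```python
from multiprocessing import Pool
from pandas import DataFrame

def f1(a, b):
    # The worker just computes and returns its row; no shared state is mutated.
    return {"A": a, "B": b}

def main():
    pool = Pool(processes=4)
    async_results = [pool.apply_async(f1, (i, i * i)) for i in range(1000)]
    pool.close()
    pool.join()
    # Collect every AsyncResult in the main process and build the frame once;
    # .get() also re-raises any exception a worker hit, so failures are visible.
    results = DataFrame([r.get() for r in async_results], columns=["A", "B"])
    print(results["A"].count())  # 1000
    assert results["A"].count() == 1000
    return results

if __name__ == "__main__":
    main()
```

Building the DataFrame once from a list of rows is also far cheaper than appending 1000 times, since each append copies the whole frame.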