Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"
I'm trying to run this simple code with asyncio queues, but it raises exceptions, even nested exceptions.

I would like some help with making queues in asyncio work correctly:
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []


async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'input_queue' and produces to 'output_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')


async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()


async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()


asyncio.run(run(), debug=True)
print('Done!')
print('Done!')
Output:
waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "temp4.py", line 46, in worker
in_queue.task_done()
File "Python37\lib\asyncio\queues.py", line 202, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "temp4.py", line 63, in <module>
asyncio.run(run(), debug=True)
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
This is the basic flow. What I would like to do later is run more requests on more workers, where each worker will move the number from in_queue to out_queue, and then the saver will print the numbers from out_queue.
Your queues must be created inside the loop. You created them outside the loop created for asyncio.run(), so they use events.get_event_loop(). asyncio.run() creates a new loop, and futures created for the queue in one loop can't then be used in the other.
Create your queues in your top-level run() coroutine, and either pass them to the coroutines that need them, or use contextvars.ContextVar objects if you must use globals.
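A minimal sketch of that fix (the names here are illustrative): the queue is created inside the coroutine that asyncio.run() executes, so it belongs to the loop asyncio.run() creates.

```python
import asyncio

async def main():
    # The queue is created inside the coroutine that asyncio.run()
    # executes, so it is bound to the loop asyncio.run() created.
    # (On Python 3.10+ the binding happens lazily on first use, but
    # creating queues inside the running loop is still good practice.)
    queue = asyncio.Queue()
    await queue.put('hello')
    return await queue.get()

print(asyncio.run(main()))  # prints: hello
```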
You also need to clean up how you handle task cancellation inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception in the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it. Your task code catches all exceptions without re-raising, including CancelledError, so you block proper cancellation.
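A small illustration of the re-raise pattern (the cancellable/main names are made up for this sketch): the task catches CancelledError, does its clean-up, and re-raises so the cancellation actually takes effect.

```python
import asyncio

async def cancellable():
    try:
        await asyncio.sleep(3600)  # stand-in for long-running work
    except asyncio.CancelledError:
        print('cleaning up')       # do any clean-up work here...
        raise                      # ...then re-raise, or the task is
                                   # never actually marked cancelled

async def main():
    task = asyncio.create_task(cancellable())
    await asyncio.sleep(0)                    # let the task start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()                   # True only if re-raised

print(asyncio.run(main()))  # prints: True
```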
Instead, what does happen during cancellation is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you actually are handling a queue task, but your code calls task_done() when an exception occurs while waiting for a queue task to appear.
If you need to use try...finally: in_queue.task_done(), put it around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside of that try block. You don't want to mark as done tasks you didn't actually receive.
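Here is a minimal sketch of that placement (worker/main are illustrative names): get() sits outside the try/finally, so a cancellation that lands on the idle get() never triggers a stray task_done().

```python
import asyncio

async def worker(queue, results):
    while True:
        num = await queue.get()        # keep get() OUTSIDE the try/finally
        try:
            results.append(num * num)  # "process" the item
        finally:
            queue.task_done()          # only mark items actually received

async def main():
    queue, results = asyncio.Queue(), []
    for n in range(3):
        queue.put_nowait(n)
    task = asyncio.create_task(worker(queue, results))
    await queue.join()   # resumes once every put() has a matching task_done()
    task.cancel()        # cancellation hits the idle get(), so no
    await asyncio.gather(task, return_exceptions=True)  # stray task_done()
    return results

print(asyncio.run(main()))  # prints: [0, 1, 4]
```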
Finally, when you print exceptions, you want to print their repr(); for historical reasons, the str() conversion of exceptions produces their .args value, which is not very helpful for CancelledError exceptions, which have empty .args. Use {e!r} in formatted strings, so you can see what exception you are catching:
worker-0 exception CancelledError()
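You can see the difference directly: str() of an argument-less CancelledError is an empty string, while its repr() names the exception class.

```python
import asyncio

e = asyncio.CancelledError()
print(f'plain: {e}')    # str() shows the empty .args: nothing useful
print(f'repr:  {e!r}')  # repr() shows "CancelledError()"
```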
So the corrected code, with the saver() task enabled, the queues created inside of run(), and task exception handling cleaned up, would be:
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1


async def run():
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    tasks = []
    for i in range(num_workers):
        tasks.append(asyncio.create_task(
            worker(in_queue, out_queue, name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver(out_queue)))

    await in_queue.join()
    await out_queue.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print('done')


async def worker(in_queue, out_queue, name):
    print(f"{name} started")
    try:
        while True:
            num = await in_queue.get()
            try:
                print(f'{name} got {num}')
                await asyncio.sleep(0)
                await out_queue.put(num)
            except Exception as e:
                print(f"{name} exception {e!r}")
                raise
            finally:
                in_queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise
    finally:
        print(f"{name} ended")


async def saver(out_queue):
    print("saver started")
    try:
        while True:
            num = await out_queue.get()
            try:
                print(f'saver got {num}')
                await asyncio.sleep(0)
                print("saver ended")
            except Exception as e:
                print(f"saver exception {e!r}")
                raise
            finally:
                out_queue.task_done()
    except asyncio.CancelledError:
        print(f"saver is being cancelled")
        raise
    finally:
        print(f"saver ended")


asyncio.run(run(), debug=True)
print('Done!')
This prints:
worker-0 started
worker-0 got 0
saver started
saver got 0
saver ended
done
worker-0 is being cancelled
worker-0 ended
saver is being cancelled
saver ended
Done!
If you want to use globals to share queue objects, then use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops, the contextvars module integration will take care of keeping the queues separate:
from contextvars import ContextVar

# ...

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')


async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(1):
        await in_.put(request)

    # ...
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver()))

    await in_.join()
    await out.join()
    # ...


async def worker(name):
    print(f"{name} started")
    in_ = in_queue.get()
    out = out_queue.get()
    try:
        while True:
            num = await in_.get()
            try:
                # ...
                await out.put(num)
                # ...
            finally:
                in_.task_done()
    # ...


async def saver():
    print("saver started")
    out = out_queue.get()
    try:
        while True:
            num = await out.get()
            try:
                # ...
            finally:
                out.task_done()
    # ...
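The ContextVar mechanics can be shown in a self-contained sketch (queue_var, consumer, and main are illustrative names): each asyncio.run() call runs in its own copied context, so separate runs never see each other's queue.

```python
import asyncio
from contextvars import ContextVar

queue_var = ContextVar('queue_var')  # illustrative global

async def consumer():
    queue = queue_var.get()   # looks up the queue for this context
    return await queue.get()

async def main(value):
    queue = asyncio.Queue()   # still created inside the running loop
    queue_var.set(queue)      # visible to coroutines awaited from here
    await queue.put(value)
    return await consumer()

# Each asyncio.run() call copies the context, so the two runs
# never share loop-bound queue state:
print(asyncio.run(main('first')))   # prints: first
print(asyncio.run(main('second')))  # prints: second
```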