使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成

问题描述:

我有以下代码:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

我运行这个函数直到完成.设置关闭时会出现问题 - 函数完成并且任何挂起的任务都不会运行.

I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run.

这是错误:

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

如何正确安排关机时间?

How do I schedule a shutdown correctly?

为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从/proc/stat 读取一次,计算该期间的 CPU 使用率,然后将结果发送到服务器.我想一直调度这些监控作业,直到我收到 sigterm,当我停止调度时,等待所有当前作业完成,然后优雅地退出.

To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully.

您可以检索未完成的任务并再次运行循环直到它们完成,然后关闭循环或退出您的程序.

You can retrieve unfinished tasks and run the loop again until they finished, then close the loop or exit your program.

pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))

  • pending 是待处理任务的列表.
  • asyncio.gather() 允许同时等待多个任务.
    • pending is a list of pending tasks.
    • asyncio.gather() allows to wait on several tasks at once.
    • 如果你想确保所有的任务都在一个协程内完成(也许你有一个主"协程),你可以这样做,例如:

      If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:

async def do_something_periodically():
    while True:
        asyncio.create_task(my_expensive_operation())
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await asyncio.gather(*asyncio.all_tasks())

此外,在这种情况下,由于所有任务都是在同一个协程中创建的,因此您已经可以访问这些任务:

Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:

async def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.create_task(my_expensive_operation()))
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await asyncio.gather(*tasks)