Python并行化“异步"

问题描述:

我的龙卷风处理程序中具有以下方法:

I have the following method in my Tornado handler:

  async def get(self):
      url = 'url here'
      try:
          async for batch in downloader.fetch(url):
              self.write(batch)
              await self.flush()
      except Exception as e:
          logger.warning(e)

这是downloader.fetch()的代码:

This is the code for downloader.fetch():

async def fetch(url, **kwargs):
    timeout = kwargs.get('timeout', aiohttp.ClientTimeout(total=12))
    response_validator = kwargs.get('response_validator', json_response_validator)
    extractor = kwargs.get('extractor', json_extractor)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                response_validator(resp)
                async for batch in extractor(resp):
                    yield batch

    except aiohttp.client_exceptions.ClientConnectorError:
        logger.warning("bad request")
        raise
    except asyncio.TimeoutError:
        logger.warning("server timeout")
        raise

我想从多个并行下载器中产生批处理"对象. 我希望从第一个下载器获得第一个可用批次,以此类推,直到所有下载器完成.这样的东西(这不是工作代码):

I would like yield the "batch" object from multiple downloaders in paralel. I want the first available batch from the first downloader and so on until all downloaders finished. Something like this (this is not working code):

async for batch in [downloader.fetch(url1), downloader.fetch(url2)]:
    ....

这可能吗?我该如何修改自己的工作方式,以便能够并行从多个协程中获得收益?

Is this possible? How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?

我如何修改自己的操作以便能够并行地从多个协程中获得收益?

How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?

您需要一个函数,将两个异步序列合并为一个,并同时进行迭代,并在一个可用的情况下从另一个中产生元素.尽管当前标准库中未包含此类功能,但您可以 aiostream软件包中找到一个.

You need a function that merges two async sequences into one, iterating over both in parallel and yielding elements from one or the other, as they become available. While such a function is not included in the current standard library, you can find one in the aiostream package.

您还可以编写自己的merge函数,如此答案所示:

You can also write your own merge function, as shown in this answer:

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret

使用该函数,循环如下所示:

Using that function, the loop would look like this:

async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ....