Python parallelising "async for"
I have the following method in my Tornado handler:
async def get(self):
    url = 'url here'
    try:
        async for batch in downloader.fetch(url):
            self.write(batch)
            await self.flush()
    except Exception as e:
        logger.warning(e)
This is the code for downloader.fetch():
async def fetch(url, **kwargs):
    timeout = kwargs.get('timeout', aiohttp.ClientTimeout(total=12))
    response_validator = kwargs.get('response_validator', json_response_validator)
    extractor = kwargs.get('extractor', json_extractor)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                response_validator(resp)
                async for batch in extractor(resp):
                    yield batch
    except aiohttp.client_exceptions.ClientConnectorError:
        logger.warning("bad request")
        raise
    except asyncio.TimeoutError:
        logger.warning("server timeout")
        raise
I would like to yield the "batch" objects from multiple downloaders in parallel. I want the first available batch from whichever downloader produces it first, and so on until all downloaders have finished. Something like this (this is not working code):
async for batch in [downloader.fetch(url1), downloader.fetch(url2)]:
    ....
Is this possible? How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?
How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?
You need a function that merges two async sequences into one, iterating over both in parallel and yielding elements from one or the other, as they become available. While such a function is not included in the current standard library, you can find one in the aiostream package.
You can also write your own merge function, as shown in this answer:
async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret
Using that function, the loop would look like this:
async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ....
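To see the helper in action, here is a self-contained sketch that merges two toy async generators (letters and numbers are placeholders for real downloaders), repeating the merge function so the snippet runs standalone:

```python
import asyncio

async def merge(*iterables):
    # Map each async iterator to its in-flight __anext__ future (or None)
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        # Wake up as soon as any iterator has produced its next item
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                # This iterator is exhausted; stop polling it
                del iter_next[fut._orig_iter]
                continue
            yield ret

async def letters():
    for ch in "abc":
        await asyncio.sleep(0.02)
        yield ch

async def numbers():
    for n in range(3):
        await asyncio.sleep(0.03)
        yield n

async def main():
    return [item async for item in merge(letters(), numbers())]

items = asyncio.run(main())
print(items)
```

The output contains all six items, interleaved in whatever order the two generators happened to produce them.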