# -*- coding: utf-8 -*-
import asyncio
from .assertions import isiter
from .concurrent import ConcurrentExecutor, collect
@asyncio.coroutine
[docs]def gather(*coros_or_futures, limit=0, loop=None, timeout=None,
preserve_order=False, return_exceptions=False):
"""
Return a future aggregating results from the given coroutine objects
with a concurrency execution limit.
If all the tasks are done successfully, the returned future’s result is
the list of results (in the order of the original sequence,
not necessarily the order of results arrival).
If return_exceptions is `True`, exceptions in the tasks are treated the
same as successful results, and gathered in the result list; otherwise,
the first raised exception will be immediately propagated to the
returned future.
All futures must share the same event loop.
This functions is mostly compatible with Python standard
``asyncio.gather``, but providing ordered results and concurrency control
flow.
This function is a coroutine.
Arguments:
*coros_or_futures (coroutines|list): an iterable collection yielding
coroutines functions or futures.
limit (int): max concurrency limit. Use ``0`` for no limit.
timeout can be used to control the maximum number
of seconds to wait before returning. timeout can be an int or
float. If timeout is not specified or None, there is no limit to
the wait time.
preserve_order (bool): preserves results order.
return_exceptions (bool): returns exceptions as valid results.
loop (asyncio.BaseEventLoop): optional event loop to use.
Returns:
list: coroutines returned results.
Usage::
async def sum(x, y):
return x + y
await paco.gather(
sum(1, 2),
sum(None, 'str'),
return_exceptions=True)
# => [3, TypeError("unsupported operand type(s) for +: 'NoneType' and 'str'")] # noqa
"""
# If no coroutines to schedule, return empty list (as Python stdlib)
if len(coros_or_futures) == 0:
return []
# Support iterable as first argument for better interoperability
if len(coros_or_futures) == 1 and isiter(coros_or_futures[0]):
coros_or_futures = coros_or_futures[0]
# Pre-initialize results
results = [None] * len(coros_or_futures) if preserve_order else []
# Create concurrent executor
pool = ConcurrentExecutor(limit=limit, loop=loop)
# Iterate and attach coroutine for defer scheduling
for index, coro in enumerate(coros_or_futures):
# Validate coroutine object
if asyncio.iscoroutinefunction(coro):
coro = coro()
if not asyncio.iscoroutine(coro):
raise TypeError('only coroutines or coroutine functions allowed')
# Add coroutine to the executor pool
pool.add(collect(coro, index, results,
preserve_order=preserve_order,
return_exceptions=return_exceptions))
# Wait until all the tasks finishes
yield from pool.run(timeout=timeout, return_exceptions=return_exceptions)
# Returns aggregated results
return results