API documentation

paco.map(coro, iterable, limit=0, loop=None, timeout=None, return_exceptions=False, *args, **kw)[source]

Concurrently maps values yielded from an iterable, passing them into an asynchronous coroutine function.

Mapped values are returned as a list. Item order is preserved based on the order of the original iterable.

The concurrency level is configurable via the limit param.

This function is the asynchronous equivalent of Python's built-in map() function.

This function is a coroutine.

Parameters:
  • coro (coroutinefunction) – map coroutine function to use.
  • iterable (iter) – an iterable collection yielding the values to map. Asynchronous iterables are not supported.
  • limit (int) – max concurrency limit. Use 0 for no limit.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
  • timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
  • return_exceptions (bool) – returns exceptions as valid results.
  • *args (mixed) – optional variadic arguments to be passed to the coroutine map function.
Returns:

ordered list of values yielded by coroutines

Return type:

list

Usage:

await paco.map(mul2, [1, 2, 3, 4, 5], limit=3)
=> [2, 4, 6, 8, 10]
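
The interplay between limit and result ordering can be approximated with a semaphore. The following is a minimal sketch built only on asyncio, not paco's actual implementation; the helper name limited_map is hypothetical.

```python
import asyncio


async def limited_map(coro, iterable, limit=0):
    """Hypothetical helper: ordered concurrent map with an optional limit."""
    # A limit of 0 means "no limit", so no semaphore is needed in that case.
    semaphore = asyncio.Semaphore(limit) if limit > 0 else None

    async def run_one(value):
        if semaphore is None:
            return await coro(value)
        async with semaphore:
            return await coro(value)

    # asyncio.gather returns results in the order of its arguments,
    # regardless of which call finishes first.
    return await asyncio.gather(*(run_one(v) for v in iterable))


async def mul2(num):
    await asyncio.sleep(0)
    return num * 2


print(asyncio.run(limited_map(mul2, [1, 2, 3, 4, 5], limit=3)))
```

At most three calls to mul2 are in flight at any time, yet the returned list follows the input order.
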
paco.run(coro, loop=None)[source]

Convenient shortcut alias to loop.run_until_complete.

Parameters:
  • coro (coroutine) – coroutine object to schedule.
  • loop (asyncio.BaseEventLoop) – optional event loop to use. Defaults to: asyncio.get_event_loop().
Returns:

value returned by the coroutine.

Return type:

mixed

paco.each(coro, iterable, limit=0, loop=None, collect=False, timeout=None, return_exceptions=False, *args, **kw)[source]

Concurrently iterates values yielded from an iterable, passing them to an asynchronous coroutine.

You can optionally collect yielded values by passing collect=True, which makes it equivalent to paco.map().

Mapped values are returned as an ordered list. Item order is preserved based on the order of the original iterable.

The concurrency level is configurable via the limit param.

All coroutines will be executed in the same loop.

This function is a coroutine.

Parameters:
  • coro (coroutinefunction) – coroutine iterator function that accepts iterable values.
  • iterable (iter) – an iterable collection yielding the values to iterate. Asynchronous iterables are not supported.
  • limit (int) – max iteration concurrency limit. Use 0 for no limit.
  • collect (bool) – return yielded values from coroutines. Default False.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
  • return_exceptions (bool) – enable/disable returning exceptions in case of error. collect param must be True.
  • timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
  • *args (mixed) – optional variadic arguments to pass to the coroutine iterable function.
Returns:

results – ordered list of values yielded by coroutines

Return type:

list

Raises:

TypeError – in case of invalid input arguments.

Usage:

async def mul2(num):
    return num * 2

await paco.each(mul2, [1, 2, 3, 4, 5], limit=3)
=> [2, 4, 6, 8, 10]
paco.some(coro, iterable, limit=0, timeout=None, loop=None)[source]

Returns True if at least one element in the iterable satisfies the asynchronous coroutine test. If any iteratee call returns True, iteration stops and True will be returned.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine function for test values.
  • iterable (iterable) – an iterable.
  • limit (int) – max concurrency limit. Use 0 for no limit.
  • timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if input arguments are not valid.

Returns:

True if at least one value passes the test, otherwise False.

Return type:

bool

Usage:

async def test(num):
    return num > 3

await paco.some(test, [1, 2, 3, 4, 5])
=> True
paco.race(iterable, loop=None, timeout=None, *args, **kw)[source]

Runs the given tasks concurrently, without waiting until the previous one has completed.

Once any of the tasks completes, the coroutine main callback is immediately called.

This function implements the same interface as Python standard itertools.dropwhile() function.

All coroutines will be executed in the same loop.

Parameters:
  • iterable (iterable) – an iterable collection yielding coroutine functions or coroutine objects.
  • *args (mixed) – mixed variadic arguments to pass to coroutines.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
  • timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
Raises:
  • TypeError – if iterable argument is not iterable.
  • asyncio.TimeoutError – if wait timeout is exceeded.
Returns:

filtered values – ordered list of resultant values.

Return type:

list

Usage:

await paco.race(coro)
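
The "first task to complete wins" idea can be sketched with asyncio.wait and FIRST_COMPLETED. This is a minimal illustration under stated assumptions, not paco's code: the names first_result and delayed are hypothetical, and raising on timeout mirrors the Raises section above.

```python
import asyncio


async def first_result(coros, timeout=None):
    """Hypothetical helper: result of whichever coroutine finishes first."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
    # The remaining tasks are no longer needed once there is a winner.
    for task in pending:
        task.cancel()
    if not done:
        raise asyncio.TimeoutError("no coroutine completed in time")
    return done.pop().result()


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def main():
    return await first_result([delayed("slow", 0.2), delayed("fast", 0.01)])


print(asyncio.run(main()))
```
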
paco.once(*args, **kw)
paco.wait(*coros_or_futures, limit=0, timeout=None, loop=None, return_exceptions=False, return_when='ALL_COMPLETED')[source]

Wait for the Futures and coroutine objects given by the sequence futures to complete, with optional concurrency limit. Coroutines will be wrapped in Tasks.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

return_when indicates when this function should return. It must be one of the following constants of the concurrent.futures module: FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.

All futures must share the same event loop.

This function is mostly compatible with Python's standard asyncio.wait().

Parameters:
  • *coros_or_futures (iter|list) – an iterable collection yielding coroutine functions.
  • limit (int) – optional concurrency execution limit. Use 0 for no limit.
  • timeout (int/float) – maximum number of seconds to wait before returning.
  • return_exceptions (bool) – exceptions in the tasks are treated the same as successful results, instead of raising them.
  • return_when (str) – indicates when this function should return.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
  • *args (mixed) – optional variadic argument to pass to the coroutines function.
Returns:

Returns two sets of Future: (done, pending).

Return type:

tuple

Raises:
  • TypeError – in case of invalid coroutine object.
  • ValueError – in case of empty set of coroutines or futures.
  • TimeoutError – if execution takes more than expected.

Usage:

done, pending = await paco.wait(
  task(1, foo='bar'),
  task(2, foo='bar'),
  task(3, foo='bar'),
  task(4, foo='bar'),
  limit=2, return_exceptions=True)
paco.wraps(fn)[source]

Wraps a given function as a coroutine function.

This is a convenient helper function.

Parameters: fn (function) – function object to wrap.
Returns: coroutinefunction.

Usage:

def mult(num, foo=None):
    return num * 2

coro = paco.wraps(mult)
await coro(2, foo='bar')
=> 4
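
Conceptually, wrapping a plain function as a coroutine function needs little more than an async def shim. The sketch below shows one way to do it with the standard library; it is an assumption about the general technique rather than paco's source, and the local name wraps is a hypothetical stand-in.

```python
import asyncio
import functools
import inspect


def wraps(fn):
    """Hypothetical stand-in: expose a plain function as a coroutine function."""
    if inspect.iscoroutinefunction(fn):
        return fn  # already a coroutine function, nothing to do

    @functools.wraps(fn)
    async def wrapper(*args, **kw):
        # The wrapped function still runs synchronously inside the event loop.
        return fn(*args, **kw)

    return wrapper


def mult(num, foo=None):
    return num * 2


coro = wraps(mult)
print(asyncio.run(coro(2, foo='bar')))
```

Note that the wrapped function still blocks the event loop while it runs; the wrapper only makes it awaitable.
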
paco.defer(*args, **kw)
paco.apply(*args, **kw)
paco.every(coro, iterable, limit=1, loop=None)[source]

Returns True if every element in a given iterable satisfies the coroutine asynchronous test.

If any iteratee coroutine call returns False, the process is immediately stopped, and False will be returned.

You can increase the concurrency limit for a fast race condition scenario.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine function to call with values to test.
  • iterable (iterable) – an iterable collection yielding the values to test.
  • limit (int) – max concurrency execution limit. Use 0 for no limit.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if input arguments are not valid.

Returns:

True if all the values pass the test, otherwise False.

Return type:

bool

Usage:

async def test(num):
    return num > 10

await paco.every(test, [1, 2, 3, 4, 5])
=> False
paco.until(coro, coro_test, assert_coro=None, *args, **kw)[source]

Repeatedly call coro coroutine function until coro_test returns True.

This function is the inverse of paco.whilst().

This function is a coroutine.

Parameters:
  • coro (coroutinefunction) – coroutine function to execute.
  • coro_test (coroutinefunction) – coroutine function to test.
  • assert_coro (coroutinefunction) – optional assertion coroutine used to determine if the test passed or not.
  • *args (mixed) – optional variadic arguments to pass to coro function.
Raises:

TypeError – if input arguments are invalid.

Returns:

result values returned by coro.

Return type:

list

Usage:

await paco.until(coro, [1, 2, 3, 4, 5])
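
The control flow described above can be sketched as a plain loop. This is a minimal illustration, assuming the test is evaluated before each call and that results are accumulated in a list; the helper until and the coroutines task and finished are hypothetical, and the optional assert_coro parameter is left out.

```python
import asyncio


async def until(coro, coro_test):
    """Hypothetical helper: call coro until coro_test() returns True."""
    results = []
    while not await coro_test():
        results.append(await coro())
    return results


async def main():
    counter = {"calls": 0}

    async def task():
        counter["calls"] += 1
        return counter["calls"]

    async def finished():
        # Stop once task has been called four times.
        return counter["calls"] >= 4

    return await until(task, finished)


print(asyncio.run(main()))
```
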
paco.times(*args, **kw)
paco.gather(*coros_or_futures, limit=0, loop=None, timeout=None, preserve_order=False, return_exceptions=False)[source]

Return a future aggregating results from the given coroutine objects with a concurrency execution limit.

If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival).

If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

All futures must share the same event loop.

This function is mostly compatible with Python's standard asyncio.gather(), but provides ordered results and concurrency control flow.

This function is a coroutine.

Parameters:
  • *coros_or_futures (coroutines|list) – an iterable collection yielding coroutine functions or futures.
  • limit (int) – max concurrency limit. Use 0 for no limit.
  • timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
  • preserve_order (bool) – preserves results order.
  • return_exceptions (bool) – returns exceptions as valid results.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Returns:

results returned by the coroutines.

Return type:

list

Usage:

await paco.gather(
  task(1, foo='bar'),
  task(2, foo='bar'),
  task(3, foo='bar'),
  task(4, foo='bar'),
  limit=2, return_exceptions=True)
paco.repeat(coro, times=1, step=1, limit=1, loop=None)[source]

Executes the coroutine function x number of times, and accumulates results in order as you would use with map.

Execution concurrency is configurable using limit param.

This function is a coroutine.

Parameters:
  • coro (coroutinefunction) – coroutine function to schedule.
  • times (int) – number of times to execute the coroutine.
  • step (int) – increment iteration step, as with range().
  • limit (int) – concurrency execution limit. Defaults to 1.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if coro is not a coroutine function.

Returns:

accumulated yielded values returned by coroutine.

Return type:

list

Usage:

async def task(num):
    return num * 2

await paco.repeat(task, 10, limit=2)
=> [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
paco.reduce(coro, iterable, right=False, initializer=None, loop=None)[source]

Apply a function of two arguments cumulatively to the items of a sequence, from left to right, so as to reduce the sequence to a single value.

Reduction is executed sequentially, without concurrency, so values are passed in order.

This function is the asynchronous coroutine equivalent of Python's standard functools.reduce() function.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine function to call with values to reduce.
  • iterable (iterable) – an iterable collection yielding the values to reduce.
  • right (bool) – reduce iterable from right to left.
  • initializer (mixed) – initial accumulator value used in the first reduction call.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if input arguments are not valid.

Returns:

accumulated final reduced value.

Return type:

mixed

Usage:

async def reducer(acc, num):
    return acc + num

await paco.reduce(reducer, [1, 2, 3, 4, 5], initializer=0)
=> 15
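
A sequential asynchronous fold can be sketched in a few lines. The version below is an illustration using only asyncio, not paco's implementation; the local name reduce is a hypothetical stand-in, and unlike functools.reduce() it always starts from the given initializer.

```python
import asyncio


async def reduce(coro, iterable, initializer=None, right=False):
    """Hypothetical helper: sequential async fold over an iterable."""
    values = list(iterable)
    if right:
        values.reverse()
    acc = initializer
    for value in values:
        # Each step awaits the previous one, so there is no concurrency.
        acc = await coro(acc, value)
    return acc


async def reducer(acc, num):
    return acc + num


print(asyncio.run(reduce(reducer, [1, 2, 3, 4, 5], initializer=0)))
```
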
paco.filter(coro, iterable, assert_fn=None, limit=0, loop=None)[source]

Returns a list of all the values in the iterable which pass an asynchronous truth test coroutine.

Operations are executed concurrently by default, but results will be in order.

You can configure the concurrency via limit param.

This function is the asynchronous equivalent of Python's built-in filter() function.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine filter function to call accepting iterable values.
  • iterable (iterable) – an iterable collection yielding the values to filter.
  • assert_fn (coroutinefunction) – optional assertion function.
  • limit (int) – max filtering concurrency limit. Use 0 for no limit.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if coro argument is not a coroutine function.

Returns:

ordered list containing values that passed the filter.

Return type:

list

Usage:

async def iseven(num):
    return num % 2 == 0

await paco.filter(iseven, [1, 2, 3, 4, 5], limit=3)
=> [2, 4]
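
Running the tests concurrently while keeping results in input order can be sketched by pairing each value with its verdict. This is a minimal asyncio-only illustration with no concurrency limit; the helper name filter_async is hypothetical and the code is not paco's implementation.

```python
import asyncio


async def filter_async(coro, iterable):
    """Hypothetical helper: keep values whose async test returns a true value."""
    values = list(iterable)
    # Run every test concurrently; gather keeps verdicts aligned with values.
    verdicts = await asyncio.gather(*(coro(v) for v in values))
    return [v for v, keep in zip(values, verdicts) if keep]


async def iseven(num):
    await asyncio.sleep(0)
    return num % 2 == 0


print(asyncio.run(filter_async(iseven, [1, 2, 3, 4, 5])))
```
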
paco.whilst(coro, coro_test, assert_coro=None, *args, **kw)[source]

Repeatedly call coro coroutine function while coro_test returns True.

This function is the inverse of paco.until().

This function is a coroutine.

Parameters:
  • coro (coroutinefunction) – coroutine function to execute.
  • coro_test (coroutinefunction) – coroutine function to test.
  • assert_coro (coroutinefunction) – optional assertion coroutine used to determine if the test passed or not.
  • *args (mixed) – optional variadic arguments to pass to coro function.
Raises:

TypeError – if input arguments are invalid.

Returns:

result values returned by coro.

Return type:

list

Usage:

await paco.whilst(coro, [1, 2, 3, 4, 5])
paco.series(*coros_or_futures, timeout=None, loop=None, return_exceptions=False)[source]

Run the given coroutine functions in series, each one running once the previous execution has completed.

If any coroutine raises an exception, no more coroutines are executed. Otherwise, the values returned by the coroutines are returned as a list.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

All futures must share the same event loop.

This function is mostly compatible with Python's standard asyncio.wait().

This function is a coroutine.

Parameters:
  • *coros_or_futures (iter|list) – an iterable collection yielding coroutine functions.
  • timeout (int/float) – maximum number of seconds to wait before returning.
  • return_exceptions (bool) – exceptions in the tasks are treated the same as successful results, instead of raising them.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
  • *args (mixed) – optional variadic argument to pass to the coroutines function.
Returns:

results returned by the coroutines.

Return type:

list

Raises:
  • TypeError – in case of invalid coroutine object.
  • ValueError – in case of empty set of coroutines or futures.
  • TimeoutError – if execution takes more than expected.

Usage:

results = await paco.series(
  task(1, foo='bar'),
  task(2, foo='bar'),
  task(3, foo='bar'),
  task(4, foo='bar'),
  return_exceptions=True)
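
Running coroutines strictly one after another can be sketched as a loop that awaits each coroutine object in turn. The code below is an illustration under stated assumptions, not paco's implementation: the local series and the log argument of task are hypothetical, and timeout handling is omitted.

```python
import asyncio


async def series(*coros, return_exceptions=False):
    """Hypothetical helper: await coroutine objects one at a time, in order."""
    pending = list(coros)
    results = []
    while pending:
        coro = pending.pop(0)
        try:
            results.append(await coro)
        except Exception as err:
            if not return_exceptions:
                # Close the coroutines that will never run to avoid warnings.
                for rest in pending:
                    rest.close()
                raise
            results.append(err)
    return results


async def task(num, log):
    log.append(("start", num))
    await asyncio.sleep(0.01)
    log.append(("end", num))
    return num * 2


async def main():
    log = []
    results = await series(task(1, log), task(2, log), task(3, log))
    return results, log


print(asyncio.run(main()))
```

The log shows that each task ends before the next one starts, which is the difference from a concurrent gather.
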
paco.partial(*args, **kw)
paco.timeout(*args, **kw)
paco.compose(*coros)[source]

Creates a coroutine function based on the composition of the passed coroutine functions.

Each function consumes the yielded result of the coroutine that follows.

Composing coroutine functions f(), g(), and h() would produce the result of f(g(h())).

Parameters: *coros (coroutinefunction) – variadic coroutine functions to compose.
Raises: RuntimeError – if cannot execute a coroutine function.
Returns: coroutinefunction

Usage:

coro = paco.compose(sum1, mul2, sum1)
await coro(1)
=> 5
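
The right-to-left evaluation order can be sketched as follows. This is a minimal illustration of function composition for coroutine functions, not paco's source; the local compose is a hypothetical stand-in that accepts a single input value.

```python
import asyncio


def compose(*coros):
    """Hypothetical helper: compose(f, g, h)(x) awaits f(g(h(x)))."""
    async def composed(value):
        # Apply right to left, so the last function given runs first.
        for coro in reversed(coros):
            value = await coro(value)
        return value
    return composed


async def sum1(num):
    return num + 1


async def mul2(num):
    return num * 2


print(asyncio.run(compose(sum1, mul2, sum1)(1)))
```

Starting from 1, the rightmost sum1 yields 2, mul2 yields 4, and the leftmost sum1 yields 5.
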
paco.throttle(*args, **kw)
paco.constant(value, delay=None)[source]

Returns a coroutine function that when called, always returns the provided value.

Parameters:
  • value (mixed) – value to constantly return when coroutine is called.
  • delay (int/float) – optional return value delay in seconds.
Returns:

coroutinefunction

Usage:

coro = paco.constant('foo')
await coro()
=> 'foo'
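
A constant coroutine function is essentially a closure over the value. The sketch below assumes the returned function ignores any arguments it receives; it is an illustration, not paco's implementation, and the local constant is a hypothetical stand-in.

```python
import asyncio


def constant(value, delay=None):
    """Hypothetical helper: coroutine function that always returns value."""
    async def coro(*args, **kw):
        if delay:
            # Optionally wait before returning the value.
            await asyncio.sleep(delay)
        return value
    return coro


coro = constant('foo')
print(asyncio.run(coro()))
```
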
paco.dropwhile(coro, iterable, loop=None)[source]

Make an iterator that drops elements from the iterable as long as the predicate is true; afterwards, returns every element.

Note, the iterator does not produce any output until the predicate first becomes false, so it may have a lengthy start-up time.

This function is pretty much equivalent to Python standard itertools.dropwhile(), but designed to be used with async coroutines.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine predicate function to call with each value.
  • iterable (iterable) – an iterable collection yielding the values to test.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if coro argument is not a coroutine function.

Returns:

filtered values – ordered list of resultant values.

Return type:

list

Usage:

async def filter(num):
    return num < 4

await paco.dropwhile(filter, [1, 2, 3, 4, 5, 1])
=> [4, 5, 1]
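
The drop-then-keep behaviour can be sketched with a single pass over an iterator. This is an asyncio-only illustration that evaluates the predicate sequentially; it is not paco's implementation, and the predicate name below_4 is hypothetical.

```python
import asyncio


async def dropwhile(coro, iterable):
    """Hypothetical helper: drop leading values while the predicate holds."""
    iterator = iter(iterable)
    for value in iterator:
        # Evaluate the predicate sequentially until it first fails.
        if not await coro(value):
            # Keep this value and everything after it, untested.
            return [value, *iterator]
    return []


async def below_4(num):
    return num < 4


print(asyncio.run(dropwhile(below_4, [1, 2, 3, 4, 5, 1])))
```

The trailing 1 is kept even though it satisfies the predicate, because testing stops at the first failure.
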
paco.filterfalse(coro, iterable, limit=0, loop=None)[source]

Returns a list of all the values in the iterable which do not pass an asynchronous truth test coroutine.

Operations are executed concurrently by default, but results will be in order.

You can configure the concurrency via limit param.

This function is the asynchronous equivalent of Python's standard itertools.filterfalse() function.

This function is a coroutine.

Parameters:
  • coro (coroutine function) – coroutine filter function to call accepting iterable values.
  • iterable (iterable) – an iterable collection yielding the values to filter.
  • assert_fn (coroutinefunction) – optional assertion function.
  • limit (int) – max filtering concurrency limit. Use 0 for no limit.
  • loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises:

TypeError – if coro argument is not a coroutine function.

Returns:

filtered values – ordered list containing values that did not pass the filter.

Return type:

list

Usage:

await paco.filterfalse(coro, [1, 2, 3, 4, 5], limit=3)
paco.concurrent

alias of ConcurrentExecutor

class paco.ConcurrentExecutor(limit=10, loop=None, coros=None)[source]

Bases: object

Concurrently executes a set of asynchronous coroutines with a simple, configurable concurrency limit.

Provides an observer pub/sub interface, allowing API consumers to subscribe normal functions or coroutines to certain events that happen internally.

ConcurrentExecutor is a low-level implementation that powers most of the utility functions provided in paco.

In most cases you won’t need to rely on it; instead, you can use the high-level API functions, which provide a simpler abstraction for the majority of use cases.

This class is not thread safe.

Events:
  • start (executor): triggered before executor cycle starts.
  • finish (executor): triggered when all the coroutines have finished.
  • task.start (task): triggered before coroutine starts.
  • task.finish (task, result): triggered when the coroutine finished.

Usage:

pool = ConcurrentExecutor(3)
pool.add(coroutine, 'foo', 1)
pool.add(coroutine, 'foo', 1)
pool.add(coroutine, 'foo', 1)
await pool.run(return_exceptions=True)
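
The combination of a concurrency limit and event observers can be sketched with a small class. This is a much reduced illustration built only on asyncio, not the ConcurrentExecutor source: the class name MiniPool is hypothetical, it returns a plain list of results rather than (done, pending), and it assumes a limit greater than 0.

```python
import asyncio


class MiniPool:
    """Hypothetical, reduced stand-in for a limit-based executor."""

    def __init__(self, limit=10):
        self.limit = limit
        self.pending = []      # (coroutine function, args, kwargs)
        self.observers = {}    # event name -> list of callbacks

    def on(self, event, fn):
        self.observers.setdefault(event, []).append(fn)

    def add(self, coro, *args, **kw):
        self.pending.append((coro, args, kw))

    def _emit(self, event, *args):
        for fn in self.observers.get(event, []):
            fn(*args)

    async def run(self):
        semaphore = asyncio.Semaphore(self.limit)

        async def worker(coro, args, kw):
            # The semaphore caps how many coroutines run at the same time.
            async with semaphore:
                self._emit("task.start", coro)
                result = await coro(*args, **kw)
                self._emit("task.finish", coro, result)
                return result

        self._emit("start", self)
        jobs, self.pending = self.pending, []
        results = await asyncio.gather(*(worker(*job) for job in jobs))
        self._emit("finish", self)
        return results


async def echo(name, num):
    await asyncio.sleep(0)
    return (name, num)


async def main():
    events = []
    pool = MiniPool(2)
    pool.on("task.finish", lambda task, result: events.append(result))
    for num in (1, 2, 3):
        pool.add(echo, "foo", num)
    return await pool.run(), events


print(asyncio.run(main()))
```
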
add(coro, *args, **kw)[source]

Adds a new coroutine function with optional variadic arguments.

Parameters:
  • coro (coroutine function) – coroutine to execute.
  • *args (mixed) – optional variadic arguments
Raises:

TypeError – if the coro object is not a valid coroutine

Returns:

coroutine wrapped future

Return type:

future

cancel()[source]

Tries to gracefully cancel the pending scheduled coroutine tasks.

extend(*coros)[source]

Add multiple coroutines to the executor pool.

Raises: TypeError – if the coro object is not a valid coroutine
is_running()[source]

Checks the executor running state.

Returns: True if the executor is running, otherwise False.
Return type: bool
off(event)[source]

Removes event subscribers.

Parameters: event (str) – name of the event whose observers are removed.
on(event, fn)[source]

Subscribes to a specific event.

Parameters:
  • event (str) – event name to subscribe to.
  • fn (function) – function to trigger.
reset()[source]

Resets the executor scheduler internal state.

Raises: RuntimeError – if the executor is still running.
run(timeout=None, return_exceptions=None, return_when='ALL_COMPLETED')[source]

Executes the registered coroutines in the executor queue.

Parameters:

timeout (int/float) – max execution timeout. No limit by default.

Returns:

asyncio.Future – two sets of Futures: (done, pending)

Return type:

tuple

Raises:
  • ValueError – if there are no coroutines to schedule.
  • RuntimeError – if executor is still running.
  • TimeoutError – if execution takes more than expected.
submit(coro, *args, **kw)

Adds a new coroutine function with optional variadic arguments.

Parameters:
  • coro (coroutine function) – coroutine to execute.
  • *args (mixed) – optional variadic arguments
Raises:

TypeError – if the coro object is not a valid coroutine

Returns:

coroutine wrapped future

Return type:

future

wait(timeout=None, return_exceptions=None, return_when='ALL_COMPLETED')

Executes the registered coroutines in the executor queue.

Parameters:

timeout (int/float) – max execution timeout. No limit by default.

Returns:

asyncio.Future – two sets of Futures: (done, pending)

Return type:

tuple

Raises:
  • ValueError – if there are no coroutines to schedule.
  • RuntimeError – if executor is still running.
  • TimeoutError – if execution takes more than expected.