# Source code for paco.filter

# -*- coding: utf-8 -*-
import asyncio
from .decorator import overload
from .concurrent import ConcurrentExecutor
from .assertions import assert_corofunction, assert_iter

# Declared as a native coroutine so the result can be driven both with
# ``await`` and with ``yield from`` inside generator-based coroutines,
# which is how ``filter()`` below consumes its assertion function.
async def assert_true(element):
    """
    Default assertion coroutine: resolves to the given value unchanged.

    ``filter()`` awaits this on every result produced by the filter
    coroutine and keeps the element when the resolved value is true-like.

    Arguments:
        element (mixed): element to evaluate.

    Returns:
        mixed: ``element`` itself; the caller tests its truthiness.
    """
    return element

# NOTE(review): the decorators were missing from the listing. The body uses
# ``yield from`` (so it must be a generator-based coroutine) and ``overload``
# is imported by this module but otherwise unused; it is presumably what
# provides the ``|`` pipeline composition described below -- confirm.
@overload
@asyncio.coroutine
def filter(coro, iterable, assert_fn=None, limit=0, loop=None):
    """
    Returns a list of all the values in ``iterable`` which pass an
    asynchronous truth test coroutine.

    Operations are executed concurrently by default, but results
    will be in order.

    You can configure the concurrency via `limit` param.

    This function is the asynchronous port of the Python built-in
    `filter()` function.

    This function is a coroutine.

    This function can be composed in a pipeline chain with ``|`` operator.

    Arguments:
        coro (coroutine function): coroutine filter function to call
            accepting iterable values.
        iterable (iterable): collection of values to filter. It must
            support ``len()``.
        assert_fn (coroutinefunction): optional assertion function. It is
            called with the value resolved by ``coro``; the element is
            kept when it resolves to a true-like value. Defaults to
            ``assert_true``.
        limit (int): max filtering concurrency limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if coro argument is not a coroutine function.

    Returns:
        list: ordered list containing values that passed the filter.
            An empty ``iterable`` is returned as-is.

    Usage::

        async def iseven(num):
            return num % 2 == 0

        await paco.filter(iseven, [1, 2, 3, 4, 5], limit=3)
        # => [2, 4]

    """
    assert_corofunction(coro=coro)
    assert_iter(iterable=iterable)

    # Check valid or empty iterable
    if len(iterable) == 0:
        return iterable

    # Unique placeholder for "rejected / not yet evaluated" slots. Using
    # ``None`` here would silently drop elements that are themselves
    # ``None`` but did pass the filter.
    rejected = object()

    # One slot per input element, so results keep the input order even
    # though the coroutines may complete in any order.
    results = [rejected] * len(iterable)

    # Use a custom or default filter assertion function
    assert_fn = assert_fn or assert_true

    # Create concurrent executor
    pool = ConcurrentExecutor(limit=limit, loop=loop)

    # Factory binding ``index``/``element`` per item, so every deferred
    # coroutine writes into its own slot (avoids late-binding closures).
    def filterer(index, element):
        @asyncio.coroutine
        def wrapper():
            result = yield from coro(element)
            if (yield from assert_fn(result)):
                results[index] = element
        return wrapper

    # Iterate and attach coroutine for defer scheduling
    for index, element in enumerate(iterable):
        pool.add(filterer(index, element))

    # Wait until all coroutines finish.
    # NOTE(review): the awaited expression was missing in the listing;
    # running the executor is assumed here -- confirm the method name and
    # arguments against ConcurrentExecutor.
    yield from pool.run()

    # Returns filtered elements
    return [x for x in results if x is not rejected]