Source code for paco.some

# -*- coding: utf-8 -*-
import asyncio
from .partial import partial
from .decorator import overload
from .concurrent import ConcurrentExecutor
from .assertions import assert_corofunction, assert_iter


@overload
@asyncio.coroutine
def some(coro, iterable, limit=0, timeout=None, loop=None):
    """
    Returns `True` if at least one element in the iterable satisfies the
    asynchronous coroutine test. If any iteratee call returns `True`,
    iteration stops and `True` will be returned.

    This function is a coroutine.

    This function can be composed in a pipeline chain with ``|`` operator.

    Arguments:
        coro (coroutine function): coroutine function for test values.
        iterable (iterable): an iterable. Iterables that do not implement
            ``__len__`` (e.g. generators) are fully consumed into a tuple
            before any test is scheduled.
        limit (int): max concurrency limit. Use ``0`` for no limit.
        timeout can be used to control the maximum number of seconds to
            wait before returning. timeout can be an int or float.
            If timeout is not specified or None, there is no limit to
            the wait time.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if input arguments are not valid.

    Returns:
        bool: `True` if at least one value passes the test,
            otherwise `False`.

    Usage::

        async def test(num):
            return num > 3

        await paco.some(test, [1, 2, 3, 4, 5])
        # => True

    """
    assert_corofunction(coro=coro)
    assert_iter(iterable=iterable)

    # Lazy iterables (generators, map/filter objects, ...) have no len();
    # snapshot them so the emptiness guard below cannot raise TypeError.
    # Sized inputs are left untouched. Every element is attached to the
    # pool before it runs anyway, so nothing is consumed earlier than needed.
    if not hasattr(iterable, '__len__'):
        iterable = tuple(iterable)

    # Reduced accumulator value
    passes = False

    # If no items in iterable, return False
    if len(iterable) == 0:
        return passes

    # Create concurrent executor
    pool = ConcurrentExecutor(limit=limit, loop=loop)

    # Reducer partial function for deferred coroutine execution
    @asyncio.coroutine
    def tester(element):
        nonlocal passes

        # Another element already satisfied the test: skip this one
        if passes:
            return None

        if (yield from coro(element)):
            # Flag the test as passed
            passes = True
            # Force stop pending coroutines
            pool.cancel()

    # Iterate and attach coroutine for defer scheduling
    for element in iterable:
        pool.add(partial(tester, element))

    # Wait until all coroutines finish
    yield from pool.run(timeout=timeout)

    return passes