Source code for paco.race

# -*- coding: utf-8 -*-
import asyncio
from .assertions import assert_iter


@asyncio.coroutine
[docs]def race(iterable, loop=None, timeout=None, *args, **kw): """ Runs the tasks array of functions concurrently, without waiting until the previous function has completed. Once any of the tasks completes, the coroutine main callback is immediately called. This function implements the same interface as Python standard `itertools.dropwhile()` function. All coroutines will be executed in the same loop. Arguments: iterable (iterable): an iterable collection yielding coroutines functions or coroutine objects. *args (mixed): mixed variadic arguments to pass to coroutines. loop (asyncio.BaseEventLoop): optional event loop to use. timeout (int|float): timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time. Raises: TypeError: if ``iterable`` argument is not iterable. asyncio.TimoutError: if wait timeout is exceeded. Returns: filtered values (list): ordered list of resultant values. Usage:: await paco.race(coro, coro2, coro3) => coro2 # result """ assert_iter(iterable=iterable) # Store coros and internal state coros = [] resolved = False result = None # Resolve first yielded data from coroutine and stop pending ones @asyncio.coroutine def resolver(index, coro): nonlocal result nonlocal resolved value = yield from coro if not resolved: resolved = True # Flag as not test passed result = value # Force canceling pending coroutines for _index, future in enumerate(coros): if _index != index: future.cancel() # Iterate and attach coroutine for defer scheduling for index, coro in enumerate(iterable): # Validate yielded object isfunction = asyncio.iscoroutinefunction(coro) if not isfunction and not asyncio.iscoroutine(coro): raise TypeError('coro must be a coroutine or coroutine function') # Init coroutine function, if required if isfunction: coro = coro(*args, **kw) # Store future tasks coros.append(asyncio.async(resolver(index, coro))) # Run coroutines concurrently yield from asyncio.wait(coros, timeout=timeout, loop=loop) return result