Source code for paco.concurrent

# -*- coding: utf-8 -*-
"""Coroutines concurrent pool executor with built-in
concurrency limit based on a semaphore free slots algorithm.

Usage::

    async def fetch(url):
        r = await aiohttp.get(url)
        return await r.read()
    # limit the concurrent coroutines to 3
    pool = concurrent(3)
    for _ in range(10):
        p.submit(fetch, 'http://www.baidu.com')
    await p.join()
"""
import asyncio
from collections import deque, namedtuple
from .observer import Observer
from .assertions import isiter

# Task represents an immutable tuple storing the index order
# and coroutine object.
Task = namedtuple('Task', ['index', 'coro'])


@asyncio.coroutine
def safe_run(coro, return_exceptions=False):
    """
    Executes a given coroutine and optionally catches exceptions, returning
    them as value. This function is intended to be used internally.
    """
    try:
        result = yield from coro
    except Exception as err:
        if return_exceptions:
            result = err
        else:
            raise err
    return result


@asyncio.coroutine
def collect(coro, index, results,
            preserve_order=False,
            return_exceptions=False):
    """
    Collect is used internally to execute coroutines and collect the returned
    value. This function is intended to be used internally.
    """
    result = yield from safe_run(coro, return_exceptions=return_exceptions)

    if preserve_order:
        results[index] = result
    else:
        results.append(result)


[docs]class ConcurrentExecutor(object): """ Concurrent executes a set of asynchronous coroutines with a simple throttle concurrency configurable concurrency limit. Provides an observer pub/sub interface, allowing API consumers to subscribe normal functions or coroutines to certain events that happen internally. ConcurrentExecutor is_running a low-level implementation that powers most of the utility functions provided in `paco`. For most cases you won't need to rely on it, instead you can use the high-level API functions that provides a simpler abstraction for the majority of the use cases. This class is not thread safe. Events: - start (executor): triggered before executor cycle starts. - finish (executor): triggered when all the coroutine finished. - task.start (task): triggered before coroutine starts. - task.finish (task, result): triggered when the coroutine finished. - task.error (task, error): triggered when a coroutined task raised an exception. Arguments: limit (int): concurrency limit. Defaults to 10. coros (list[coroutine], optional): list of coroutines to schedule. loop (asyncio.BaseEventLoop, optional): loop to run. Defaults to asyncio.get_event_loop(). ignore_empty (bool, optional): do not raise an exception if there are no coroutines to schedule are empty. Returns: ConcurrentExecutor Usage:: async def sum(x, y): return x + y pool = paco.ConcurrentExecutor(limit=2) pool.add(sum, 1, 2) pool.add(sum, None, 'str') done, pending = await pool.run(return_exceptions=True) [task.result() for task in done] # => [3, TypeError("unsupported operand type(s) for +: 'NoneType' and 'str'")] # noqa """ def __init__(self, limit=10, loop=None, coros=None, ignore_empty=False): self.errors = [] self.running = False self.return_exceptions = False self.limit = max(int(limit), 0) self.pool = deque() self.observer = Observer() self.ignore_empty = ignore_empty self.loop = loop or asyncio.get_event_loop() self.semaphore = asyncio.Semaphore(self.limit, loop=self.loop) # Register coroutines in the pool if isiter(coros): self.extend(*coros)
[docs] def __len__(self): """ Returns the current length of the coroutines pool queue. Returns: int: current coroutines pool length. """ return len(self.pool)
[docs] def reset(self): """ Resets the executer scheduler internal state. Raises: RuntimeError: is the executor is still running. """ if self.running: raise RuntimeError('paco: executor is still running') self.pool.clear() self.observer.clear() self.semaphore = asyncio.Semaphore(self.limit, loop=self.loop)
[docs] def cancel(self): """ Tries to gracefully cancel the pending coroutine scheduled coroutine tasks. """ self.pool.clear() self.running = False
[docs] def on(self, event, fn): """ Subscribes to a specific event. Arguments: event (str): event name to subcribe. fn (function): function to trigger. """ return self.observer.on(event, fn)
[docs] def off(self, event): """ Removes event subscribers. Arguments: event (str): event name to remove observers. """ return self.observer.off(event)
[docs] def extend(self, *coros): """ Add multiple coroutines to the executor pool. Raises: TypeError: if the coro object is not a valid coroutine """ for coro in coros: self.add(coro)
[docs] def add(self, coro, *args, **kw): """ Adds a new coroutine function with optional variadic argumetns. Arguments: coro (coroutine function): coroutine to execute. *args (mixed): optional variadic arguments Raises: TypeError: if the coro object is not a valid coroutine Returns: future: coroutine wrapped future """ # Create coroutine object if a function is provided if asyncio.iscoroutinefunction(coro): coro = coro(*args, **kw) # Verify coroutine if not asyncio.iscoroutine(coro): raise TypeError('paco: coro must be a coroutine object') # Store coroutine with arguments for deferred execution index = max(len(self.pool), 0) task = Task(index, coro) # Append the coroutine data to the pool self.pool.append(task) return coro
# Alias to add() submit = add @asyncio.coroutine def _run_sequentially(self): # Store futures in two queues done, pending = [], [] # Run until the pool is empty while len(self.pool): future = asyncio.Future(loop=self.loop) pending.append(future) # Run coroutine result = yield from self._run_coro((self.pool.popleft())) # Assign result to future if isinstance(result, Exception): if not self.return_exceptions: raise result future.set_exception(result) else: future.set_result(result) # Swap future between queues done.append(pending.pop()) # Build futures tuple to be compatible with asyncio.wait() interface return set(done), set(pending) @asyncio.coroutine def _run_concurrently(self, timeout=None, return_when='ALL_COMPLETED'): coros = [] limit = self.limit while len(self.pool): task = self.pool.popleft() # Run without concurrency limit if limit <= 0: coros.append(self._run_coro(task)) # Otherwise, schedule for concurrent based flow else: coros.append(self._schedule_coro(task)) # Wait until all the coroutines finishes return (yield from asyncio.wait(coros, loop=self.loop, timeout=timeout, return_when=return_when)) @asyncio.coroutine def _run_coro(self, task): # Executor must be running if not self.running: return None # Trigger task pre-execution event yield from self.observer.trigger('task.start', task) # Trigger coroutine task index, coro = task # Safe coroutine execution try: result = yield from safe_run( coro, return_exceptions=self.return_exceptions) except Exception as err: self.errors.append(err) yield from self.observer.trigger('task.error', task, err) raise err # important: re-raise exception for asyncio propagation # Trigger task post-execution event yield from self.observer.trigger('task.finish', task, result) # Return result to future binding return result @asyncio.coroutine def _schedule_coro(self, task): """ Executes a given coroutine in the next available slot. Slots are available based on a simple free slots scheduling semaphore-based algorithm. """ # Run when a slot is available with (yield from self.semaphore): return (yield from self._run_coro(task))
[docs] @asyncio.coroutine def run(self, timeout=None, return_when=None, return_exceptions=None, ignore_empty=None): """ Executes the registered coroutines in the executor queue. Arguments: timeout (int/float): max execution timeout. No limit by default. return_exceptions (bool): in case of coroutine exception. return_when (str): sets when coroutine should be resolved. See `asyncio.wait`_ for supported values. ignore_empty (bool, optional): do not raise an exception if there are no coroutines to schedule are empty. Returns: asyncio.Future (tuple): two sets of Futures: ``(done, pending)`` Raises: ValueError: if there is no coroutines to schedule. RuntimeError: if executor is still running. TimeoutError: if execution takes more than expected. .. _asyncio.wait: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait # noqa """ # Only allow 1 concurrent execution if self.running: raise RuntimeError('paco: executor is already running') # Overwrite ignore empty behaviour, if explicitly defined ignore_empty = (self.ignore_empty if ignore_empty is None else ignore_empty) # Check we have coroutines to schedule if len(self.pool) == 0: # If ignore empty mode enabled, just return an empty tuple if ignore_empty: return (tuple(), tuple()) # Othwerise raise an exception raise ValueError('paco: pool of coroutines is empty') # Set executor state to running self.running = True # Configure return exceptions if return_exceptions is not None: self.return_exceptions = return_exceptions if return_exceptions is False and return_when is None: return_when = 'FIRST_EXCEPTION' if return_when is None: return_when = 'ALL_COMPLETED' # Trigger pre-execution event yield from self.observer.trigger('start', self) # Sequential coroutines execution if self.limit == 1: done, pending = yield from self._run_sequentially() # Concurrent execution based on configured limit if self.limit != 1: done, pending = yield from self._run_concurrently( timeout=timeout, return_when=return_when) # Reset internal state and queue self.running = False # Raise exception, if needed if self.return_exceptions is False and self.errors: err = self.errors[0] err.errors = self.errors[1:] raise err # Trigger pre-execution event yield from self.observer.trigger('finish', self) # Reset executor state to defaults after each execution self.reset() # Return resultant futures in two tuples return done, pending
# Idiomatic method alias to run() wait = run
[docs] def is_running(self): """ Checks the executor running state. Returns: bool: ``True`` if the executur is running, otherwise ``False``. """ return self.running
# Semantic shortcut to ConcurrentExecutor() concurrent = ConcurrentExecutor