Source code for mosaic.utils.event_loop


import uuid
import uvloop
import asyncio
import inspect
import weakref
import functools
import concurrent.futures

import mosaic
from .utils import set_main_thread


__all__ = ['EventLoop', 'Future', 'async_property', 'gather']


def awaitify(sync_func):
    """Wrap a synchronous callable to allow ``await``'ing it"""
    @functools.wraps(sync_func)
    async def async_func(*args, **kwargs):
        return sync_func(*args, **kwargs)
    return async_func


[docs] class Future: """ A local future associated with an EventLoop. Parameters ---------- name : str, optional Name to give to the future, defaults to ``anon``. loop : EventLoop, optional Loop associated with the future, defaults to current loop. """ def __init__(self, name='anon', loop=None): self._future = asyncio.Future() self._loop = loop or mosaic.get_event_loop() self._name = name self._uid = '%s-%s-%s' % ('fut', name, uuid.uuid4().hex) @property def uid(self): """ Access the UID of the future. """ return self._uid @property def state(self): """ Check the state of the future (``pending``, ``done``, ``cancelled``). """ if self._future.cancelled(): return 'cancelled' elif self._future.done(): return 'done' else: return 'pending' @property def future(self): """ The wrapped future """ return self._future def __repr__(self): return "<%s object at %s, uid=%s, state=%s>" % \ (self.__class__.__name__, id(self), self.uid, self.state) def __await__(self): return (yield from self._future.__await__())
[docs] def result(self): """ Get the future result. Returns ------- """ return self._future.result()
[docs] def exception(self): """ Get the future exception. Returns ------- """ return self._future.exception()
[docs] def set_result(self, result): """ Set the future result. Parameters ---------- result : object Returns ------- """ self._future.set_result(result)
[docs] def set_exception(self, exc): """ Set the future exception. Parameters ---------- exc : Exception Returns ------- """ self._future.set_exception(exc)
[docs] def done(self): """ Check whether the future is done. Returns ------- """ return self._future.done()
[docs] def cancelled(self): """ Check whether the future is cancelled. Returns ------- """ return self._future.cancelled()
[docs] def add_done_callback(self, fun): """ Add done callback. Parameters ---------- fun : callable Returns ------- """ self._future.add_done_callback(fun)
[docs] class EventLoop: """ The event loop encapsulates the asyncio (or equivalent) event loop, which will run in a separate thread. It provides helper functions to run things within the loop, in an executor, and to call functions after a period of time or every fixed amount of time. Parameters ---------- loop : asyncio loop, optional Asyncio event loop to use internally, defaults to new loop. """ def __init__(self, loop=None): uvloop.install() self._loop = loop or asyncio.new_event_loop() asyncio.set_event_loop(self._loop) # TODO Figure out the best way to set this self._executor = concurrent.futures.ThreadPoolExecutor(1) self._stop = asyncio.Event() self._futures = set() self._recurring_tasks = weakref.WeakSet()
[docs] def get_event_loop(self): """ Access the internal loop. Returns ------- asyncio loop """ return self._loop
[docs] def run_forever(self): """ Run event loop forever. Returns ------- """ async def main(): await self._stop.wait() return self._loop.run_until_complete(main())
[docs] def stop(self): """ Stop the event loop. Returns ------- """ try: if self._stop.is_set(): return self._stop.set() for task in list(self._recurring_tasks): if not task.done(): task.cancel() tasks = asyncio.all_tasks() pending = [task for task in tasks if not task.done()] for task in pending: task.cancel() while len(pending): self._loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) pending = [task for task in tasks if not task.done()] self._loop.stop() self._loop.close() self._executor.shutdown() self._executor.shutdown(wait=True) asyncio.set_event_loop(None) except RuntimeError: pass
def __del__(self): self.stop()
[docs] def run(self, coro, *args, **kwargs): """ Schedule a function in the event loop from synchronous code. The call can be waited or returned immediately. Parameters ---------- coro : callable Function to execute in the loop. args : tuple, optional Set of arguments for the function. kwargs : optional Set of keyword arguments for the function. Returns ------- Return value from call or concurrent.futures.Future, depending on whether it is waited or not. """ if self._stop.is_set(): return kwargs = kwargs or {} if not inspect.iscoroutine(coro) and not inspect.iscoroutinefunction(coro): coro = awaitify(coro) if not self._loop.is_running(): return self._loop.run_until_complete(coro(*args, **kwargs)) future = self._loop.create_task(coro(*args, **kwargs)) self._futures.add(future) future.add_done_callback(self._done_future_callback(future)) return future
[docs] def run_in_executor(self, callback, *args, **kwargs): """ Run function in a thread executor. Parameters ---------- callback : callable Function to execute. args : tuple, optional Set of arguments for the function. kwargs : optional Set of keyword arguments for the function. Returns ------- asyncio.Future """ if self._stop.is_set(): return callback = functools.partial(callback, *args, **kwargs) future = self._loop.run_in_executor(self._executor, callback) self._futures.add(future) future.add_done_callback(self._done_future_callback(future)) return future
[docs] def wrap_future(self, future): """ Wrap a concurrent.futures.Future to be compatible with asyncio. Parameters ---------- future : concurrent.futures.Future Returns ------- asyncio.Future """ return asyncio.wrap_future(future, loop=self._loop)
[docs] def timeout(self, coro, timeout, *args, **kwargs): """ Run function after a certain ``timeout`` in seconds. Parameters ---------- coro : callable Function to execute in the loop. timeout : float Time to wait before execution in seconds. args : tuple, optional Set of arguments for the function. kwargs : optional Set of keyword arguments for the function. Returns ------- concurrent.futures.Future """ kwargs = kwargs or {} async def _timeout(): await asyncio.sleep(timeout) await self.run(coro, *args, **kwargs) future = asyncio.run_coroutine_threadsafe(_timeout(), self._loop) self._recurring_tasks.add(future) return future
[docs] def interval(self, coro, interval, *args, **kwargs): """ Run function every ``interval`` in seconds, starting after ``interval`` seconds. Parameters ---------- coro : callable Function to execute in the loop. interval : float Time to wait between executions in seconds. args : tuple, optional Set of arguments for the function. kwargs : optional Set of keyword arguments for the function. Returns ------- concurrent.futures.Future """ kwargs = kwargs or {} async def _interval(): while not self._stop.is_set(): await asyncio.sleep(interval) await self.run(coro, *args, **kwargs) future = asyncio.run_coroutine_threadsafe(_interval(), self._loop) self._recurring_tasks.add(future) return future
[docs] def set_main_thread(self): """ Set loop thread as main thread. Returns ------- """ self._loop.call_soon_threadsafe(set_main_thread)
def _done_future_callback(self, future): def _done_future(*args): self._futures.remove(future) return _done_future
class AwaitableOnly: __slots__ = ['_coro'] def __init__(self, coro): object.__setattr__(self, '_coro', coro) def __repr__(self): return f'<AwaitableOnly "{self._coro.__qualname__}">' def __await__(self): return self._coro().__await__()
[docs] class async_property: def __init__(self, func, field_name=None): self._func = func self.field_name = field_name or func.__name__ functools.update_wrapper(self, func) def __set_name__(self, cls, name): self.field_name = name def __get__(self, instance, cls): if instance is None: return self return self.awaitable_only(instance)
[docs] def get_loader(self, instance): @functools.wraps(self._func) async def get_value(): return await self._func(instance) return get_value
[docs] def awaitable_only(self, instance): return AwaitableOnly(self.get_loader(instance))
[docs] def gather(tasks): """ Wait for the termination of a group of tasks concurrently. Parameters ---------- tasks : list Set of tasks to wait. Returns ------- list Set of results from the task list. """ if not isinstance(tasks, list): return tasks else: return asyncio.gather(*tasks)