diff options
Diffstat (limited to 'sugar_network/toolkit/coroutine.py')
-rw-r--r-- | sugar_network/toolkit/coroutine.py | 109 |
1 files changed, 87 insertions, 22 deletions
diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py index 170f445..1913bda 100644 --- a/sugar_network/toolkit/coroutine.py +++ b/sugar_network/toolkit/coroutine.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,6 +23,7 @@ import logging import gevent import gevent.pool import gevent.hub +from gevent.queue import Empty from sugar_network.toolkit import enforce @@ -36,27 +37,27 @@ sleep = gevent.sleep #: Wait for the spawned events to finish. joinall = gevent.joinall +#: Access to greenlet-local storage +this = None + gevent.hub.Hub.resolver_class = 'gevent.resolver_ares.Resolver' -_group = gevent.pool.Group() +_all_jobs = None _logger = logging.getLogger('coroutine') _wsgi_logger = logging.getLogger('wsgi') def spawn(*args, **kwargs): - return _group.spawn(*args, **kwargs) + return _all_jobs.spawn(*args, **kwargs) def spawn_later(seconds, *args, **kwargs): - job = _group.greenlet_class(*args, **kwargs) - job.start_later(seconds) - _group.add(job) - return job + return _all_jobs.spawn_later(*args, **kwargs) def shutdown(): - _group.kill() - return _group.join() + _all_jobs.kill() + return _all_jobs.join() def reset_resolver(): @@ -168,10 +169,6 @@ class ThreadResult(object): return self._value -class Empty(Exception): - pass - - class AsyncQueue(object): def __init__(self): @@ -216,30 +213,30 @@ class AsyncQueue(object): self._queue.put(*args, **kwargs) def _get(self): - from Queue import Empty as empty - try: - return self._queue.get_nowait() - except empty: - raise Empty() + return self._queue.get_nowait() class Pool(gevent.pool.Pool): def spawn(self, *args, **kwargs): - job = gevent.pool.Pool.spawn(self, *args, **kwargs) - _group.add(job) + job = self.greenlet_class(*args, **kwargs) + job.local = _Local() + if self is not _all_jobs: + _all_jobs.add(job) + self.start(job) return job def spawn_later(self, seconds, *args, **kwargs): job = self.greenlet_class(*args, **kwargs) + job.local = _Local() + if self is not _all_jobs: + _all_jobs.add(job) job.start_later(seconds) self.add(job) - _group.add(job) return job # pylint: disable-msg=W0221 def kill(self, *args, **kwargs): - from gevent.queue import Empty try: gevent.pool.Pool.kill(self, *args, **kwargs) except Empty: @@ -253,6 +250,71 @@ class Pool(gevent.pool.Pool): self.kill() +class Spooler(object): + """One-producer many-consumers events delivery. + + The delivery process supports lossless events feeding with guaranty that + every consumer proccessed every event producer pushed. + + """ + + def __init__(self): + self._value = None + self._waiters = 0 + self._ready = Event() + self._notifying_done = Event() + self._notifying_done.set() + + @property + def waiters(self): + return self._waiters + + def wait(self): + self._notifying_done.wait() + self._waiters += 1 + try: + self._ready.wait() + value = self._value + finally: + self._waiters -= 1 + if self._waiters == 0: + self._ready.clear() + self._notifying_done.set() + return value + + def notify_all(self, value=None): + while not self._notifying_done.is_set(): + self._notifying_done.wait() + if not self._waiters: + return + self._notifying_done.clear() + self._value = value + self._ready.set() + + +class _Local(object): + + def __init__(self): + self.attrs = set() + + if hasattr(gevent.getcurrent(), 'local'): + current = gevent.getcurrent().local + for attr in current.attrs: + self.attrs.add(attr) + setattr(self, attr, getattr(current, attr)) + + +class _LocalAccess(object): + + def __getattr__(self, name): + return getattr(gevent.getcurrent().local, name) + + def __setattr__(self, name, value): + local = gevent.getcurrent().local + local.attrs.add(name) + return setattr(local, name, value) + + class _Child(object): def __init__(self, pid): @@ -317,4 +379,7 @@ def _print_exception(context, klass, value, tb): _logger.error('\n'.join([error, context, tb_repr])) +_all_jobs = Pool() gevent.hub.get_hub().print_exception = _print_exception +gevent.getcurrent().local = gevent.get_hub().local = _Local() +this = _LocalAccess() |