Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/toolkit/coroutine.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/toolkit/coroutine.py')
-rw-r--r--sugar_network/toolkit/coroutine.py109
1 files changed, 87 insertions, 22 deletions
diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py
index 170f445..1913bda 100644
--- a/sugar_network/toolkit/coroutine.py
+++ b/sugar_network/toolkit/coroutine.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -23,6 +23,7 @@ import logging
import gevent
import gevent.pool
import gevent.hub
+from gevent.queue import Empty
from sugar_network.toolkit import enforce
@@ -36,27 +37,27 @@ sleep = gevent.sleep
#: Wait for the spawned events to finish.
joinall = gevent.joinall
+#: Access to greenlet-local storage
+this = None
+
gevent.hub.Hub.resolver_class = 'gevent.resolver_ares.Resolver'
-_group = gevent.pool.Group()
+_all_jobs = None
_logger = logging.getLogger('coroutine')
_wsgi_logger = logging.getLogger('wsgi')
def spawn(*args, **kwargs):
- return _group.spawn(*args, **kwargs)
+ return _all_jobs.spawn(*args, **kwargs)
def spawn_later(seconds, *args, **kwargs):
- job = _group.greenlet_class(*args, **kwargs)
- job.start_later(seconds)
- _group.add(job)
- return job
+ return _all_jobs.spawn_later(*args, **kwargs)
def shutdown():
- _group.kill()
- return _group.join()
+ _all_jobs.kill()
+ return _all_jobs.join()
def reset_resolver():
@@ -168,10 +169,6 @@ class ThreadResult(object):
return self._value
-class Empty(Exception):
- pass
-
-
class AsyncQueue(object):
def __init__(self):
@@ -216,30 +213,30 @@ class AsyncQueue(object):
self._queue.put(*args, **kwargs)
def _get(self):
- from Queue import Empty as empty
- try:
- return self._queue.get_nowait()
- except empty:
- raise Empty()
+ return self._queue.get_nowait()
class Pool(gevent.pool.Pool):
def spawn(self, *args, **kwargs):
- job = gevent.pool.Pool.spawn(self, *args, **kwargs)
- _group.add(job)
+ job = self.greenlet_class(*args, **kwargs)
+ job.local = _Local()
+ if self is not _all_jobs:
+ _all_jobs.add(job)
+ self.start(job)
return job
def spawn_later(self, seconds, *args, **kwargs):
job = self.greenlet_class(*args, **kwargs)
+ job.local = _Local()
+ if self is not _all_jobs:
+ _all_jobs.add(job)
job.start_later(seconds)
self.add(job)
- _group.add(job)
return job
# pylint: disable-msg=W0221
def kill(self, *args, **kwargs):
- from gevent.queue import Empty
try:
gevent.pool.Pool.kill(self, *args, **kwargs)
except Empty:
@@ -253,6 +250,71 @@ class Pool(gevent.pool.Pool):
self.kill()
+class Spooler(object):
+ """One-producer many-consumers events delivery.
+
+ The delivery process supports lossless events feeding with guaranty that
+ every consumer proccessed every event producer pushed.
+
+ """
+
+ def __init__(self):
+ self._value = None
+ self._waiters = 0
+ self._ready = Event()
+ self._notifying_done = Event()
+ self._notifying_done.set()
+
+ @property
+ def waiters(self):
+ return self._waiters
+
+ def wait(self):
+ self._notifying_done.wait()
+ self._waiters += 1
+ try:
+ self._ready.wait()
+ value = self._value
+ finally:
+ self._waiters -= 1
+ if self._waiters == 0:
+ self._ready.clear()
+ self._notifying_done.set()
+ return value
+
+ def notify_all(self, value=None):
+ while not self._notifying_done.is_set():
+ self._notifying_done.wait()
+ if not self._waiters:
+ return
+ self._notifying_done.clear()
+ self._value = value
+ self._ready.set()
+
+
+class _Local(object):
+
+ def __init__(self):
+ self.attrs = set()
+
+ if hasattr(gevent.getcurrent(), 'local'):
+ current = gevent.getcurrent().local
+ for attr in current.attrs:
+ self.attrs.add(attr)
+ setattr(self, attr, getattr(current, attr))
+
+
+class _LocalAccess(object):
+
+ def __getattr__(self, name):
+ return getattr(gevent.getcurrent().local, name)
+
+ def __setattr__(self, name, value):
+ local = gevent.getcurrent().local
+ local.attrs.add(name)
+ return setattr(local, name, value)
+
+
class _Child(object):
def __init__(self, pid):
@@ -317,4 +379,7 @@ def _print_exception(context, klass, value, tb):
_logger.error('\n'.join([error, context, tb_repr]))
+_all_jobs = Pool()
gevent.hub.get_hub().print_exception = _print_exception
+gevent.getcurrent().local = gevent.get_hub().local = _Local()
+this = _LocalAccess()