Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/document/index_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/document/index_queue.py')
-rw-r--r--sugar_network/document/index_queue.py298
1 files changed, 298 insertions, 0 deletions
diff --git a/sugar_network/document/index_queue.py b/sugar_network/document/index_queue.py
new file mode 100644
index 0000000..b77a9cf
--- /dev/null
+++ b/sugar_network/document/index_queue.py
@@ -0,0 +1,298 @@
+# Copyright (C) 2012, Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+import thread
+import logging
+import threading
+import collections
+
+from active_document import env
+from active_document.index import IndexWriter
+from active_toolkit import util, coroutine
+
+
+errnum = 0
+
+_queue = None
+_write_thread = None
+
+_logger = logging.getLogger('active_document.index_queue')
+
+
+def start(root, document_classes):
+ """Initialize the queue.
+
+ Function will start index writing thread.
+
+ :param document_classes:
+ `active_document.Document` classes that queue should serve
+ index writes for
+
+ """
+ global _queue, _write_thread
+
+ if _queue is not None:
+ return
+
+ _queue = _Queue()
+ _write_thread = _WriteThread(root, document_classes)
+ _write_thread.start()
+
+
+def put(document, op, *args):
+ """Put new index change operation to the queue.
+
+ Function migh be stuck in green wait if queue is full.
+
+ :param document:
+ document index name
+ :param op:
+ arbitrary function
+ :param args:
+ optional arguments to pass to `op`
+ :returns:
+ commit seqno that will be used for the next `put()` call
+
+ """
+ return _queue.push(False, document, op, *args)
+
+
+def commit(document):
+ """Flush all pending changes.
+
+ :param document:
+ document index name
+
+ """
+ _queue.push(True, document)
+
+
+def commit_and_wait(document):
+ """Flush all pending changes.
+
+ The function is different to `commit()` because it waits for
+ commit finishing.
+
+ :param document:
+ document index name
+
+ """
+ _queue.push(True, document)
+ _queue.wait()
+
+
+def commit_seqno(document):
+ """Last commit seqno."""
+ return _queue.commit_seqno(document)
+
+
+def stop():
+ """Flush all pending changes."""
+ global _queue
+ if _queue is None:
+ return
+ put(None, None)
+ _write_thread.join()
+ _queue = None
+
+
+class _WriteThread(threading.Thread):
+
+ def __init__(self, root, document_classes):
+ threading.Thread.__init__(self)
+ self._root = root
+ self.daemon = True
+ self._document_classes = document_classes
+ self._writers = {}
+
+ def run(self):
+ _logger.debug('Start processing queue')
+ try:
+ self._run()
+ except _EmptyQueue:
+ self._close()
+ except Exception:
+ global errnum
+ errnum += 1
+ util.exception(
+ 'Write queue died, will abort the whole application')
+ thread.interrupt_main()
+ finally:
+ _logger.debug('Stop processing queue')
+
+ def _run(self):
+ for cls in self._document_classes:
+ _logger.info('Open %r index', cls.metadata.name)
+ self._writers[cls.metadata.name] = \
+ IndexWriter(self._root, cls.metadata)
+
+ closing = False
+
+ while True:
+ document, op, args, to_commit = _queue.pop_start(not closing)
+ if document is None:
+ _queue.pop_done(document, to_commit)
+ closing = True
+ continue
+
+ writer = self._writers[document]
+
+ if op is not None:
+ _logger.debug('Start processing %r(%r) for %r index',
+ op, args, document)
+ try:
+ op(writer, *args)
+ except Exception:
+ global errnum
+ errnum += 1
+ util.exception(_logger,
+ 'Cannot process %r(%r) for %r index',
+ op, args, document)
+ if to_commit:
+ writer.commit()
+
+ _queue.pop_done(document, to_commit)
+
+ def _close(self):
+ while self._writers:
+ name, writer = self._writers.popitem()
+ _logger.info('Closing %r index', name)
+ try:
+ writer.close()
+ except Exception:
+ global errnum
+ errnum += 1
+ util.exception(_logger, 'Fail to close %r index', name)
+
+
+class _Queue(object):
+
+ class _Seqno(object):
+
+ def __init__(self):
+ self.pending_seqno = 1
+ self.commit_seqno = 0
+ self.changes = 0
+ self.endtime = time.time() + env.index_flush_timeout.value
+
+ def __init__(self):
+ self._queue = collections.deque()
+ self._mutex = threading.Lock()
+ self._push_cond = threading.Condition(self._mutex)
+ self._done_cond = threading.Condition(self._mutex)
+ self._done_async = coroutine.AsyncEvent()
+ self._endtime = time.time() + env.index_flush_timeout.value
+ self._seqno = {}
+
+ def push(self, to_commit, document, op=None, *args):
+ self._mutex.acquire()
+ try:
+ while len(self._queue) >= env.index_write_queue.value:
+ self._mutex.release()
+ try:
+ # This is potential race but we need it to avoid hanging
+ # in condition wait to let other coroutines work.
+ # The race might be avoided by using big enough
+ # `active_document.index_write_queue.value`
+ _logger.debug('Postpone %r for %r index', op, document)
+ self._done_async.wait()
+ finally:
+ self._mutex.acquire()
+ return self._push(to_commit, document, op, args)
+ finally:
+ self._mutex.release()
+
+ def pop_start(self, blocking=True):
+ self._mutex.acquire()
+ try:
+ while True:
+ remaining = None
+ if env.index_flush_timeout.value:
+ ts = time.time()
+ remaining = self._endtime - ts
+ if remaining <= 0.0:
+ for document, seqno in self._seqno.items():
+ if seqno.endtime <= ts:
+ self._push(True, document, None, None)
+ remaining = env.index_flush_timeout.value
+ self._endtime = ts + remaining
+ if not self._queue:
+ if not blocking:
+ raise _EmptyQueue()
+ self._push_cond.wait(remaining)
+ if self._queue:
+ return self._queue[0]
+ finally:
+ self._mutex.release()
+
+ def pop_done(self, document, to_commit):
+ self._mutex.acquire()
+ try:
+ self._queue.popleft()
+ if to_commit:
+ self._seqno[document].commit_seqno += 1
+ self._done_cond.notify()
+ finally:
+ self._mutex.release()
+ self._done_async.send()
+
+ def wait(self):
+ self._mutex.acquire()
+ try:
+ while self._queue:
+ self._done_cond.wait()
+ finally:
+ self._mutex.release()
+
+ def commit_seqno(self, document):
+ self._mutex.acquire()
+ try:
+ seqno = self._seqno.get(document)
+ return 0 if seqno is None else seqno.commit_seqno
+ finally:
+ self._mutex.release()
+
+ def _push(self, to_commit, document, op, args):
+ seqno = self._seqno.get(document)
+ if seqno is None:
+ seqno = self._seqno[document] = _Queue._Seqno()
+ if op is not None:
+ seqno.changes += 1
+
+ if env.index_flush_threshold.value:
+ if seqno.changes >= env.index_flush_threshold.value:
+ to_commit = True
+ if env.index_flush_timeout.value:
+ ts = time.time()
+ if seqno.endtime <= ts:
+ to_commit = True
+ seqno.endtime = ts + env.index_flush_timeout.value
+
+ if to_commit:
+ if seqno.changes:
+ seqno.pending_seqno += 1
+ seqno.changes = 0
+ else:
+ to_commit = False
+
+ self._queue.append((document, op, args, to_commit))
+ self._push_cond.notify()
+
+ return seqno.pending_seqno
+
+
+class _EmptyQueue(Exception):
+ pass