Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/toolkit/pipe.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/toolkit/pipe.py')
-rw-r--r--sugar_network/toolkit/pipe.py189
1 files changed, 0 insertions, 189 deletions
diff --git a/sugar_network/toolkit/pipe.py b/sugar_network/toolkit/pipe.py
deleted file mode 100644
index 7a53201..0000000
--- a/sugar_network/toolkit/pipe.py
+++ /dev/null
@@ -1,189 +0,0 @@
-# Copyright (C) 2010-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import sys
-import json
-import struct
-import signal
-import logging
-import threading
-from os.path import exists, dirname, basename
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine
-
-
-_logger = logging.getLogger('pipe')
-_pipe = None
-_trace = None
-
-
-def feedback(state, **event):
- if _pipe is None:
- return
- event['state'] = state
- event = json.dumps(event)
- os.write(_pipe, struct.pack('i', len(event)))
- os.write(_pipe, event)
-
-
-def trace(message, *args):
- global _trace
- if _trace is None:
- _trace = []
- if args:
- message = message % args
- _logger.debug(message)
- _trace.append(message)
-
-
-def fork(callback, log_path=None, session=None, **kwargs):
- fd_r, fd_w = os.pipe()
-
- pid = os.fork()
- if pid:
- os.close(fd_w)
- _logger.debug('Fork %s%r with %s pid', callback, kwargs, pid)
- return _Pipe(pid, fd_r)
-
- os.close(fd_r)
- global _pipe
- _pipe = fd_w
-
- def thread_func():
- environ = {}
- if log_path:
- environ['log_path'] = _setup_logging(log_path)
- feedback('fork', session=session, environ=environ)
- try:
- callback(**kwargs)
- except Exception, error:
- feedback('failure', error_type=type(error).__name__,
- error=str(error), environ={'trace': _trace})
- _logger.exception('%r(%r) failed', callback, kwargs)
-
- if session is None:
- session = {}
- # Avoid a mess with current thread coroutines
- thread = threading.Thread(target=thread_func)
- thread.start()
- thread.join()
-
- os.close(fd_w)
- sys.stdout.flush()
- sys.stderr.flush()
- # pylint: disable-msg=W0212
- os._exit(0)
-
-
-class _Pipe(object):
-
- def __init__(self, pid, fd):
- self._pid = pid
- self._fd = fd
- self._session = {}
- self._environ = {}
-
- def fileno(self):
- return self._fd
-
- def read(self):
- if self._fd is None:
- return None
-
- event = None
- failed = False
-
- event_length = os.read(self._fd, struct.calcsize('i'))
- if event_length:
- event_length = struct.unpack('i', event_length)[0]
- event = json.loads(os.read(self._fd, event_length))
- if 'session' in event:
- self._session.update(event.pop('session'))
- if 'environ' in event:
- self._environ.update(event.pop('environ'))
- failed = (event['state'] == 'failure')
-
- if event is None or failed:
- status = 0
- try:
- __, status = os.waitpid(self._pid, 0)
- except OSError:
- pass
- if event is None:
- failure = _decode_exit_failure(status)
- if failure:
- _logger.debug('Process %s failed: %s', self._pid, failure)
- event = {'state': 'failure', 'error': failure}
- failed = True
- else:
- _logger.debug('Process %s successfully exited', self._pid)
- event = {'state': 'exit'}
- os.close(self._fd)
- self._fd = None
-
- if failed:
- event.update(self._environ)
- event.update(self._session)
-
- return event
-
- def __iter__(self):
- try:
- while self._fd is not None:
- coroutine.select([self._fd], [], [])
- event = self.read()
- if event is None:
- break
- yield event
- finally:
- if self._fd is not None:
- _logger.debug('Kill %s process', self._pid)
- os.kill(self._pid, signal.SIGTERM)
- while self.read() is not None:
- pass
-
-
-def _decode_exit_failure(status):
- failure = None
- if os.WIFEXITED(status):
- status = os.WEXITSTATUS(status)
- if status:
- failure = 'Exited with status %s' % status
- elif os.WIFSIGNALED(status):
- signum = os.WTERMSIG(status)
- if signum not in (signal.SIGINT, signal.SIGKILL, signal.SIGTERM):
- failure = 'Terminated by signal %s' % signum
- else:
- signum = os.WTERMSIG(status)
- failure = 'Undefined status with signal %s' % signum
- return failure
-
-
-def _setup_logging(path):
- log_dir = dirname(path)
- if not exists(log_dir):
- os.makedirs(log_dir)
- path = toolkit.unique_filename(log_dir, basename(path) + '.log')
-
- logfile = file(path, 'a+')
- os.dup2(logfile.fileno(), sys.stdout.fileno())
- os.dup2(logfile.fileno(), sys.stderr.fileno())
- logfile.close()
-
- toolkit.init_logging()
-
- return path