#!/usr/bin/env python
"""Run all tests in the current directory.

You can either call it without arguments to run all tests or name specific
ones to run:

  ./runalltests.py test_massops.py
"""

import doctest
import errno
import logging
from optparse import OptionParser
import os
import os.path
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest

import dbus
import dbus.mainloop.glib
import gobject


logging.basicConfig(level=logging.WARN,
                    format='%(asctime)-15s %(name)s %(levelname)s: %(message)s',
                    stream=sys.stderr)


DOCTESTS = [
    'native_api_v1.txt',
    'sugar_api_v2.txt',
]
DOCTEST_OPTIONS = (doctest.ELLIPSIS | doctest.REPORT_ONLY_FIRST_FAILURE |
                   doctest.REPORT_UDIFF)

DS_DBUS_SERVICE = 'org.laptop.sugar.DataStore'
DS_DBUS_INTERFACE = 'org.laptop.sugar.DataStore'
DS_DBUS_PATH = '/org/laptop/sugar/DataStore'

ENVIRONMENT_WHITELIST = [
    'MALLOC_CHECK_',
    'MASSOPS_RUNS',
    'GDATASTORE_LOGLEVEL',
]

# Contents of a D-Bus service activation file; filled in with
# (bus name, executable). The section header must keep its embedded space.
_SERVICE_FILE_TEMPLATE = '[D-BUS Service]\nName=%s\nExec=%s\n'


def write_service_file(service_dir, executable, bus_name):
    """Write a D-Bus service activation file.

    Creates '<bus_name>.service' inside service_dir, declaring that
    bus_name is provided by executable.
    """
    service_path = os.path.join(service_dir, bus_name + '.service')
    # The template is spelled out explicitly rather than derived by stripping
    # spaces from an indented literal, which would also mangle the
    # '[D-BUS Service]' header.
    with open(service_path, 'w') as service_file:
        service_file.write(_SERVICE_FILE_TEMPLATE % (bus_name, executable))


def setup():
    """Prepare for testing and return environment.

    Sets HOME to a fresh temporary directory and creates a new process group
    so we can clean up easily later. Sets up environment variables (PATH,
    PYTHONPATH, XDG_DATA_DIRS), imports whitelisted ones from the current
    environment and writes the D-Bus service files for the data store.

    Returns the environment (a dictionary) to run child processes in.
    """
    environment = {}
    for name in ENVIRONMENT_WHITELIST:
        if name in os.environ:
            environment[name] = os.environ[name]

    environment['HOME'] = tempfile.mkdtemp(prefix='gdatastore-test')

    if 'PYTHONPATH' in os.environ:
        python_path = os.environ['PYTHONPATH'].split(':')
    else:
        python_path = []

    # Run tests on sources instead of on installed files.
    script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
    basedir = os.path.join(script_dir, '..')
    python_path = [basedir] + python_path
    environment['PYTHONPATH'] = ':'.join(python_path)

    # Tolerate an unset or empty PATH instead of failing with KeyError.
    search_path = [os.path.join(basedir, 'bin')]
    if os.environ.get('PATH'):
        search_path.append(os.environ['PATH'])
    environment['PATH'] = ':'.join(search_path)

    environment['XDG_DATA_DIRS'] = environment['HOME']
    service_dir = os.path.join(environment['HOME'], 'dbus-1', 'services')
    os.makedirs(service_dir)
    executable = os.path.join(basedir, 'bin', 'gdatastore-service')
    write_service_file(service_dir, executable, DS_DBUS_SERVICE)
    write_service_file(service_dir, executable, 'org.silbe.GDataStore')

    os.setpgid(0, 0)
    # prevent suicide in cleanup()
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    return environment


def wait_children():
    """Wait for all children to exit."""
    while True:
        try:
            os.wait()
        except OSError as exception:
            if exception.errno == errno.EINTR:
                # Interrupted by a signal; keep waiting.
                continue
            if exception.errno in (errno.ECHILD, errno.ESRCH):
                # ECHILD is used by Linux to indicate no child processes
                # remaining to be waited for.
                return
            raise


def cleanup(home, keep_files, dbus_pid):
    """Clean up test environment.

    Kills all children and removes home directory (unless keep_files is
    set). dbus_pid may be None if the session bus was never started.
    """
    if dbus_pid:
        try:
            os.kill(-dbus_pid, signal.SIGTERM)
        except OSError as exception:
            # The bus may already be gone; that must not prevent the
            # remaining cleanup from happening.
            if exception.errno != errno.ESRCH:
                raise

    # Signal our own process group; setup() made this process ignore SIGTERM.
    os.kill(0, signal.SIGTERM)
    wait_children()
    if not keep_files:
        shutil.rmtree(home)


class TestSuiteWrapper(unittest.TestCase):
    """Wrap a test suite to clean up after it.

    This ensures each test module gets a clean data store instance.
    """

    def __init__(self, suite):
        self._wrapped_suite = suite
        self._bus = dbus.SessionBus()
        self._loop = None
        unittest.TestCase.__init__(self)

    def runTest(self, result=None):
        """Run the wrapped suite, reporting into result."""
        self._wrapped_suite(result)

    def run(self, result=None):
        """Run the wrapped suite between setUp() and tearDown().

        The result object is handed on to runTest() so the wrapped tests
        report into it directly. KeyboardInterrupt is always propagated;
        any other exception is recorded in result.
        """
        if result is None:
            result = self.defaultTestResult()
        result.startTest(self)
        try:
            try:
                self.setUp()
            except KeyboardInterrupt:
                raise
            except:
                result.addError(self, sys.exc_info())
                return

            ok = False
            try:
                self.runTest(result)
                ok = True
            except self.failureException:
                result.addFailure(self, sys.exc_info())
            except KeyboardInterrupt:
                raise
            except:
                result.addError(self, sys.exc_info())

            try:
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except:
                result.addError(self, sys.exc_info())
                ok = False

            if ok:
                result.addSuccess(self)
        finally:
            result.stopTest(self)

    def shortDescription(self):
        """Return the first line of the wrapped suite's docstring or None."""
        doc = self._wrapped_suite.__doc__
        if not doc:
            return None
        return doc.split('\n')[0].strip() or None

    def tearDown(self):
        self._kill_data_store()
        self._clean_data_store()

    def _kill_data_store(self):
        """Terminate the data store service and wait until it is gone.

        Raises ValueError unless pgrep finds exactly one matching process in
        the process group named by the DBUS_PID environment variable.
        """
        pgrep = subprocess.Popen(['pgrep', '-g', os.environ['DBUS_PID'],
                                  '-f', 'gdatastore-service'],
                                 close_fds=True, stdout=subprocess.PIPE,
                                 universal_newlines=True)
        stdout, stderr_ = pgrep.communicate()
        pids = stdout.split()
        if len(pids) != 1:
            raise ValueError("Can't find (a single) data store process "
                             "(pgrep output %r)" % (stdout, ))

        pid = int(pids[0])
        self._loop = gobject.MainLoop()
        watch = self._bus.watch_name_owner(DS_DBUS_SERVICE,
                                           self._service_changed_cb)
        try:
            os.kill(pid, signal.SIGTERM)
            # wait for D-Bus to acknowledge the service stopped
            self._loop.run()
        finally:
            # Don't accumulate one watch per wrapped suite.
            watch.cancel()

        # wait for process to finish
        try:
            os.waitpid(pid, 0)
        except OSError as exception:
            if exception.errno != errno.ECHILD:
                raise

    def _service_changed_cb(self, new_owner):
        """Quit the main loop once the service name has no owner."""
        if not new_owner:
            self._loop.quit()

    def _clean_data_store(self):
        """Remove everything from ~/.gdatastore except the 'logs' entry."""
        data_store_dir = os.path.expanduser('~/.gdatastore')
        for name in os.listdir(data_store_dir):
            if name == 'logs':
                continue

            path = os.path.join(data_store_dir, name)
            # rmtree() refuses to work on symlinks, so unlink those instead.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.unlink(path)

# unittest.TextTestResult is the public name (Python 2.7+); fall back to the
# private alias on releases that only provide that one.
_TextTestResultBase = (getattr(unittest, 'TextTestResult', None) or
                       unittest._TextTestResult)  # pylint: disable-msg=W0212


class TimedTestResult(_TextTestResultBase):
    """Store and display test results and test runtime.

    Only displays actual tests, not test suite wrappers.
    """

    def __init__(self, stream, descriptions, verbosity):
        _TextTestResultBase.__init__(self, stream, descriptions, verbosity)
        # test -> time.time() value recorded in startTest()
        self.start_times = {}
        # test -> elapsed seconds, recorded the first time the test finishes
        self.run_times = {}

    def _record_run_time(self, test):
        """Store the elapsed time for test unless already recorded."""
        if test in self.start_times and test not in self.run_times:
            self.run_times[test] = time.time() - self.start_times[test]

    def startTest(self, test):
        self.start_times[test] = time.time()
        # Call the grandparent implementation: the text output is produced
        # here instead of by the text result base class.
        unittest.TestResult.startTest(self, test)
        if not self.showAll:
            return

        description = self.getDescription(test)
        if isinstance(test, TestSuiteWrapper):
            self.stream.write('Test Suite: %s\n' % (description, ))
        else:
            self.stream.write('    %s ... ' % (description, ))

    def stopTest(self, test):
        self._record_run_time(test)
        _TextTestResultBase.stopTest(self, test)

    def addSuccess(self, test):
        self._record_run_time(test)
        run_time = self.run_times.get(test, -1)

        unittest.TestResult.addSuccess(self, test)

        if isinstance(test, TestSuiteWrapper):
            return

        if self.showAll:
            self.stream.writeln('ok (%.3fs)' % (run_time, ))
        elif self.dots:
            self.stream.write('.')


class TimedTestRunner(unittest.TextTestRunner):
    """Run tests, displaying test result and runtime in textual form."""

    def _makeResult(self):
        return TimedTestResult(self.stream, self.descriptions, self.verbosity)


def test_suite(tests=None):
    """Build the suite of wrapped test suites to run.

    tests is a list of file names: '.txt' files are run as doctests, '.py'
    files are imported as modules and contribute the result of their
    suite() function. If tests is empty or None, DOCTESTS plus all
    'test*.py' files next to this script are used.
    """
    suite = unittest.TestSuite()
    if not tests:
        # abspath() guards against dirname() returning '' for a bare name.
        test_dir = os.path.dirname(os.path.abspath(__file__))
        # Build a new list rather than extending DOCTESTS in place; sort the
        # discovered modules so the run order is reproducible.
        tests = DOCTESTS + sorted(
            name for name in os.listdir(test_dir)
            if name.startswith('test') and name.endswith('.py'))

    for test in tests:
        if test.endswith('.txt'):
            doc_suite = doctest.DocFileSuite(test, optionflags=DOCTEST_OPTIONS)
            doc_suite.__doc__ = test
            suite.addTest(TestSuiteWrapper(doc_suite))
        elif test.endswith('.py'):
            module = __import__(test[:-3])
            if hasattr(module, 'suite'):
                suite.addTest(TestSuiteWrapper(module.suite()))
            else:
                logging.warning('Skipping %r: module has no suite()', test)
        else:
            logging.warning('Skipping %r: neither a .txt nor a .py file',
                            test)

    return suite


def run_tests(tests):
    """Run the given tests (all if empty); return the process exit status.

    Returns 0 if all tests were successful and 10 otherwise.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    runner = TimedTestRunner(verbosity=2)
    result = runner.run(test_suite(tests))
    if result.wasSuccessful():
        return 0
    return 10


def _parse_dbus_launch_output(output):
    """Extract (pid, address) from the KEY=VALUE lines dbus-launch printed.

    Raises ValueError if a line cannot be parsed, names an unknown key or
    if either the PID or the address is missing.
    """
    pid = None
    address = None
    for line in output.splitlines():
        if not line.strip():
            continue

        key, separator, value = line.partition('=')
        if not separator:
            raise ValueError('Cannot parse dbus-launch output: %r' % (line, ))

        if key == 'DBUS_SESSION_BUS_ADDRESS':
            address = value
        elif key == 'DBUS_SESSION_BUS_PID':
            pid = int(value)
        else:
            raise ValueError('Cannot parse dbus-launch output: %r' % (line, ))

    # Explicit check rather than assert, which is stripped under -O.
    if pid is None or address is None:
        raise ValueError('Incomplete dbus-launch output: %r' % (output, ))

    return pid, address


def _start_dbus(environment):
    """Start a session bus using dbus-launch; return (pid, address).

    dbus-launch is run with environment as its environment and the HOME
    entry of environment as its working directory.

    Raises RuntimeError if dbus-launch exits with a non-zero status and
    ValueError if its output cannot be parsed.
    """
    pipe = subprocess.Popen(['dbus-launch'], stdout=subprocess.PIPE,
                            close_fds=True, env=environment,
                            cwd=environment['HOME'],
                            universal_newlines=True)
    stdout, stderr_ = pipe.communicate()
    if pipe.returncode:
        raise RuntimeError('dbus-launch failed with exit status %d' %
                           (pipe.returncode, ))

    return _parse_dbus_launch_output(stdout)


def _parse_options(arguments=None):
    """Parse command line arguments.

    arguments is the list of arguments to parse (without the program name);
    sys.argv[1:] is used if it is None.

    Returns a tuple (options, positional arguments).
    """
    parser = OptionParser()
    parser.add_option('-k', '--keep', dest='keep',
                      action='store_true', default=False,
                      help='Keep temporary files')
    parser.add_option('--stage2', dest='stage2',
                      action='store_true', default=False,
                      help='For internal use only')
    return parser.parse_args(arguments)


def main(my_name, arguments):
    """Run the tests; return the exit status.

    Without --stage2 this prepares the test environment, starts a session
    bus and re-runs my_name with --stage2 (plus the original arguments)
    inside that environment, cleaning up afterwards. With --stage2 the tests
    named in arguments are run directly.
    """
    options, tests = _parse_options(arguments)
    if options.stage2:
        return run_tests(tests)

    environment = setup()
    dbus_pid = None
    try:
        dbus_pid, dbus_address = _start_dbus(environment)
        environment['DBUS_SESSION_BUS_ADDRESS'] = dbus_address
        environment['DBUS_PID'] = str(dbus_pid)

        command = [os.path.abspath(my_name), '--stage2'] + arguments
        pipe = subprocess.Popen(command, cwd=environment['HOME'],
                                env=environment)
        return pipe.wait()
    finally:
        cleanup(environment['HOME'], options.keep, dbus_pid)


if __name__ == '__main__':
    sys.exit(main(sys.argv[0], sys.argv[1:]))