Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/test/test_states.py
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/test/test_states.py')
-rwxr-xr-xcherrypy/test/test_states.py436
1 files changed, 0 insertions, 436 deletions
diff --git a/cherrypy/test/test_states.py b/cherrypy/test/test_states.py
deleted file mode 100755
index 0f97337..0000000
--- a/cherrypy/test/test_states.py
+++ /dev/null
@@ -1,436 +0,0 @@
-from cherrypy._cpcompat import BadStatusLine, ntob
-import os
-import sys
-import threading
-import time
-
-import cherrypy
-engine = cherrypy.engine
-thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
-
-
-class Dependency:
-
- def __init__(self, bus):
- self.bus = bus
- self.running = False
- self.startcount = 0
- self.gracecount = 0
- self.threads = {}
-
- def subscribe(self):
- self.bus.subscribe('start', self.start)
- self.bus.subscribe('stop', self.stop)
- self.bus.subscribe('graceful', self.graceful)
- self.bus.subscribe('start_thread', self.startthread)
- self.bus.subscribe('stop_thread', self.stopthread)
-
- def start(self):
- self.running = True
- self.startcount += 1
-
- def stop(self):
- self.running = False
-
- def graceful(self):
- self.gracecount += 1
-
- def startthread(self, thread_id):
- self.threads[thread_id] = None
-
- def stopthread(self, thread_id):
- del self.threads[thread_id]
-
-db_connection = Dependency(engine)
-
-def setup_server():
- class Root:
- def index(self):
- return "Hello World"
- index.exposed = True
-
- def ctrlc(self):
- raise KeyboardInterrupt()
- ctrlc.exposed = True
-
- def graceful(self):
- engine.graceful()
- return "app was (gracefully) restarted succesfully"
- graceful.exposed = True
-
- def block_explicit(self):
- while True:
- if cherrypy.response.timed_out:
- cherrypy.response.timed_out = False
- return "broken!"
- time.sleep(0.01)
- block_explicit.exposed = True
-
- def block_implicit(self):
- time.sleep(0.5)
- return "response.timeout = %s" % cherrypy.response.timeout
- block_implicit.exposed = True
-
- cherrypy.tree.mount(Root())
- cherrypy.config.update({
- 'environment': 'test_suite',
- 'engine.deadlock_poll_freq': 0.1,
- })
-
- db_connection.subscribe()
-
-
-
-# ------------ Enough helpers. Time for real live test cases. ------------ #
-
-
-from cherrypy.test import helper
-
-class ServerStateTests(helper.CPWebCase):
- setup_server = staticmethod(setup_server)
-
- def setUp(self):
- cherrypy.server.socket_timeout = 0.1
-
- def test_0_NormalStateFlow(self):
- engine.stop()
- # Our db_connection should not be running
- self.assertEqual(db_connection.running, False)
- self.assertEqual(db_connection.startcount, 1)
- self.assertEqual(len(db_connection.threads), 0)
-
- # Test server start
- engine.start()
- self.assertEqual(engine.state, engine.states.STARTED)
-
- host = cherrypy.server.socket_host
- port = cherrypy.server.socket_port
- self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)
-
- # The db_connection should be running now
- self.assertEqual(db_connection.running, True)
- self.assertEqual(db_connection.startcount, 2)
- self.assertEqual(len(db_connection.threads), 0)
-
- self.getPage("/")
- self.assertBody("Hello World")
- self.assertEqual(len(db_connection.threads), 1)
-
- # Test engine stop. This will also stop the HTTP server.
- engine.stop()
- self.assertEqual(engine.state, engine.states.STOPPED)
-
- # Verify that our custom stop function was called
- self.assertEqual(db_connection.running, False)
- self.assertEqual(len(db_connection.threads), 0)
-
- # Block the main thread now and verify that exit() works.
- def exittest():
- self.getPage("/")
- self.assertBody("Hello World")
- engine.exit()
- cherrypy.server.start()
- engine.start_with_callback(exittest)
- engine.block()
- self.assertEqual(engine.state, engine.states.EXITING)
-
- def test_1_Restart(self):
- cherrypy.server.start()
- engine.start()
-
- # The db_connection should be running now
- self.assertEqual(db_connection.running, True)
- grace = db_connection.gracecount
-
- self.getPage("/")
- self.assertBody("Hello World")
- self.assertEqual(len(db_connection.threads), 1)
-
- # Test server restart from this thread
- engine.graceful()
- self.assertEqual(engine.state, engine.states.STARTED)
- self.getPage("/")
- self.assertBody("Hello World")
- self.assertEqual(db_connection.running, True)
- self.assertEqual(db_connection.gracecount, grace + 1)
- self.assertEqual(len(db_connection.threads), 1)
-
- # Test server restart from inside a page handler
- self.getPage("/graceful")
- self.assertEqual(engine.state, engine.states.STARTED)
- self.assertBody("app was (gracefully) restarted succesfully")
- self.assertEqual(db_connection.running, True)
- self.assertEqual(db_connection.gracecount, grace + 2)
- # Since we are requesting synchronously, is only one thread used?
- # Note that the "/graceful" request has been flushed.
- self.assertEqual(len(db_connection.threads), 0)
-
- engine.stop()
- self.assertEqual(engine.state, engine.states.STOPPED)
- self.assertEqual(db_connection.running, False)
- self.assertEqual(len(db_connection.threads), 0)
-
- def test_2_KeyboardInterrupt(self):
- # Raise a keyboard interrupt in the HTTP server's main thread.
- # We must start the server in this, the main thread
- engine.start()
- cherrypy.server.start()
-
- self.persistent = True
- try:
- # Make the first request and assert there's no "Connection: close".
- self.getPage("/")
- self.assertStatus('200 OK')
- self.assertBody("Hello World")
- self.assertNoHeader("Connection")
-
- cherrypy.server.httpserver.interrupt = KeyboardInterrupt
- engine.block()
-
- self.assertEqual(db_connection.running, False)
- self.assertEqual(len(db_connection.threads), 0)
- self.assertEqual(engine.state, engine.states.EXITING)
- finally:
- self.persistent = False
-
- # Raise a keyboard interrupt in a page handler; on multithreaded
- # servers, this should occur in one of the worker threads.
- # This should raise a BadStatusLine error, since the worker
- # thread will just die without writing a response.
- engine.start()
- cherrypy.server.start()
-
- try:
- self.getPage("/ctrlc")
- except BadStatusLine:
- pass
- else:
- print(self.body)
- self.fail("AssertionError: BadStatusLine not raised")
-
- engine.block()
- self.assertEqual(db_connection.running, False)
- self.assertEqual(len(db_connection.threads), 0)
-
- def test_3_Deadlocks(self):
- cherrypy.config.update({'response.timeout': 0.2})
-
- engine.start()
- cherrypy.server.start()
- try:
- self.assertNotEqual(engine.timeout_monitor.thread, None)
-
- # Request a "normal" page.
- self.assertEqual(engine.timeout_monitor.servings, [])
- self.getPage("/")
- self.assertBody("Hello World")
- # request.close is called async.
- while engine.timeout_monitor.servings:
- sys.stdout.write(".")
- time.sleep(0.01)
-
- # Request a page that explicitly checks itself for deadlock.
- # The deadlock_timeout should be 2 secs.
- self.getPage("/block_explicit")
- self.assertBody("broken!")
-
- # Request a page that implicitly breaks deadlock.
- # If we deadlock, we want to touch as little code as possible,
- # so we won't even call handle_error, just bail ASAP.
- self.getPage("/block_implicit")
- self.assertStatus(500)
- self.assertInBody("raise cherrypy.TimeoutError()")
- finally:
- engine.exit()
-
- def test_4_Autoreload(self):
- # Start the demo script in a new process
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
- p.write_conf(
- extra='test_case_name: "test_4_Autoreload"')
- p.start(imports='cherrypy.test._test_states_demo')
- try:
- self.getPage("/start")
- start = float(self.body)
-
- # Give the autoreloader time to cache the file time.
- time.sleep(2)
-
- # Touch the file
- os.utime(os.path.join(thisdir, "_test_states_demo.py"), None)
-
- # Give the autoreloader time to re-exec the process
- time.sleep(2)
- host = cherrypy.server.socket_host
- port = cherrypy.server.socket_port
- cherrypy._cpserver.wait_for_occupied_port(host, port)
-
- self.getPage("/start")
- self.assert_(float(self.body) > start)
- finally:
- # Shut down the spawned process
- self.getPage("/exit")
- p.join()
-
- def test_5_Start_Error(self):
- # If a process errors during start, it should stop the engine
- # and exit with a non-zero exit code.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
- wait=True)
- p.write_conf(
- extra="""starterror: True
-test_case_name: "test_5_Start_Error"
-"""
- )
- p.start(imports='cherrypy.test._test_states_demo')
- if p.exit_code == 0:
- self.fail("Process failed to return nonzero exit code.")
-
-
-class PluginTests(helper.CPWebCase):
- def test_daemonize(self):
- if os.name not in ['posix']:
- return self.skip("skipped (not on posix) ")
- self.HOST = '127.0.0.1'
- self.PORT = 8081
- # Spawn the process and wait, when this returns, the original process
- # is finished. If it daemonized properly, we should still be able
- # to access pages.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
- wait=True, daemonize=True,
- socket_host='127.0.0.1',
- socket_port=8081)
- p.write_conf(
- extra='test_case_name: "test_daemonize"')
- p.start(imports='cherrypy.test._test_states_demo')
- try:
- # Just get the pid of the daemonization process.
- self.getPage("/pid")
- self.assertStatus(200)
- page_pid = int(self.body)
- self.assertEqual(page_pid, p.get_pid())
- finally:
- # Shut down the spawned process
- self.getPage("/exit")
- p.join()
-
- # Wait until here to test the exit code because we want to ensure
- # that we wait for the daemon to finish running before we fail.
- if p.exit_code != 0:
- self.fail("Daemonized parent process failed to exit cleanly.")
-
-
-class SignalHandlingTests(helper.CPWebCase):
- def test_SIGHUP_tty(self):
- # When not daemonized, SIGHUP should shut down the server.
- try:
- from signal import SIGHUP
- except ImportError:
- return self.skip("skipped (no SIGHUP) ")
-
- # Spawn the process.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
- p.write_conf(
- extra='test_case_name: "test_SIGHUP_tty"')
- p.start(imports='cherrypy.test._test_states_demo')
- # Send a SIGHUP
- os.kill(p.get_pid(), SIGHUP)
- # This might hang if things aren't working right, but meh.
- p.join()
-
- def test_SIGHUP_daemonized(self):
- # When daemonized, SIGHUP should restart the server.
- try:
- from signal import SIGHUP
- except ImportError:
- return self.skip("skipped (no SIGHUP) ")
-
- if os.name not in ['posix']:
- return self.skip("skipped (not on posix) ")
-
- # Spawn the process and wait, when this returns, the original process
- # is finished. If it daemonized properly, we should still be able
- # to access pages.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
- wait=True, daemonize=True)
- p.write_conf(
- extra='test_case_name: "test_SIGHUP_daemonized"')
- p.start(imports='cherrypy.test._test_states_demo')
-
- pid = p.get_pid()
- try:
- # Send a SIGHUP
- os.kill(pid, SIGHUP)
- # Give the server some time to restart
- time.sleep(2)
- self.getPage("/pid")
- self.assertStatus(200)
- new_pid = int(self.body)
- self.assertNotEqual(new_pid, pid)
- finally:
- # Shut down the spawned process
- self.getPage("/exit")
- p.join()
-
- def test_SIGTERM(self):
- # SIGTERM should shut down the server whether daemonized or not.
- try:
- from signal import SIGTERM
- except ImportError:
- return self.skip("skipped (no SIGTERM) ")
-
- try:
- from os import kill
- except ImportError:
- return self.skip("skipped (no os.kill) ")
-
- # Spawn a normal, undaemonized process.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
- p.write_conf(
- extra='test_case_name: "test_SIGTERM"')
- p.start(imports='cherrypy.test._test_states_demo')
- # Send a SIGTERM
- os.kill(p.get_pid(), SIGTERM)
- # This might hang if things aren't working right, but meh.
- p.join()
-
- if os.name in ['posix']:
- # Spawn a daemonized process and test again.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
- wait=True, daemonize=True)
- p.write_conf(
- extra='test_case_name: "test_SIGTERM_2"')
- p.start(imports='cherrypy.test._test_states_demo')
- # Send a SIGTERM
- os.kill(p.get_pid(), SIGTERM)
- # This might hang if things aren't working right, but meh.
- p.join()
-
- def test_signal_handler_unsubscribe(self):
- try:
- from signal import SIGTERM
- except ImportError:
- return self.skip("skipped (no SIGTERM) ")
-
- try:
- from os import kill
- except ImportError:
- return self.skip("skipped (no os.kill) ")
-
- # Spawn a normal, undaemonized process.
- p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
- p.write_conf(
- extra="""unsubsig: True
-test_case_name: "test_signal_handler_unsubscribe"
-""")
- p.start(imports='cherrypy.test._test_states_demo')
- # Send a SIGTERM
- os.kill(p.get_pid(), SIGTERM)
- # This might hang if things aren't working right, but meh.
- p.join()
-
- # Assert the old handler ran.
- target_line = open(p.error_log, 'rb').readlines()[-10]
- if not ntob("I am an old SIGTERM handler.") in target_line:
- self.fail("Old SIGTERM handler did not run.\n%r" % target_line)
-