Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/test/test_caching.py
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/test/test_caching.py')
-rwxr-xr-xcherrypy/test/test_caching.py329
1 files changed, 329 insertions, 0 deletions
diff --git a/cherrypy/test/test_caching.py b/cherrypy/test/test_caching.py
new file mode 100755
index 0000000..720a933
--- /dev/null
+++ b/cherrypy/test/test_caching.py
@@ -0,0 +1,329 @@
+import datetime
+import gzip
+from itertools import count
+import os
+curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
+import sys
+import threading
+import time
+import urllib
+
+import cherrypy
+from cherrypy._cpcompat import next, ntob, quote, xrange
+from cherrypy.lib import httputil
+
+gif_bytes = ntob('GIF89a\x01\x00\x01\x00\x82\x00\x01\x99"\x1e\x00\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ '\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x02\x03\x02\x08\t\x00;')
+
+
+
+from cherrypy.test import helper
+
+class CacheTest(helper.CPWebCase):
+
+ def setup_server():
+
+ class Root:
+
+ _cp_config = {'tools.caching.on': True}
+
+ def __init__(self):
+ self.counter = 0
+ self.control_counter = 0
+ self.longlock = threading.Lock()
+
+ def index(self):
+ self.counter += 1
+ msg = "visit #%s" % self.counter
+ return msg
+ index.exposed = True
+
+ def control(self):
+ self.control_counter += 1
+ return "visit #%s" % self.control_counter
+ control.exposed = True
+
+ def a_gif(self):
+ cherrypy.response.headers['Last-Modified'] = httputil.HTTPDate()
+ return gif_bytes
+ a_gif.exposed = True
+
+ def long_process(self, seconds='1'):
+ try:
+ self.longlock.acquire()
+ time.sleep(float(seconds))
+ finally:
+ self.longlock.release()
+ return 'success!'
+ long_process.exposed = True
+
+ def clear_cache(self, path):
+ cherrypy._cache.store[cherrypy.request.base + path].clear()
+ clear_cache.exposed = True
+
+ class VaryHeaderCachingServer(object):
+
+ _cp_config = {'tools.caching.on': True,
+ 'tools.response_headers.on': True,
+ 'tools.response_headers.headers': [('Vary', 'Our-Varying-Header')],
+ }
+
+ def __init__(self):
+ self.counter = count(1)
+
+ def index(self):
+ return "visit #%s" % next(self.counter)
+ index.exposed = True
+
+ class UnCached(object):
+ _cp_config = {'tools.expires.on': True,
+ 'tools.expires.secs': 60,
+ 'tools.staticdir.on': True,
+ 'tools.staticdir.dir': 'static',
+ 'tools.staticdir.root': curdir,
+ }
+
+ def force(self):
+ cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
+ self._cp_config['tools.expires.force'] = True
+ self._cp_config['tools.expires.secs'] = 0
+ return "being forceful"
+ force.exposed = True
+ force._cp_config = {'tools.expires.secs': 0}
+
+ def dynamic(self):
+ cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
+ cherrypy.response.headers['Cache-Control'] = 'private'
+ return "D-d-d-dynamic!"
+ dynamic.exposed = True
+
+ def cacheable(self):
+ cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
+ return "Hi, I'm cacheable."
+ cacheable.exposed = True
+
+ def specific(self):
+ cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
+ return "I am being specific"
+ specific.exposed = True
+ specific._cp_config = {'tools.expires.secs': 86400}
+
+ class Foo(object):pass
+
+ def wrongtype(self):
+ cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
+ return "Woops"
+ wrongtype.exposed = True
+ wrongtype._cp_config = {'tools.expires.secs': Foo()}
+
+ cherrypy.tree.mount(Root())
+ cherrypy.tree.mount(UnCached(), "/expires")
+ cherrypy.tree.mount(VaryHeaderCachingServer(), "/varying_headers")
+ cherrypy.config.update({'tools.gzip.on': True})
+ setup_server = staticmethod(setup_server)
+
+ def testCaching(self):
+ elapsed = 0.0
+ for trial in range(10):
+ self.getPage("/")
+ # The response should be the same every time,
+ # except for the Age response header.
+ self.assertBody('visit #1')
+ if trial != 0:
+ age = int(self.assertHeader("Age"))
+ self.assert_(age >= elapsed)
+ elapsed = age
+
+ # POST, PUT, DELETE should not be cached.
+ self.getPage("/", method="POST")
+ self.assertBody('visit #2')
+ # Because gzip is turned on, the Vary header should always Vary for content-encoding
+ self.assertHeader('Vary', 'Accept-Encoding')
+ # The previous request should have invalidated the cache,
+ # so this request will recalc the response.
+ self.getPage("/", method="GET")
+ self.assertBody('visit #3')
+ # ...but this request should get the cached copy.
+ self.getPage("/", method="GET")
+ self.assertBody('visit #3')
+ self.getPage("/", method="DELETE")
+ self.assertBody('visit #4')
+
+ # The previous request should have invalidated the cache,
+ # so this request will recalc the response.
+ self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
+ self.assertHeader('Content-Encoding', 'gzip')
+ self.assertHeader('Vary')
+ self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
+
+ # Now check that a second request gets the gzip header and gzipped body
+ # This also tests a bug in 3.0 to 3.0.2 whereby the cached, gzipped
+ # response body was being gzipped a second time.
+ self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
+ self.assertHeader('Content-Encoding', 'gzip')
+ self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
+
+ # Now check that a third request that doesn't accept gzip
+ # skips the cache (because the 'Vary' header denies it).
+ self.getPage("/", method="GET")
+ self.assertNoHeader('Content-Encoding')
+ self.assertBody('visit #6')
+
+ def testVaryHeader(self):
+ self.getPage("/varying_headers/")
+ self.assertStatus("200 OK")
+ self.assertHeaderItemValue('Vary', 'Our-Varying-Header')
+ self.assertBody('visit #1')
+
+ # Now check that different 'Vary'-fields don't evict each other.
+ # This test creates 2 requests with different 'Our-Varying-Header'
+ # and then tests if the first one still exists.
+ self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
+ self.assertStatus("200 OK")
+ self.assertBody('visit #2')
+
+ self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
+ self.assertStatus("200 OK")
+ self.assertBody('visit #2')
+
+ self.getPage("/varying_headers/")
+ self.assertStatus("200 OK")
+ self.assertBody('visit #1')
+
+ def testExpiresTool(self):
+ # test setting an expires header
+ self.getPage("/expires/specific")
+ self.assertStatus("200 OK")
+ self.assertHeader("Expires")
+
+ # test exceptions for bad time values
+ self.getPage("/expires/wrongtype")
+ self.assertStatus(500)
+ self.assertInBody("TypeError")
+
+ # static content should not have "cache prevention" headers
+ self.getPage("/expires/index.html")
+ self.assertStatus("200 OK")
+ self.assertNoHeader("Pragma")
+ self.assertNoHeader("Cache-Control")
+ self.assertHeader("Expires")
+
+ # dynamic content that sets indicators should not have
+ # "cache prevention" headers
+ self.getPage("/expires/cacheable")
+ self.assertStatus("200 OK")
+ self.assertNoHeader("Pragma")
+ self.assertNoHeader("Cache-Control")
+ self.assertHeader("Expires")
+
+ self.getPage('/expires/dynamic')
+ self.assertBody("D-d-d-dynamic!")
+ # the Cache-Control header should be untouched
+ self.assertHeader("Cache-Control", "private")
+ self.assertHeader("Expires")
+
+ # configure the tool to ignore indicators and replace existing headers
+ self.getPage("/expires/force")
+ self.assertStatus("200 OK")
+ # This also gives us a chance to test 0 expiry with no other headers
+ self.assertHeader("Pragma", "no-cache")
+ if cherrypy.server.protocol_version == "HTTP/1.1":
+ self.assertHeader("Cache-Control", "no-cache, must-revalidate")
+ self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
+
+ # static content should now have "cache prevention" headers
+ self.getPage("/expires/index.html")
+ self.assertStatus("200 OK")
+ self.assertHeader("Pragma", "no-cache")
+ if cherrypy.server.protocol_version == "HTTP/1.1":
+ self.assertHeader("Cache-Control", "no-cache, must-revalidate")
+ self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
+
+ # the cacheable handler should now have "cache prevention" headers
+ self.getPage("/expires/cacheable")
+ self.assertStatus("200 OK")
+ self.assertHeader("Pragma", "no-cache")
+ if cherrypy.server.protocol_version == "HTTP/1.1":
+ self.assertHeader("Cache-Control", "no-cache, must-revalidate")
+ self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
+
+ self.getPage('/expires/dynamic')
+ self.assertBody("D-d-d-dynamic!")
+ # dynamic sets Cache-Control to private but it should be
+ # overwritten here ...
+ self.assertHeader("Pragma", "no-cache")
+ if cherrypy.server.protocol_version == "HTTP/1.1":
+ self.assertHeader("Cache-Control", "no-cache, must-revalidate")
+ self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
+
+ def testLastModified(self):
+ self.getPage("/a.gif")
+ self.assertStatus(200)
+ self.assertBody(gif_bytes)
+ lm1 = self.assertHeader("Last-Modified")
+
+ # this request should get the cached copy.
+ self.getPage("/a.gif")
+ self.assertStatus(200)
+ self.assertBody(gif_bytes)
+ self.assertHeader("Age")
+ lm2 = self.assertHeader("Last-Modified")
+ self.assertEqual(lm1, lm2)
+
+ # this request should match the cached copy, but raise 304.
+ self.getPage("/a.gif", [('If-Modified-Since', lm1)])
+ self.assertStatus(304)
+ self.assertNoHeader("Last-Modified")
+ if not getattr(cherrypy.server, "using_apache", False):
+ self.assertHeader("Age")
+
+ def test_antistampede(self):
+ SECONDS = 4
+ # We MUST make an initial synchronous request in order to create the
+ # AntiStampedeCache object, and populate its selecting_headers,
+ # before the actual stampede.
+ self.getPage("/long_process?seconds=%d" % SECONDS)
+ self.assertBody('success!')
+ self.getPage("/clear_cache?path=" +
+ quote('/long_process?seconds=%d' % SECONDS, safe=''))
+ self.assertStatus(200)
+ sys.stdout.write("prepped... ")
+ sys.stdout.flush()
+
+ start = datetime.datetime.now()
+ def run():
+ self.getPage("/long_process?seconds=%d" % SECONDS)
+ # The response should be the same every time
+ self.assertBody('success!')
+ ts = [threading.Thread(target=run) for i in xrange(100)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
+ self.assertEqualDates(start, datetime.datetime.now(),
+ # Allow a second for our thread/TCP overhead etc.
+ seconds=SECONDS + 1.1)
+
+ def test_cache_control(self):
+ self.getPage("/control")
+ self.assertBody('visit #1')
+ self.getPage("/control")
+ self.assertBody('visit #1')
+
+ self.getPage("/control", headers=[('Cache-Control', 'no-cache')])
+ self.assertBody('visit #2')
+ self.getPage("/control")
+ self.assertBody('visit #2')
+
+ self.getPage("/control", headers=[('Pragma', 'no-cache')])
+ self.assertBody('visit #3')
+ self.getPage("/control")
+ self.assertBody('visit #3')
+
+ time.sleep(1)
+ self.getPage("/control", headers=[('Cache-Control', 'max-age=0')])
+ self.assertBody('visit #4')
+ self.getPage("/control")
+ self.assertBody('visit #4')
+