Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/test/test_caching.py
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/test/test_caching.py')
-rwxr-xr-xcherrypy/test/test_caching.py329
1 files changed, 0 insertions, 329 deletions
diff --git a/cherrypy/test/test_caching.py b/cherrypy/test/test_caching.py
deleted file mode 100755
index 720a933..0000000
--- a/cherrypy/test/test_caching.py
+++ /dev/null
@@ -1,329 +0,0 @@
-import datetime
-import gzip
-from itertools import count
-import os
-curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
-import sys
-import threading
-import time
-import urllib
-
-import cherrypy
-from cherrypy._cpcompat import next, ntob, quote, xrange
-from cherrypy.lib import httputil
-
-gif_bytes = ntob('GIF89a\x01\x00\x01\x00\x82\x00\x01\x99"\x1e\x00\x00\x00\x00\x00'
- '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- '\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x02\x03\x02\x08\t\x00;')
-
-
-
-from cherrypy.test import helper
-
-class CacheTest(helper.CPWebCase):
-
- def setup_server():
-
- class Root:
-
- _cp_config = {'tools.caching.on': True}
-
- def __init__(self):
- self.counter = 0
- self.control_counter = 0
- self.longlock = threading.Lock()
-
- def index(self):
- self.counter += 1
- msg = "visit #%s" % self.counter
- return msg
- index.exposed = True
-
- def control(self):
- self.control_counter += 1
- return "visit #%s" % self.control_counter
- control.exposed = True
-
- def a_gif(self):
- cherrypy.response.headers['Last-Modified'] = httputil.HTTPDate()
- return gif_bytes
- a_gif.exposed = True
-
- def long_process(self, seconds='1'):
- try:
- self.longlock.acquire()
- time.sleep(float(seconds))
- finally:
- self.longlock.release()
- return 'success!'
- long_process.exposed = True
-
- def clear_cache(self, path):
- cherrypy._cache.store[cherrypy.request.base + path].clear()
- clear_cache.exposed = True
-
- class VaryHeaderCachingServer(object):
-
- _cp_config = {'tools.caching.on': True,
- 'tools.response_headers.on': True,
- 'tools.response_headers.headers': [('Vary', 'Our-Varying-Header')],
- }
-
- def __init__(self):
- self.counter = count(1)
-
- def index(self):
- return "visit #%s" % next(self.counter)
- index.exposed = True
-
- class UnCached(object):
- _cp_config = {'tools.expires.on': True,
- 'tools.expires.secs': 60,
- 'tools.staticdir.on': True,
- 'tools.staticdir.dir': 'static',
- 'tools.staticdir.root': curdir,
- }
-
- def force(self):
- cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
- self._cp_config['tools.expires.force'] = True
- self._cp_config['tools.expires.secs'] = 0
- return "being forceful"
- force.exposed = True
- force._cp_config = {'tools.expires.secs': 0}
-
- def dynamic(self):
- cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
- cherrypy.response.headers['Cache-Control'] = 'private'
- return "D-d-d-dynamic!"
- dynamic.exposed = True
-
- def cacheable(self):
- cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
- return "Hi, I'm cacheable."
- cacheable.exposed = True
-
- def specific(self):
- cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
- return "I am being specific"
- specific.exposed = True
- specific._cp_config = {'tools.expires.secs': 86400}
-
- class Foo(object):pass
-
- def wrongtype(self):
- cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
- return "Woops"
- wrongtype.exposed = True
- wrongtype._cp_config = {'tools.expires.secs': Foo()}
-
- cherrypy.tree.mount(Root())
- cherrypy.tree.mount(UnCached(), "/expires")
- cherrypy.tree.mount(VaryHeaderCachingServer(), "/varying_headers")
- cherrypy.config.update({'tools.gzip.on': True})
- setup_server = staticmethod(setup_server)
-
- def testCaching(self):
- elapsed = 0.0
- for trial in range(10):
- self.getPage("/")
- # The response should be the same every time,
- # except for the Age response header.
- self.assertBody('visit #1')
- if trial != 0:
- age = int(self.assertHeader("Age"))
- self.assert_(age >= elapsed)
- elapsed = age
-
- # POST, PUT, DELETE should not be cached.
- self.getPage("/", method="POST")
- self.assertBody('visit #2')
- # Because gzip is turned on, the Vary header should always Vary for content-encoding
- self.assertHeader('Vary', 'Accept-Encoding')
- # The previous request should have invalidated the cache,
- # so this request will recalc the response.
- self.getPage("/", method="GET")
- self.assertBody('visit #3')
- # ...but this request should get the cached copy.
- self.getPage("/", method="GET")
- self.assertBody('visit #3')
- self.getPage("/", method="DELETE")
- self.assertBody('visit #4')
-
- # The previous request should have invalidated the cache,
- # so this request will recalc the response.
- self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
- self.assertHeader('Content-Encoding', 'gzip')
- self.assertHeader('Vary')
- self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
-
- # Now check that a second request gets the gzip header and gzipped body
- # This also tests a bug in 3.0 to 3.0.2 whereby the cached, gzipped
- # response body was being gzipped a second time.
- self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
- self.assertHeader('Content-Encoding', 'gzip')
- self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
-
- # Now check that a third request that doesn't accept gzip
- # skips the cache (because the 'Vary' header denies it).
- self.getPage("/", method="GET")
- self.assertNoHeader('Content-Encoding')
- self.assertBody('visit #6')
-
- def testVaryHeader(self):
- self.getPage("/varying_headers/")
- self.assertStatus("200 OK")
- self.assertHeaderItemValue('Vary', 'Our-Varying-Header')
- self.assertBody('visit #1')
-
- # Now check that different 'Vary'-fields don't evict each other.
- # This test creates 2 requests with different 'Our-Varying-Header'
- # and then tests if the first one still exists.
- self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
- self.assertStatus("200 OK")
- self.assertBody('visit #2')
-
- self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
- self.assertStatus("200 OK")
- self.assertBody('visit #2')
-
- self.getPage("/varying_headers/")
- self.assertStatus("200 OK")
- self.assertBody('visit #1')
-
- def testExpiresTool(self):
- # test setting an expires header
- self.getPage("/expires/specific")
- self.assertStatus("200 OK")
- self.assertHeader("Expires")
-
- # test exceptions for bad time values
- self.getPage("/expires/wrongtype")
- self.assertStatus(500)
- self.assertInBody("TypeError")
-
- # static content should not have "cache prevention" headers
- self.getPage("/expires/index.html")
- self.assertStatus("200 OK")
- self.assertNoHeader("Pragma")
- self.assertNoHeader("Cache-Control")
- self.assertHeader("Expires")
-
- # dynamic content that sets indicators should not have
- # "cache prevention" headers
- self.getPage("/expires/cacheable")
- self.assertStatus("200 OK")
- self.assertNoHeader("Pragma")
- self.assertNoHeader("Cache-Control")
- self.assertHeader("Expires")
-
- self.getPage('/expires/dynamic')
- self.assertBody("D-d-d-dynamic!")
- # the Cache-Control header should be untouched
- self.assertHeader("Cache-Control", "private")
- self.assertHeader("Expires")
-
- # configure the tool to ignore indicators and replace existing headers
- self.getPage("/expires/force")
- self.assertStatus("200 OK")
- # This also gives us a chance to test 0 expiry with no other headers
- self.assertHeader("Pragma", "no-cache")
- if cherrypy.server.protocol_version == "HTTP/1.1":
- self.assertHeader("Cache-Control", "no-cache, must-revalidate")
- self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
-
- # static content should now have "cache prevention" headers
- self.getPage("/expires/index.html")
- self.assertStatus("200 OK")
- self.assertHeader("Pragma", "no-cache")
- if cherrypy.server.protocol_version == "HTTP/1.1":
- self.assertHeader("Cache-Control", "no-cache, must-revalidate")
- self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
-
- # the cacheable handler should now have "cache prevention" headers
- self.getPage("/expires/cacheable")
- self.assertStatus("200 OK")
- self.assertHeader("Pragma", "no-cache")
- if cherrypy.server.protocol_version == "HTTP/1.1":
- self.assertHeader("Cache-Control", "no-cache, must-revalidate")
- self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
-
- self.getPage('/expires/dynamic')
- self.assertBody("D-d-d-dynamic!")
- # dynamic sets Cache-Control to private but it should be
- # overwritten here ...
- self.assertHeader("Pragma", "no-cache")
- if cherrypy.server.protocol_version == "HTTP/1.1":
- self.assertHeader("Cache-Control", "no-cache, must-revalidate")
- self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
-
- def testLastModified(self):
- self.getPage("/a.gif")
- self.assertStatus(200)
- self.assertBody(gif_bytes)
- lm1 = self.assertHeader("Last-Modified")
-
- # this request should get the cached copy.
- self.getPage("/a.gif")
- self.assertStatus(200)
- self.assertBody(gif_bytes)
- self.assertHeader("Age")
- lm2 = self.assertHeader("Last-Modified")
- self.assertEqual(lm1, lm2)
-
- # this request should match the cached copy, but raise 304.
- self.getPage("/a.gif", [('If-Modified-Since', lm1)])
- self.assertStatus(304)
- self.assertNoHeader("Last-Modified")
- if not getattr(cherrypy.server, "using_apache", False):
- self.assertHeader("Age")
-
- def test_antistampede(self):
- SECONDS = 4
- # We MUST make an initial synchronous request in order to create the
- # AntiStampedeCache object, and populate its selecting_headers,
- # before the actual stampede.
- self.getPage("/long_process?seconds=%d" % SECONDS)
- self.assertBody('success!')
- self.getPage("/clear_cache?path=" +
- quote('/long_process?seconds=%d' % SECONDS, safe=''))
- self.assertStatus(200)
- sys.stdout.write("prepped... ")
- sys.stdout.flush()
-
- start = datetime.datetime.now()
- def run():
- self.getPage("/long_process?seconds=%d" % SECONDS)
- # The response should be the same every time
- self.assertBody('success!')
- ts = [threading.Thread(target=run) for i in xrange(100)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
- self.assertEqualDates(start, datetime.datetime.now(),
- # Allow a second for our thread/TCP overhead etc.
- seconds=SECONDS + 1.1)
-
- def test_cache_control(self):
- self.getPage("/control")
- self.assertBody('visit #1')
- self.getPage("/control")
- self.assertBody('visit #1')
-
- self.getPage("/control", headers=[('Cache-Control', 'no-cache')])
- self.assertBody('visit #2')
- self.getPage("/control")
- self.assertBody('visit #2')
-
- self.getPage("/control", headers=[('Pragma', 'no-cache')])
- self.assertBody('visit #3')
- self.getPage("/control")
- self.assertBody('visit #3')
-
- time.sleep(1)
- self.getPage("/control", headers=[('Cache-Control', 'max-age=0')])
- self.assertBody('visit #4')
- self.getPage("/control")
- self.assertBody('visit #4')
-