Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/test/test_tools.py
blob: bc8579f06d1a8642fd791fd88ab2f7131fa5b3d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
"""Test the various means of instantiating and invoking tools."""

import gzip
import sys
from cherrypy._cpcompat import BytesIO, copyitems, itervalues, IncompleteRead, ntob, ntou, xrange
import time
# Seconds. testEndRequestOnDrop installs this as the HTTP server's timeout
# and then sleeps for twice as long before checking the result.
timeout = 0.2
import types

import cherrypy
from cherrypy import tools


# Non-ASCII sample text (built from the escapes \x80 and \xa3) that the euro
# handlers yield as their last chunk; testCombinedTools appends it to the
# expected body before UTF-8 encoding.
europoundUnicode = ntou('\x80\xa3')


#                             Client-side code                             #

from cherrypy.test import helper


class ToolTests(helper.CPWebCase):
    def setup_server():
        """Register the custom tools and mount the app used by these tests.
        
        Defines several tools (a namespaced access check, response-body
        filters, a handler wrapper), the handler classes that use them and
        the per-path config, then mounts the root object on cherrypy.tree.
        Takes no arguments; it is wrapped in staticmethod() right after
        the body.
        """
        
        # Put check_access in a custom toolbox with its own namespace
        myauthtools = cherrypy._cptools.Toolbox("myauth")
        
        def check_access(default=False):
            # Respond 401 unless the request carries a truthy "userid"
            # attribute; "default" is the value used when it is absent.
            if not getattr(cherrypy.request, "userid", default):
                raise cherrypy.HTTPError(401)
        myauthtools.check_access = cherrypy.Tool('before_request_body', check_access)
        
        def numerify():
            # Wrap the response body in a generator that applies every
            # (old, new) pair from request.numerify_map to each chunk.
            def number_it(body):
                for chunk in body:
                    for k, v in cherrypy.request.numerify_map:
                        chunk = chunk.replace(k, v)
                    yield chunk
            cherrypy.response.body = number_it(cherrypy.response.body)
        
        class NumTool(cherrypy.Tool):
            # Tool subclass whose _setup attaches two extra
            # on_start_resource hooks in addition to the tool's callable.
            def _setup(self):
                def makemap():
                    # Copy the configured "map" items onto the request for
                    # number_it to read. copyitems() is applied to whatever
                    # the config supplies; the '/demo/err_in_onstart' config
                    # passes a str on purpose so that this hook fails.
                    m = self._merged_args().get("map", {})
                    cherrypy.request.numerify_map = copyitems(m)
                cherrypy.request.hooks.attach('on_start_resource', makemap)
                
                def critical():
                    # Route any error response for this request through a 502.
                    cherrypy.request.error_response = cherrypy.HTTPError(502).set_response
                # Flagged failsafe: testGuaranteedHooks expects this hook to
                # run even when makemap has raised.
                critical.failsafe = True
                
                cherrypy.request.hooks.attach('on_start_resource', critical)
                cherrypy.request.hooks.attach(self._point, self.callable)
        
        tools.numerify = NumTool('before_finalize', numerify)
        
        # It's not mandatory to inherit from cherrypy.Tool.
        class NadsatTool:
            
            def __init__(self):
                # Request ids (from the "id" param) whose cleanup hook has run.
                self.ended = {}
                self._name = "nadsat"
            
            def nadsat(self):
                # Wrap the response body so each chunk has "good" and "piece"
                # replaced by "horrorshow" and "lomtick".
                def nadsat_it_up(body):
                    for chunk in body:
                        chunk = chunk.replace(ntob("good"), ntob("horrorshow"))
                        chunk = chunk.replace(ntob("piece"), ntob("lomtick"))
                        yield chunk
                cherrypy.response.body = nadsat_it_up(cherrypy.response.body)
            nadsat.priority = 0
            
            def cleanup(self):
                # This runs after the request has been completely written out.
                # The body is overwritten with a marker so that a test seeing
                # "razdrez" knows this hook ran too early.
                cherrypy.response.body = [ntob("razdrez")]
                id = cherrypy.request.params.get("id")
                if id:
                    self.ended[id] = True
            cleanup.failsafe = True
            
            def _setup(self):
                cherrypy.request.hooks.attach('before_finalize', self.nadsat)
                cherrypy.request.hooks.attach('on_end_request', self.cleanup)
        tools.nadsat = NadsatTool()
        
        def pipe_body():
            # Bare hook: turn off normal request-body processing and instead
            # read exactly Content-Length bytes from rfile into request.body.
            cherrypy.request.process_request_body = False
            clen = int(cherrypy.request.headers['Content-Length'])
            cherrypy.request.body = cherrypy.request.rfile.read(clen)
        
        # Assert that we can use a callable object instead of a function.
        class Rotator(object):
            def __call__(self, scale):
                # Collapse the body, then replace it with a list of single
                # characters, each shifted by "scale" modulo 256.
                r = cherrypy.response
                r.collapse_body()
                r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]]
        cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator())
        
        def stream_handler(next_handler, *args, **kwargs):
            # Give the wrapped handler a BytesIO at response.output to write
            # into; the buffer is closed once its contents have been read.
            cherrypy.response.output = o = BytesIO()
            try:
                response = next_handler(*args, **kwargs)
                # Ignore the response and return our accumulated output instead.
                return o.getvalue()
            finally:
                o.close()
        cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler)
        
        class Root:
            def index(self):
                return "Howdy earth!"
            index.exposed = True
            
            def tarfile(self):
                # Writes to response.output, which the streamer tool's
                # stream_handler sets up before calling this handler.
                cherrypy.response.output.write(ntob('I am '))
                cherrypy.response.output.write(ntob('a tarfile'))
            tarfile.exposed = True
            tarfile._cp_config = {'tools.streamer.on': True}
            
            def euro(self):
                # Check from inside the handler that the only
                # before_finalize hook is gzip, at priority 80.
                hooks = list(cherrypy.request.hooks['before_finalize'])
                hooks.sort()
                cbnames = [x.callback.__name__ for x in hooks]
                assert cbnames == ['gzip'], cbnames
                priorities = [x.priority for x in hooks]
                assert priorities == [80], priorities
                yield ntou("Hello,")
                yield ntou("world")
                yield europoundUnicode
            euro.exposed = True
            
            # Bare hooks
            def pipe(self):
                # Echo back the body that pipe_body read from the socket.
                return cherrypy.request.body
            pipe.exposed = True
            pipe._cp_config = {'hooks.before_request_body': pipe_body}
            
            # Multiple decorators; include kwargs just for fun.
            # Note that rotator must run before gzip.
            def decorated_euro(self, *vpath):
                yield ntou("Hello,")
                yield ntou("world")
                yield europoundUnicode
            decorated_euro.exposed = True
            decorated_euro = tools.gzip(compress_level=6)(decorated_euro)
            decorated_euro = tools.rotator(scale=3)(decorated_euro)
        
        root = Root()
        
        
        class TestType(type):
            """Metaclass which automatically exposes all functions in each subclass,
            and adds an instance of the subclass as an attribute of root.
            """
            def __init__(cls, name, bases, dct):
                type.__init__(cls, name, bases, dct)
                for value in itervalues(dct):
                    if isinstance(value, types.FunctionType):
                        value.exposed = True
                # The attribute name is the lowercased class name.
                setattr(root, name.lower(), cls())
        class Test(object):
            # Metaclass selected through the __metaclass__ class attribute.
            __metaclass__ = TestType
        
        
        # METHOD ONE:
        # Declare Tools in _cp_config
        class Demo(Test):
            
            _cp_config = {"tools.nadsat.on": True}
            
            def index(self, id=None):
                return "A good piece of cherry pie"
            
            def ended(self, id):
                # Report whether the nadsat cleanup hook recorded this id.
                return repr(tools.nadsat.ended[id])
            
            def err(self, id=None):
                raise ValueError()
            
            def errinstream(self, id=None):
                # Fails after the first chunk has been yielded; the final
                # yield is never reached.
                yield "nonconfidential"
                raise ValueError()
                yield "confidential"
            
            # METHOD TWO: decorator using Tool()
            # We support Python 2.3, but the @-deco syntax would look like this:
            # @myauthtools.check_access()
            def restricted(self):
                return "Welcome!"
            restricted = myauthtools.check_access()(restricted)
            # Same handler under a second name, so that the '/demo/userid'
            # config section can give check_access a different default.
            userid = restricted
            
            def err_in_onstart(self):
                return "success!"
            
            def stream(self, id=None):
                # Long streamed response; testEndRequestOnDrop starts this
                # request and never reads the body.
                for x in xrange(100000000):
                    yield str(x)
            stream._cp_config = {'response.stream': True}
        
        
        conf = {
            # METHOD THREE:
            # Declare Tools in detached config
            '/demo': {
                'tools.numerify.on': True,
                'tools.numerify.map': {ntob("pie"): ntob("3.14159")},
            },
            '/demo/restricted': {
                'request.show_tracebacks': False,
            },
            '/demo/userid': {
                'request.show_tracebacks': False,
                'myauth.check_access.default': True,
            },
            '/demo/errinstream': {
                'response.stream': True,
            },
            '/demo/err_in_onstart': {
                # Because this isn't a dict, on_start_resource will error.
                'tools.numerify.map': "pie->3.14159"
            },
            # Combined tools
            '/euro': {
                'tools.gzip.on': True,
                'tools.encode.on': True,
            },
            # Priority specified in config
            '/decorated_euro/subpath': {
                'tools.gzip.priority': 10,
            },
            # Handler wrappers
            '/tarfile': {'tools.streamer.on': True}
        }
        app = cherrypy.tree.mount(root, config=conf)
        # Register the custom toolbox under the "myauth" config namespace.
        app.request_class.namespaces['myauth'] = myauthtools
        
        if sys.version_info >= (2, 5):
            # Imported only when the version check passes; testToolWithConfig
            # skips itself when it does not.
            from cherrypy.test import _test_decorators
            root.tooldecs = _test_decorators.ToolExamples()
    setup_server = staticmethod(setup_server)

    def testHookErrors(self):
        def assert_cleanup_ran(ended_url):
            # If this fails, then on_end_request isn't being called at all.
            # The short pause lets the server finish the previous request
            # before the app is asked whether the hook recorded it.
            time.sleep(0.1)
            self.getPage(ended_url)
            self.assertBody("True")
        
        # Successful request. A body of "razdrez" here would mean that
        # on_end_request is being called too early.
        self.getPage("/demo/?id=1")
        self.assertBody("A horrorshow lomtick of cherry 3.14159")
        assert_cleanup_ran("/demo/ended/1")
        
        # Handler raises before producing output. Again, "razdrez" in the
        # body would mean that on_end_request is being called too early.
        traceback_tail = '\n    raise ValueError()\nValueError'
        self.getPage("/demo/err?id=3")
        self.assertErrorPage(502, pattern=traceback_tail)
        assert_cleanup_ran("/demo/ended/3")
        
        # Handler raises after the response body has started. As above,
        # "razdrez" in the body would mean on_end_request ran too early.
        server = cherrypy.server
        not_chunked = (server.protocol_version == "HTTP/1.0" or
                       getattr(server, "using_apache", False))
        if not not_chunked:
            # Because this error is raised after the response body has
            # started, and because it's chunked output, an error is raised by
            # the HTTP client when it encounters incomplete output.
            self.assertRaises((ValueError, IncompleteRead), self.getPage,
                              "/demo/errinstream?id=5")
        else:
            self.getPage("/demo/errinstream?id=5")
            # Because this error is raised after the response body has
            # started, the status should not change to an error status.
            self.assertStatus("200 OK")
            self.assertBody("nonconfidential")
        assert_cleanup_ran("/demo/ended/5")
        
        # Test the "__call__" technique (compile-time decorator).
        self.getPage("/demo/restricted")
        self.assertErrorPage(401)
        
        # Test compile-time decorator with kwargs from config.
        self.getPage("/demo/userid")
        self.assertBody("Welcome!")
    
    def testEndRequestOnDrop(self):
        # Skip when there is no HTTP server object (or it has no timeout)
        # to adjust.
        try:
            srv = cherrypy.server.httpserver
            saved_timeout = srv.timeout
        except (AttributeError, IndexError):
            return self.skip()
        
        try:
            srv.timeout = timeout
            
            # Test that on_end_request is called even if the client drops.
            self.persistent = True
            try:
                client = self.HTTP_CONN
                client.putrequest("GET", "/demo/stream?id=9", skip_host=True)
                client.putheader("Host", self.HOST)
                client.endheaders()
                # Skip the rest of the request and close the conn. This will
                # cause the server's active socket to error, which *should*
                # result in the request being aborted, and request.close being
                # called all the way up the stack (including WSGI middleware),
                # eventually calling our on_end_request hook.
            finally:
                self.persistent = False
            
            # Wait for twice the shortened server timeout before asking
            # whether the on_end_request hook was called.
            time.sleep(timeout * 2)
            self.getPage("/demo/ended/9")
            self.assertBody("True")
        finally:
            # Put the original timeout back, unless it was None.
            if saved_timeout is not None:
                srv.timeout = saved_timeout
    
    def testGuaranteedHooks(self):
        # The 'critical' on_start_resource hook is marked 'failsafe', so it
        # is guaranteed to run even if other on_start_resource hooks fail.
        # Hooks without that flag get no such guarantee.
        # The config for this path makes NumTool's 'makemap' hook raise
        # (its "map" is a str, not a dict); 'critical' should still run
        # and turn the error response into a 502.
        makemap_error = "AttributeError: 'str' object has no attribute 'items'"
        self.getPage("/demo/err_in_onstart")
        self.assertErrorPage(502)
        self.assertInBody(makemap_error)
    
    def testCombinedTools(self):
        payload = (ntou("Hello,world") + europoundUnicode).encode('utf-8')
        
        def gzipped(level):
            # Compress the expected payload in memory at the given level.
            buf = BytesIO()
            writer = gzip.GzipFile(mode='wb', fileobj=buf, compresslevel=level)
            writer.write(payload)
            writer.close()
            return buf.getvalue()
        
        # gzip + encode enabled through config. Only the first three bytes
        # of the compressed stream are compared.
        level9 = gzipped(9)
        self.getPage("/euro", headers=[("Accept-Encoding", "gzip"),
                                        ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")])
        self.assertInBody(level9[:3])
        
        # gzip + rotator applied as decorators.
        level6 = gzipped(6)
        gzip_only = [("Accept-Encoding", "gzip")]
        self.getPage("/decorated_euro", headers=gzip_only)
        self.assertInBody(level6[:3])
        
        # This returns a different value because gzip's priority was
        # lowered in conf, allowing the rotator to run after gzip.
        # Of course, we don't want breakage in production apps,
        # but it proves the priority was changed.
        self.getPage("/decorated_euro/subpath",
                     headers=[("Accept-Encoding", "gzip")])
        rotated = ''.join([chr((ord(x) + 3) % 256) for x in level6])
        self.assertInBody(rotated)
    
    def testBareHooks(self):
        # POST a body to /pipe, whose bare before_request_body hook reads it
        # from the socket; the handler should echo it back unchanged.
        payload = "bit of a pain in me gulliver"
        request_headers = [("Content-Length", str(len(payload))),
                           ("Content-Type", "text/plain")]
        self.getPage("/pipe", headers=request_headers,
                     method="POST", body=payload)
        self.assertBody(payload)
    
    def testHandlerWrapperTool(self):
        # The handler writes two pieces to response.output; the streamer
        # wrapper should return them joined as the response body.
        expected_body = "I am a tarfile"
        self.getPage("/tarfile")
        self.assertBody(expected_body)
    
    def testToolWithConfig(self):
        # setup_server mounts /tooldecs only under the same version check.
        if sys.version_info < (2, 5):
            return self.skip("skipped (Python 2.5+ only)")
        
        self.getPage('/tooldecs/blah')
        self.assertHeader('Content-Type', 'application/data')
    
    def testWarnToolOn(self):
        # Reading or assigning a Tool's "on" attribute directly is expected
        # to raise AttributeError.
        #
        # The tool is looked up outside the try blocks on purpose: an
        # AttributeError caused by a missing "numerify" tool must fail the
        # test rather than be mistaken for the expected error.
        numerify_tool = cherrypy.tools.numerify
        
        # get
        try:
            # Evaluated only to trigger the attribute lookup.
            numerify_tool.on
        except AttributeError:
            pass
        else:
            raise AssertionError("Tool.on did not error as it should have.")
        
        # set
        try:
            numerify_tool.on = True
        except AttributeError:
            pass
        else:
            raise AssertionError("Tool.on did not error as it should have.")