Download Install Tutorial Docs FAQ Tools WikiLicense Team IRC Planet Involvement Shop Book

root/branches/cherrypy-2.x/cherrypy/test/test_core.py

Revision 2019 (checked in by nick125, 5 months ago)

Added a unit test, fixed an issue in the core unittest with ipv6-enabled systems and fixed an issue in parseRequestLine() when URL paths contained spaces (or anything that would split on a space)

* Added a encoded URL unittest to test/test_core.py
* Fixed lib/httptools.py's parseRequestLine() to not barf when the request line did not unpack to three values (i.e., if the path contained spaces)
* Fixed the logging test in test_core.py to fix an issue where the logger would log ipv6 addresses.

  • Property svn:eol-style set to native
Line 
1 """Basic tests for the CherryPy core: request handling."""
2
3 import test
4 test.prefer_parent_path()
5
6 import cherrypy
7 from cherrypy.lib import cptools, httptools
8 import types
9
10 import os
11 localDir = os.path.dirname(__file__)
12 log_file = os.path.join(localDir, "error.log")
13 log_access_file = os.path.join(localDir, "access.log")
14
15 defined_http_methods = ("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE",
16                         "TRACE", "CONNECT")
17
18
19 def setup_server():
20     class Root:
21        
22         def index(self):
23             return "hello"
24         index.exposed = True
25        
26         def andnow(self):
27             return "the larch"
28         andnow.exposed = True
29        
30         def global_(self):
31             pass
32         global_.exposed = True
33        
34         def delglobal(self):
35             del self.__class__.__dict__['global_']
36         delglobal.exposed = True
37        
38         def defct(self, newct):
39             newct = "text/%s" % newct
40             cherrypy.config.update({'server.default_content_type': newct})
41         defct.exposed = True
42        
43         def upload(self, file):
44             return "Size: %s" % len(file.file.read())
45         upload.exposed = True
46    
47     cherrypy.root = Root()
48
49
50     class TestType(type):
51         """Metaclass which automatically exposes all functions in each subclass,
52         and adds an instance of the subclass as an attribute of cherrypy.root.
53         """
54         def __init__(cls, name, bases, dct):
55             type.__init__(name, bases, dct)
56             for value in dct.itervalues():
57                 if isinstance(value, types.FunctionType):
58                     value.exposed = True
59             setattr(cherrypy.root, name.lower(), cls())
60     class Test(object):
61         __metaclass__ = TestType
62
63
64     class Params(Test):
65        
66         def index(self, thing):
67             return repr(thing)
68        
69         def ismap(self, x, y):
70             return "Coordinates: %s, %s" % (x, y)
71        
72         def default(self, *args, **kwargs):
73             return "args: %s kwargs: %s" % (args, kwargs)
74
75
76     class Status(Test):
77        
78         def index(self):
79             return "normal"
80        
81         def blank(self):
82             cherrypy.response.status = ""
83        
84         # According to RFC 2616, new status codes are OK as long as they
85         # are between 100 and 599.
86        
87         # Here is an illegal code...
88         def illegal(self):
89             cherrypy.response.status = 781
90             return "oops"
91        
92         # ...and here is an unknown but legal code.
93         def unknown(self):
94             cherrypy.response.status = "431 My custom error"
95             return "funky"
96        
97         # Non-numeric code
98         def bad(self):
99             cherrypy.response.status = "error"
100             return "bad news"
101
102
103     class Redirect(Test):
104        
105         class Error:
106             def _cp_on_error(self):
107                 raise cherrypy.HTTPRedirect("/errpage")
108            
109             def index(self):
110                 raise NameError()
111             index.exposed = True
112         error = Error()
113        
114         def index(self):
115             return "child"
116        
117         def by_code(self, code):
118             raise cherrypy.HTTPRedirect("somewhere else", code)
119        
120         def nomodify(self):
121             raise cherrypy.HTTPRedirect("", 304)
122        
123         def proxy(self):
124             raise cherrypy.HTTPRedirect("proxy", 305)
125        
126         def stringify(self):
127             return str(cherrypy.HTTPRedirect("/"))
128
129
130     class LoginFilter:
131        
132         def before_main(self):
133             if cherrypy.config.get("auth.on", False):
134                 if not getattr(cherrypy.request, "login", None):
135                     raise cherrypy.InternalRedirect("/internalredirect/login")
136
137     class InternalRedirect(Test):
138        
139         _cp_filters = [LoginFilter()]
140        
141         def index(self):
142             raise cherrypy.InternalRedirect("/")
143        
144         def relative(self):
145             raise cherrypy.InternalRedirect("cousin")
146        
147         def cousin(self):
148             return "I am a redirected page."
149        
150         def petshop(self, user_id):
151             if user_id == "parrot":
152                 # Trade it for a slug when redirecting
153                 raise cherrypy.InternalRedirect('/image/getImagesByUser',
154                                                "user_id=slug")
155             elif user_id == "terrier":
156                 # Trade it for a fish when redirecting
157                 raise cherrypy.InternalRedirect('/image/getImagesByUser',
158                                                {"user_id": "fish"})
159             else:
160                 raise cherrypy.InternalRedirect('/image/getImagesByUser')
161        
162         def secure(self):
163             return "Welcome!"
164        
165         def login(self):
166             return "Please log in"
167
168
169     class Image(Test):
170        
171         def getImagesByUser(self, user_id):
172             return "0 images for %s" % user_id
173
174
175     class Flatten(Test):
176        
177         def as_string(self):
178             return "content"
179        
180         def as_list(self):
181             return ["con", "tent"]
182        
183         def as_yield(self):
184             yield "content"
185        
186         def as_dblyield(self):
187             yield self.as_yield()
188        
189         def as_refyield(self):
190             for chunk in self.as_yield():
191                 yield chunk
192
193
194     class Error(Test):
195        
196         def custom(self):
197             raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!")
198        
199         def noexist(self):
200             raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!")
201        
202         def page_method(self):
203             raise ValueError()
204        
205         def page_yield(self):
206             yield "howdy"
207             raise ValueError()
208        
209         def page_streamed(self):
210             yield "word up"
211             raise ValueError()
212             yield "very oops"
213        
214         def cause_err_in_finalize(self):
215             # Since status must start with an int, this should error.
216             cherrypy.response.status = "ZOO OK"
217        
218         def log_unhandled(self):
219             raise ValueError()
220        
221         def rethrow(self):
222             """Test that an error raised here will be thrown out to the server."""
223             raise ValueError()
224
225
226     class Ranges(Test):
227        
228         def get_ranges(self, bytes):
229             return repr(httptools.getRanges('bytes=%s' % bytes, 8))
230        
231         def slice_file(self):
232             path = os.path.join(os.getcwd(), os.path.dirname(__file__))
233             return cptools.serveFile(os.path.join(path, "static/index.html"))
234
235
236     class Expect(Test):
237        
238         def expectation_failed(self):
239             expect = cherrypy.request.headers.elements("Expect")
240             if expect and expect[0].value != '100-continue':
241                 raise cherrypy.HTTPError(400)
242             raise cherrypy.HTTPError(417, 'Expectation Failed')
243
244     class Headers(Test):
245         def default(self, headername):
246             """Spit back out the value for the requested header."""
247             return cherrypy.request.headers[headername]
248        
249         def doubledheaders(self):
250             # From http://www.cherrypy.org/ticket/165:
251             # "header field names should not be case sensitive sayes the rfc.
252             # if i set a headerfield in complete lowercase i end up with two
253             # header fields, one in lowercase, the other in mixed-case."
254            
255             # Set the most common headers
256             hMap = cherrypy.response.headers
257             hMap['content-type'] = "text/html"
258             hMap['content-length'] = 18
259             hMap['server'] = 'CherryPy headertest'
260             hMap['location'] = ('%s://127.0.0.1:%s/headers/'
261                                 % (cherrypy.request.remote_port,
262                                    cherrypy.request.scheme))
263            
264             # Set a rare header for fun
265             hMap['Expires'] = 'Thu, 01 Dec 2194 16:00:00 GMT'
266            
267             return "double header test"
268        
269         def ifmatch(self):
270             val = cherrypy.request.headers['If-Match']
271             cherrypy.response.headers['ETag'] = val
272             return repr(val)
273    
274    
275     class HeaderElements(Test):
276        
277         def get_elements(self, headername):
278             e = cherrypy.request.headers.elements(headername)
279             return "\n".join([str(x) for x in e])
280    
281    
282     class Method(Test):
283        
284         def index(self):
285             m = cherrypy.request.method
286             if m in defined_http_methods:
287                 return m
288            
289             if m == "LINK":
290                 raise cherrypy.HTTPError(405)
291             else:
292                 raise cherrypy.HTTPError(501)
293        
294         def parameterized(self, data):
295             return data
296        
297         def request_body(self):
298             # This should be a file object (temp file),
299             # which CP will just pipe back out if we tell it to.
300             return cherrypy.request.body
301
302     class Divorce:
303         """HTTP Method handlers shouldn't collide with normal method names.
304         For example, a GET-handler shouldn't collide with a method named 'get'.
305         
306         If you build HTTP method dispatching into CherryPy, rewrite this class
307         to use your new dispatch mechanism and make sure that:
308             "GET /divorce HTTP/1.1" maps to divorce.index() and
309             "GET /divorce/get?ID=13 HTTP/1.1" maps to divorce.get()
310         """
311        
312         documents = {}
313        
314         def index(self):
315             yield "<h1>Choose your document</h1>\n"
316             yield "<ul>\n"
317             for id, contents in self.documents.iteritems():
318                 yield ("    <li><a href='/divorce/get?ID=%s'>%s</a>: %s</li>\n"
319                        % (id, id, contents))
320             yield "</ul>"
321         index.exposed = True
322        
323         def get(self, ID):
324             return ("Divorce document %s: %s" %
325                     (ID, self.documents.get(ID, "empty")))
326         get.exposed = True
327
328     cherrypy.root.divorce = Divorce()
329
330     class Encoded:
331         def default(self, *args):
332             return "Encoded response"
333         default.exposed = True
334
335     cherrypy.root.encoded = Encoded()
336
337     class Cookies(Test):
338        
339         def single(self, name):
340             cookie = cherrypy.request.simple_cookie[name]
341             cherrypy.response.simple_cookie[name] = cookie.value
342        
343         def multiple(self, names):
344             for name in names:
345                 cookie = cherrypy.request.simple_cookie[name]
346                 cherrypy.response.simple_cookie[name] = cookie.value
347
348
349     class ThreadLocal(Test):
350        
351         def index(self):
352             existing = repr(getattr(cherrypy.request, "asdf", None))
353             cherrypy.request.asdf = "rassfrassin"
354             return existing
355    
356     cherrypy.config.update({
357         'global': {
358             'server.log_to_screen': False,
359             'server.environment': 'production',
360             'server.show_tracebacks': True,
361             'server.max_request_body_size': 200,
362             'server.max_request_header_size': 500,
363         },
364         '/flatten': {
365             'server.log_file': log_file,
366             'server.log_access_file': log_access_file,
367         },
368         '/params': {
369             'server.log_file': log_file,
370         },
371         '/internalredirect/secure': {
372             'auth.on': True,
373         },
374         '/error': {
375             'server.log_file': log_file,
376             'server.log_tracebacks': True,
377         },
378         '/error/page_streamed': {
379             'stream_response': True,
380         },
381         '/error/cause_err_in_finalize': {
382             'server.show_tracebacks': False,
383         },
384         '/error/custom': {
385             'error_page.404': os.path.join(localDir, "static/index.html"),
386         },
387         '/error/noexist': {
388             'error_page.404': "nonexistent.html",
389         },
390         '/error/log_unhandled': {
391             'server.log_tracebacks': False,
392             'server.log_unhandled_tracebacks': True,
393         },
394         '/error/rethrow': {
395             'server.throw_errors': True,
396         },
397     })
398
399
400 #                             Client-side code                             #
401
402 import helper
403
404 class CoreRequestHandlingTest(helper.CPWebCase):
405    
406     def testParams(self):
407         self.getPage("/params/?thing=a")
408         self.assertBody("'a'")
409        
410         self.getPage("/params/?thing=a&thing=b&thing=c")
411         self.assertBody("['a', 'b', 'c']")
412        
413         # Test friendly error message when given params are not accepted.
414         ignore = helper.webtest.ignored_exceptions
415         ignore.append(TypeError)
416         try:
417             self.getPage("/params/?notathing=meeting")
418             self.assertInBody("index() got an unexpected keyword argument 'notathing'")
419         finally:
420             ignore.pop()
421        
422         # Test "% HEX HEX"-encoded URL, param keys, and values
423         self.getPage("/params/%d4%20%e3/cheese?Gruy%E8re=Bulgn%e9ville")
424         self.assertBody(r"args: ('\xd4 \xe3', 'cheese') "
425                         r"kwargs: {'Gruy\xe8re': 'Bulgn\xe9ville'}")
426        
427         # Make sure that encoded = and & get parsed correctly
428         self.getPage("/params/code?url=http%3A//cherrypy.org/index%3Fa%3D1%26b%3D2")
429         self.assertBody(r"args: ('code',) "
430                         r"kwargs: {'url': 'http://cherrypy.org/index?a=1&b=2'}")
431        
432         # Test coordinates sent by <img ismap>
433         self.getPage("/params/ismap?223,114")
434         self.assertBody("Coordinates: 223, 114")
435    
436     def testStatus(self):
437         self.getPage("/status/")
438         self.assertBody('normal')
439         self.assertStatus(200)
440        
441         self.getPage("/status/blank")
442         self.assertBody('')
443         self.assertStatus(200)
444        
445         self.getPage("/status/illegal")
446         self.assertStatus(500)
447         msg = "Illegal response status from server (781 is out of range)."
448         self.assertErrorPage(500, msg)
449        
450         self.getPage("/status/unknown")
451         self.assertBody('funky')
452         self.assertStatus(431)
453        
454         self.getPage("/status/bad")
455         self.assertStatus(500)
456         msg = "Illegal response status from server ('error' is non-numeric)."
457         self.assertErrorPage(500, msg)
458    
459     def testLogging(self):
460         open(log_file, "wb").write("")
461         open(log_access_file, "wb").write("")
462        
463         self.getPage("/flatten/as_string")
464         self.assertBody('content')
465         self.assertStatus(200)
466        
467         self.getPage("/flatten/as_yield")
468         self.assertBody('content')
469         self.assertStatus(200)
470        
471         data = open(log_access_file, "rb").readlines()
472         address = data[0].split()[0]
473         if address not in ['127.0.0.1', '::ffff:127.0.0.1', '::1']:
474             self.fail('access log did not contain remote IP:\n%s' % (data[0]))
475          
476         haslength = False
477         for k, v in self.headers:
478             if k.lower() == 'content-length':
479                 haslength = True
480         if haslength:
481             self.assert_(data[0].endswith('] "GET %s/flatten/as_string HTTP/1.1" 200 7 "" ""\n'
482                                           % self.prefix()))
483         else:
484             self.assert_(data[0].endswith('] "GET %s/flatten/as_string HTTP/1.1" 200 - "" ""\n'
485                                           % self.prefix()))
486        
487         address = data[1].split()[0]
488         if address not in ['127.0.0.1', '::ffff:127.0.0.1', '::1']:
489             self.fail('access log did not contain remote IP:\n%s' % (data[0]))
490
491         haslength = False
492         for k, v in self.headers:
493             if k.lower() == 'content-length':
494                 haslength = True
495         if haslength:
496             self.assert_(data[1].endswith('] "GET %s/flatten/as_yield HTTP/1.1" 200 7 "" ""\n'
497                                           % self.prefix()))
498         else:
499             self.assert_(data[1].endswith('] "GET %s/flatten/as_yield HTTP/1.1" 200 - "" ""\n'
500                                           % self.prefix()))
501        
502         data = open(log_file, "rb").readlines()
503         self.assertEqual(data, [])
504        
505         ignore = helper.webtest.ignored_exceptions
506         ignore.append(ValueError)
507         try:
508             # Test that tracebacks get written to the error log.
509             self.getPage("/error/page_method")
510             self.assertInBody("raise ValueError()")
511             data = open(log_file, "rb").readlines()
512             self.assertEqual(data[0][-41:], ' INFO Traceback (most recent call last):\n')
513             self.assertEqual(data[6], '    raise ValueError()\n')
514            
515             # Test that unhandled tracebacks get written to the error log
516             # if log_tracebacks is False but log_unhandled_tracebacks is True.
517             self.getPage("/error/log_unhandled")
518