Download Install Tutorial Docs FAQ Tools Wiki License Team IRC Planet Involvement Shop Book

root/branches/cherrypy-2.1/cherrypy/test/webtest.py

Revision 697 (checked in by fumanchu, 3 years ago)

The webtest sys.e[X]it option now works. There's also a new WebCase.exit() method which you can override, in case you need to do other things before exiting.

Line 
1 """
2 Copyright (c) 2004, CherryPy Team (team@cherrypy.org)
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without modification,
6 are permitted provided that the following conditions are met:
7
8     * Redistributions of source code must retain the above copyright notice,
9       this list of conditions and the following disclaimer.
10     * Redistributions in binary form must reproduce the above copyright notice,
11       this list of conditions and the following disclaimer in the documentation
12       and/or other materials provided with the distribution.
13     * Neither the name of the CherryPy Team nor the names of its contributors
14       may be used to endorse or promote products derived from this software
15       without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
18 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 """
28
29 """Extensions to unittest for web frameworks.
30
31 Use the WebCase.getPage method to request a page from your HTTP server.
32
33 Framework Integration
34 =====================
35
36 If you have control over your server process, you can handle errors
37 in the server-side of the HTTP conversation a bit better. You must run
38 both the client (your WebCase tests) and the server in the same process
39 (but in separate threads, obviously).
40
41 When an error occurs in the framework, call server_error. It will print
42 the traceback to stdout, and keep any assertions you have from running
43 (the assumption is that, if the server errors, the page output won't be
44 of further significance to your tests).
45 """
46
47 import os, sys, time, re
48 import types
49 import pprint
50 import socket
51 import httplib
52 import traceback
53
54 from unittest import *
55 from unittest import _TextTestResult
56
57
58 class TerseTestResult(_TextTestResult):
59    
60     def printErrors(self):
61         # Overridden to avoid unnecessary empty line
62         if self.errors or self.failures:
63             if self.dots or self.showAll:
64                 self.stream.writeln()
65             self.printErrorList('ERROR', self.errors)
66             self.printErrorList('FAIL', self.failures)
67
68
69 class TerseTestRunner(TextTestRunner):
70     """A test runner class that displays results in textual form."""
71    
72     def _makeResult(self):
73         return TerseTestResult(self.stream, self.descriptions, self.verbosity)
74    
75     def run(self, test):
76         "Run the given test case or test suite."
77         # Overridden to remove unnecessary empty lines and separators
78         result = self._makeResult()
79         startTime = time.time()
80         test(result)
81         timeTaken = float(time.time() - startTime)
82         result.printErrors()
83         if not result.wasSuccessful():
84             self.stream.write("FAILED (")
85             failed, errored = map(len, (result.failures, result.errors))
86             if failed:
87                 self.stream.write("failures=%d" % failed)
88             if errored:
89                 if failed: self.stream.write(", ")
90                 self.stream.write("errors=%d" % errored)
91             self.stream.writeln(")")
92         return result
93
94
95 class ReloadingTestLoader(TestLoader):
96    
97     def loadTestsFromName(self, name, module=None):
98         """Return a suite of all tests cases given a string specifier.
99
100         The name may resolve either to a module, a test case class, a
101         test method within a test case class, or a callable object which
102         returns a TestCase or TestSuite instance.
103
104         The method optionally resolves the names relative to a given module.
105         """
106         parts = name.split('.')
107         if module is None:
108             if not parts:
109                 raise ValueError("incomplete test name: %s" % name)
110             else:
111                 parts_copy = parts[:]
112                 while parts_copy:
113                     target = ".".join(parts_copy)
114                     if target in sys.modules:
115                         module = reload(sys.modules[target])
116                         break
117                     else:
118                         try:
119                             module = __import__(target)
120                             break
121                         except ImportError:
122                             del parts_copy[-1]
123                             if not parts_copy:
124                                 raise
125                 parts = parts[1:]
126         obj = module
127         for part in parts:
128             obj = getattr(obj, part)
129        
130         if type(obj) == types.ModuleType:
131             return self.loadTestsFromModule(obj)
132         elif (isinstance(obj, (type, types.ClassType)) and
133               issubclass(obj, TestCase)):
134             return self.loadTestsFromTestCase(obj)
135         elif type(obj) == types.UnboundMethodType:
136             return obj.im_class(obj.__name__)
137         elif callable(obj):
138             test = obj()
139             if not isinstance(test, TestCase) and \
140                not isinstance(test, TestSuite):
141                 raise ValueError("calling %s returned %s, "
142                                  "not a test" % (obj,test))
143             return test
144         else:
145             raise ValueError("don't know how to make test from: %s" % obj)
146
147
try:
    # On Windows, msvcrt.getch reads a single char without output.
    import msvcrt
except ImportError:
    # Unix getchar: put the terminal in raw mode for one keystroke.
    import tty, termios

    def getchar():
        """Read one character from stdin without waiting for Enter."""
        stdin_fd = sys.stdin.fileno()
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            tty.setraw(stdin_fd)
            key = sys.stdin.read(1)
        finally:
            # Always put the terminal back the way it was found.
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
        return key
else:
    def getchar():
        """Read one character from the console via msvcrt.getch."""
        return msvcrt.getch()
165
166
167 class WebCase(TestCase):
168    
169     HOST = "127.0.0.1"
170     PORT = 8000
171    
172     def getPage(self, url, headers=None, method="GET", body=None):
173         """Open the url with debugging support. Return status, headers, body."""
174         ServerError.on = False
175        
176         self.url = url
177         result = openURL(url, headers, method, body, self.HOST, self.PORT)
178         self.status, self.headers, self.body = result
179        
180         # Build a list of request cookies from the previous response cookies.
181         self.cookies = [('Cookie', v) for k, v in self.headers
182                         if k.lower() == 'set-cookie']
183        
184         if ServerError.on:
185             raise ServerError()
186         return result
187    
188     interactive = True
189     console_height = 30
190    
191     def _handlewebError(self, msg):
192         if not self.interactive:
193             raise self.failureException(msg)
194        
195         print
196         print "    ERROR:", msg
197         p = "    Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
198         print p,
199         while True:
200             i = getchar().upper()
201             if i not in "BHSUIRX":
202                 continue
203             print i.upper()  # Also prints new line
204             if i == "B":
205                 for x, line in enumerate(self.body.splitlines()):
206                     if (x + 1) % self.console_height == 0:
207                         # The \r and comma should make the next line overwrite
208                         print "<-- More -->\r",
209                         m = getchar().lower()
210                         # Erase our "More" prompt
211                         print "            \r",
212                         if m == "q":
213                             break
214                     print line
215             elif i == "H":
216                 pprint.pprint(self.headers)
217             elif i == "S":
218                 print self.status
219             elif i == "U":
220                 print self.url
221             elif i == "I":
222                 # return without raising the normal exception
223                 return
224             elif i == "R":
225                 raise self.failureException(msg)
226             elif i == "X":
227                 self.exit()
228             print p,
229    
230     def exit(self):
231         sys.exit()
232    
233     def __call__(self, result=None):
234         if result is None:
235             result = self.defaultTestResult()
236         result.startTest(self)
237         testMethod = getattr(self, self._TestCase__testMethodName)
238         try:
239             try:
240                 self.setUp()
241             except (KeyboardInterrupt, SystemExit):
242                 raise
243             except:
244                 result.addError(self, self._TestCase__exc_info())
245                 return
246            
247             ok = 0
248             try:
249                 testMethod()
250                 ok = 1
251             except self.failureException:
252                 result.addFailure(self, self._TestCase__exc_info())
253             except (KeyboardInterrupt, SystemExit):
254                 raise
255             except:
256                 result.addError(self, self._TestCase__exc_info())
257            
258             try:
259                 self.tearDown()
260             except (KeyboardInterrupt, SystemExit):
261                 raise
262             except:
263                 result.addError(self, self._TestCase__exc_info())
264                 ok = 0
265             if ok:
266                 result.addSuccess(self)
267         finally:
268             result.stopTest(self)
269    
270     def assertStatus(self, status, msg=None):
271         """Fail if self.status != status."""
272         if not self.status == status:
273             if msg is None:
274                 msg = 'Status (%s) != %s' % (`self.status`, `status`)
275             self._handlewebError(msg)
276    
277     def assertHeader(self, key, value=None, msg=None):
278         """Fail if (key, [value]) not in self.headers."""
279         lowkey = key.lower()
280         for k, v in self.headers:
281             if k.lower() == lowkey:
282                 if value is None or str(value) == v:
283                     return
284        
285         if msg is None:
286             if value is None:
287                 msg = '%s not in headers' % `key`
288             else:
289                 msg = '%s:%s not in headers' % (`key`, `value`)
290         self._handlewebError(msg)
291    
292     def assertNoHeader(self, key, msg=None):
293         """Fail if key in self.headers."""
294         lowkey = key.lower()
295         matches = [k for k, v in self.headers if k.lower() == lowkey]
296         if matches:
297             if msg is None:
298                 msg = '%s in headers' % `key`
299             self._handlewebError(msg)
300    
301     def assertBody(self, value, msg=None):
302         """Fail if value != self.body."""
303         if value != self.body:
304             if msg is None:
305                 msg = 'expected body:\n%s\n\nactual body:\n%s' % (`value`, `self.body`)
306             self._handlewebError(msg)
307    
308     def assertInBody(self, value, msg=None):
309         """Fail if value not in self.body."""
310         if value not in self.body:
311             if msg is None:
312                 msg = '%s not in body' % `value`
313             self._handlewebError(msg)
314    
315     def assertNotInBody(self, value, msg=None):
316         """Fail if value in self.body."""
317         if value in self.body:
318             if msg is None:
319                 msg = '%s found in body' % `value`
320             self._handlewebError(msg)
321    
322     def assertMatchesBody(self, pattern, msg=None, flags=0):
323         """Fail if value (a regex pattern) is not in self.body."""
324         if re.search(pattern, self.body, flags) is None:
325             if msg is None:
326                 msg = 'No match for %s in body' % `pattern`
327             self._handlewebError(msg)
328
329
330
def cleanHeaders(headers, method, body, host, port):
    """Return request headers, with required headers added (if missing).

    headers: a sequence of (name, value) pairs, or None for no headers.
        The caller's sequence is left untouched; a new list is returned.
    method: the HTTP method, in any letter case.
    body: the request body, or None.
    host, port: address of the server, used for the default Host header.

    A Host header is added when none is present. For POST and PUT
    requests that carry no Content-Type, a default Content-Type and a
    matching Content-Length are added together; a request that brings
    its own Content-Type is passed through as given.
    """
    # Work on a copy so that defaults never leak into the caller's list.
    if headers is None:
        headers = []
    else:
        headers = list(headers)

    present = [name.lower() for name, value in headers]

    # Add the required Host request header if not present.
    # [This specifies the host:port of the server, not the client.]
    if 'host' not in present:
        headers.append(("Host", "%s:%s" % (host, port)))

    # The method is upper-cased before it is sent, so compare it the
    # same way here.
    if method.upper() in ("POST", "PUT") and 'content-type' not in present:
        # Stick in default type and length headers
        headers.append(("Content-Type", "application/x-www-form-urlencoded"))
        headers.append(("Content-Length", str(len(body or ""))))

    return headers
358
359
def openURL(url, headers=None, method="GET", body=None,
            host="127.0.0.1", port=8000):
    """Open the given HTTP resource and return status, headers, and body.

    url: the request target handed to HTTPConnection.putrequest.
    headers: a list of (name, value) pairs or None; passed through
        cleanHeaders before sending.
    method: HTTP method; upper-cased before it is sent.
    body: request body to send after the headers, or None.
    host, port: address of the server to connect to.

    Returns a tuple (status, headers, body) where status is a string
    such as "200 OK", headers is a list of (name, value) pairs and body
    is the response body.

    A socket.error leads to a retry after half a second; the error of
    the tenth failed attempt is re-raised.
    """
    headers = cleanHeaders(headers, method, body, host, port)

    # Trying 10 times is simply in case of socket errors.
    # Normal case--it should run once.
    max_trials = 10
    retry_delay = 0.5
    trial = 0
    while True:
        conn = httplib.HTTPConnection(host, port)
        try:
            try:
                conn.putrequest(method.upper(), url)

                for key, value in headers:
                    conn.putheader(key, value)
                conn.endheaders()

                if body is not None:
                    conn.send(body)

                # Handle response
                response = conn.getresponse()

                status = "%s %s" % (response.status, response.reason)

                outheaders = []
                for line in response.msg.headers:
                    key, value = line.split(":", 1)
                    outheaders.append((key.strip(), value.strip()))

                outbody = response.read()
                return status, outheaders, outbody
            finally:
                # Close on every path, so that a failed attempt does not
                # leave its socket open while the next one is made.
                conn.close()
        except socket.error:
            trial += 1
            if trial >= max_trials:
                raise
            time.sleep(retry_delay)
401
402
# Exception classes that your web framework handles in the normal
# course of a request; server_error does not trap these.
ignored_exceptions = []

# You'll want to set this to True when you can't guarantee
# that each response will immediately follow each request;
# for example, when handling requests via multiple threads.
ignore_all = False

class ServerError(Exception):
    """Raised by WebCase.getPage when the server side flagged an error."""

    # Switched on by server_error; getPage resets it before each request.
    on = False
414
415
416 def server_error(exc=None):
417     """Server debug hook. Return True if exception handled, False if ignored.
418     
419     You probably want to wrap this, so you can still handle an error using
420     your framework when it's ignored.
421     """
422     if exc is None:
423         exc = sys.exc_info()
424    
425     if ignore_all or exc[0] in ignored_exceptions:
426         return False
427     else:
428         ServerError.on = True
429         print
430         print "".join(traceback.format_exception(*exc))
431         return True
432
Note: See TracBrowser for help on using the browser.

Hosted by WebFaction

Log in as guest/cpguest to create tickets