1 """Extensions to unittest for web frameworks.
2
3 Use the WebCase.getPage method to request a page from your HTTP server.
4
5 Framework Integration
6 =====================
7
8 If you have control over your server process, you can handle errors
9 in the server-side of the HTTP conversation a bit better. You must run
10 both the client (your WebCase tests) and the server in the same process
11 (but in separate threads, obviously).
12
13 When an error occurs in the framework, call server_error. It will print
14 the traceback to stdout, and keep any assertions you have from running
15 (the assumption is that, if the server errors, the page output will not
16 be of further significance to your tests).
17 """
18
19 import os, sys, time, re
20 import types
21 import pprint
22 import socket
23 import httplib
24 import traceback
25
26 from unittest import *
27 from unittest import _TextTestResult
28
29
31
33
34 if self.errors or self.failures:
35 if self.dots or self.showAll:
36 self.stream.writeln()
37 self.printErrorList('ERROR', self.errors)
38 self.printErrorList('FAIL', self.failures)
39
40
42 """A test runner class that displays results in textual form."""
43
46
def run(self, test):
    """Run the given test case or test suite and return its result.

    The result object is obtained from ``self._makeResult()`` and ``test``
    is called with it.  Afterwards the result is asked to print its own
    error listing via ``printErrors()``.  If the run was not successful, a
    one-line summary such as ``FAILED (failures=1, errors=2)`` is written
    to ``self.stream``; this method writes no summary for a successful run.
    """
    result = self._makeResult()
    test(result)
    result.printErrors()

    if not result.wasSuccessful():
        failed = len(result.failures)
        errored = len(result.errors)

        # Only non-zero counts are reported, failures before errors.
        counts = []
        if failed:
            counts.append("failures=%d" % failed)
        if errored:
            counts.append("errors=%d" % errored)
        self.stream.writeln("FAILED (%s)" % ", ".join(counts))
    return result
65
66
68
70 """Return a suite of all tests cases given a string specifier.
71
72 The name may resolve either to a module, a test case class, a
73 test method within a test case class, or a callable object which
74 returns a TestCase or TestSuite instance.
75
76 The method optionally resolves the names relative to a given module.
77 """
78 parts = name.split('.')
79 if module is None:
80 if not parts:
81 raise ValueError("incomplete test name: %s" % name)
82 else:
83 parts_copy = parts[:]
84 while parts_copy:
85 target = ".".join(parts_copy)
86 if target in sys.modules:
87 module = reload(sys.modules[target])
88 break
89 else:
90 try:
91 module = __import__(target)
92 break
93 except ImportError:
94 del parts_copy[-1]
95 if not parts_copy:
96 raise
97 parts = parts[1:]
98 obj = module
99 for part in parts:
100 obj = getattr(obj, part)
101
102 if type(obj) == types.ModuleType:
103 return self.loadTestsFromModule(obj)
104 elif (isinstance(obj, (type, types.ClassType)) and
105 issubclass(obj, TestCase)):
106 return self.loadTestsFromTestCase(obj)
107 elif type(obj) == types.UnboundMethodType:
108 return obj.im_class(obj.__name__)
109 elif callable(obj):
110 test = obj()
111 if not isinstance(test, TestCase) and \
112 not isinstance(test, TestSuite):
113 raise ValueError("calling %s returned %s, "
114 "not a test" % (obj,test))
115 return test
116 else:
117 raise ValueError("do not know how to make test from: %s" % obj)
118
119
120 try:
121
122 import msvcrt
124 return msvcrt.getch()
125 except ImportError:
126
127 import tty, termios
129 fd = sys.stdin.fileno()
130 old_settings = termios.tcgetattr(fd)
131 try:
132 tty.setraw(sys.stdin.fileno())
133 ch = sys.stdin.read(1)
134 finally:
135 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
136 return ch
137
138
140 HOST = "127.0.0.1"
141 PORT = 8000
142 HTTP_CONN = httplib.HTTPConnection
143 PROTOCOL = "HTTP/1.1"
144
146 """Make our HTTP_CONN persistent (or not).
147
148 If the 'on' argument is True (the default), then self.HTTP_CONN
149 will be set to an instance of httplib.HTTPConnection (or HTTPS
150 if self.scheme is "https"). This will then persist across requests.
151
152 We only allow for a single open connection, so if you call this
153 and we currently have an open connection, it will be closed.
154 """
155 try:
156 self.HTTP_CONN.close()
157 except (TypeError, AttributeError):
158 pass
159
160 if self.scheme == "https":
161 cls = httplib.HTTPSConnection
162 else:
163 cls = httplib.HTTPConnection
164
165 if on:
166 host = self.HOST
167 if not host:
168
169
170 host = "127.0.0.1"
171 self.HTTP_CONN = cls(host, self.PORT)
172
173 self.HTTP_CONN.auto_open = auto_open
174 self.HTTP_CONN.connect()
175 else:
176 self.HTTP_CONN = cls
177
179 return hasattr(self.HTTP_CONN, "__class__")
182 persistent = property(_get_persistent, _set_persistent)
183
def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
    """Request ``url`` and record the response on this test case.

    The request is delegated to ``openURL`` using ``self.HOST`` (or
    "127.0.0.1" when that is empty), ``self.PORT``, ``self.HTTP_CONN`` and
    either the given ``protocol`` or ``self.PROTOCOL``.  The resulting
    ``(status, headers, body)`` triple is stored on ``self.status``,
    ``self.headers`` and ``self.body`` and is also returned.

    ``self.cookies`` is rebuilt as a list of ``('Cookie', value)`` pairs,
    one for every 'Set-Cookie' header found in the response.

    Raises ``ServerError`` if ``ServerError.on`` is set once the request
    has completed.
    """
    # Clear the flag first so that only an error flagged during this
    # request is reported (presumably set by the server-side error hook --
    # confirm against server_error).
    ServerError.on = False
    self.url = url

    # An empty HOST falls back to the loopback address.
    target_host = self.HOST or "127.0.0.1"
    response = openURL(url, headers, method, body, target_host, self.PORT,
                       self.HTTP_CONN, protocol or self.PROTOCOL)
    self.status, self.headers, self.body = response

    # Turn each response cookie into a request header for later calls.
    request_cookies = []
    for name, val in self.headers:
        if name.lower() == 'set-cookie':
            request_cookies.append(('Cookie', val))
    self.cookies = request_cookies

    if ServerError.on:
        raise ServerError()
    return response
205
206 interactive = True
207 console_height = 30
208
210 print
211 print " ERROR:", msg
212
213 if not self.interactive:
214 raise self.failureException(msg)
215
216 p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
217 print p,
218 while True:
219 i = getchar().upper()
220 if i not in "BHSUIRX":
221 continue
222 print i.upper()
223 if i == "B":
224 for x, line in enumerate(self.body.splitlines()):
225 if (x + 1) % self.console_height == 0:
226
227 print "<-- More -->\r",
228 m = getchar().lower()
229
230 print " \r",
231 if m == "q":
232 break
233 print line
234 elif i == "H":
235 pprint.pprint(self.headers)
236 elif i == "S":
237 print self.status
238 elif i == "U":
239 print self.url
240 elif i == "I":
241
242 return
243 elif i == "R":
244 raise self.failureException(msg)
245 elif i == "X":
246 self.exit()
247 print p,
248
251
252 if sys.version_info >= (2, 5):
254 if result is None:
255 result = self.defaultTestResult()
256 result.startTest(self)
257 testMethod = getattr(self, self._testMethodName)
258 try:
259 try:
260 self.setUp()
261 except (KeyboardInterrupt, SystemExit):
262 raise
263 except:
264 result.addError(self, self._exc_info())
265 return
266
267 ok = 0
268 try:
269 testMethod()
270 ok = 1
271 except self.failureException:
272 result.addFailure(self, self._exc_info())
273 except (KeyboardInterrupt, SystemExit):
274 raise
275 except:
276 result.addError(self, self._exc_info())
277
278 try:
279 self.tearDown()
280 except (KeyboardInterrupt, SystemExit):
281 raise
282 except:
283 result.addError(self, self._exc_info())
284 ok = 0
285 if ok:
286 result.addSuccess(self)
287 finally:
288 result.stopTest(self)
289 else:
291 if result is None:
292 result = self.defaultTestResult()
293 result.startTest(self)
294 testMethod = getattr(self, self._TestCase__testMethodName)
295 try:
296 try:
297 self.setUp()
298 except (KeyboardInterrupt, SystemExit):
299 raise
300 except:
301 result.addError(self, self._TestCase__exc_info())
302 return
303
304 ok = 0
305 try:
306 testMethod()
307 ok = 1
308 except self.failureException:
309 result.addFailure(self, self._TestCase__exc_info())
310 except (KeyboardInterrupt, SystemExit):
311 raise
312 except:
313 result.addError(self, self._TestCase__exc_info())
314
315 try:
316 self.tearDown()
317 except (KeyboardInterrupt, SystemExit):
318 raise
319 except:
320 result.addError(self, self._TestCase__exc_info())
321 ok = 0
322 if ok:
323 result.addSuccess(self)
324 finally:
325 result.stopTest(self)
326
355
357 """Fail if (key, [value]) not in self.headers."""
358 lowkey = key.lower()
359 for k, v in self.headers:
360 if k.lower() == lowkey:
361 if value is None or str(value) == v:
362 return v
363
364 if msg is None:
365 if value is None:
366 msg = '%s not in headers' % `key`
367 else:
368 msg = '%s:%s not in headers' % (`key`, `value`)
369 self._handlewebError(msg)
370
372 """Fail if key in self.headers."""
373 lowkey = key.lower()
374 matches = [k for k, v in self.headers if k.lower() == lowkey]
375 if matches:
376 if msg is None:
377 msg = '%s in headers' % `key`
378 self._handlewebError(msg)
379
def assertBody(self, value, msg=None):
    """Fail if value != self.body.

    When the bodies differ, ``self._handlewebError`` is called with ``msg``.
    If ``msg`` is None, a default message showing the repr of both the
    expected and the actual body is used instead.
    """
    if value != self.body:
        if msg is None:
            # repr() replaces the deprecated backtick syntax; the output is
            # the same, and quoting/escapes keep whitespace differences
            # visible in the message.
            msg = ('expected body:\n%s\n\nactual body:\n%s'
                   % (repr(value), repr(self.body)))
        self._handlewebError(msg)
386
387 - def assertInBody(self,