Changeset 1502
- Timestamp:
- 12/09/06 17:50:35
- Files:
-
- branches/cherrypy-2.x/cherrypy/test/webtest.py (modified) (12 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/cherrypy-2.x/cherrypy/test/webtest.py
r1123 r1502 13 13 When an error occurs in the framework, call server_error. It will print 14 14 the traceback to stdout, and keep any assertions you have from running 15 (the assumption is that, if the server errors, the page output w on't be16 of further significance to your tests).15 (the assumption is that, if the server errors, the page output will not 16 be of further significance to your tests). 17 17 """ 18 18 … … 115 115 return test 116 116 else: 117 raise ValueError("do n't know how to make test from: %s" % obj)117 raise ValueError("do not know how to make test from: %s" % obj) 118 118 119 119 … … 140 140 HOST = "127.0.0.1" 141 141 PORT = 8000 142 HTTP_CONN=httplib.HTTPConnection 143 144 def getPage(self, url, headers=None, method="GET", body=None, protocol="HTTP/1.1"): 142 HTTP_CONN = httplib.HTTPConnection 143 PROTOCOL = "HTTP/1.1" 144 145 def set_persistent(self, on=True, auto_open=False): 146 """Make our HTTP_CONN persistent (or not). 147 148 If the 'on' argument is True (the default), then self.HTTP_CONN 149 will be set to an instance of httplib.HTTPConnection (or HTTPS 150 if self.scheme is "https"). This will then persist across requests. 151 152 We only allow for a single open connection, so if you call this 153 and we currently have an open connection, it will be closed. 154 """ 155 try: 156 self.HTTP_CONN.close() 157 except (TypeError, AttributeError): 158 pass 159 160 if self.scheme == "https": 161 cls = httplib.HTTPSConnection 162 else: 163 cls = httplib.HTTPConnection 164 165 if on: 166 self.HTTP_CONN = cls(self.HOST, self.PORT) 167 # Automatically re-connect? 168 self.HTTP_CONN.auto_open = auto_open 169 self.HTTP_CONN.connect() 170 else: 171 self.HTTP_CONN = cls 172 173 def _get_persistent(self): 174 return hasattr(self.HTTP_CONN, "__class__") 175 def _set_persistent(self, on=True): 176 self.set_persistent(on) 177 persistent = property(_get_persistent, _set_persistent) 178 179 def getPage(self, url, headers=None, method="GET", body=None, protocol=None): 145 180 """Open the url with debugging support. Return status, headers, body.""" 146 181 ServerError.on = False … … 148 183 self.url = url 149 184 result = openURL(url, headers, method, body, self.HOST, self.PORT, 150 self.HTTP_CONN, protocol )185 self.HTTP_CONN, protocol or self.PROTOCOL) 151 186 self.status, self.headers, self.body = result 152 187 … … 205 240 sys.exit() 206 241 207 def __call__(self, result=None):208 if result is None:209 result = self.defaultTestResult()210 result.startTest(self)211 testMethod = getattr(self, self._TestCase__testMethodName)212 try:242 if sys.version_info >= (2, 5): 243 def __call__(self, result=None): 244 if result is None: 245 result = self.defaultTestResult() 246 result.startTest(self) 247 testMethod = getattr(self, self._testMethodName) 213 248 try: 214 self.setUp() 215 except (KeyboardInterrupt, SystemExit): 216 raise 217 except: 218 result.addError(self, self._TestCase__exc_info()) 219 return 220 221 ok = 0 249 try: 250 self.setUp() 251 except (KeyboardInterrupt, SystemExit): 252 raise 253 except: 254 result.addError(self, self._exc_info()) 255 return 256 257 ok = 0 258 try: 259 testMethod() 260 ok = 1 261 except self.failureException: 262 result.addFailure(self, self._exc_info()) 263 except (KeyboardInterrupt, SystemExit): 264 raise 265 except: 266 result.addError(self, self._exc_info()) 267 268 try: 269 self.tearDown() 270 except (KeyboardInterrupt, SystemExit): 271 raise 272 except: 273 result.addError(self, self._exc_info()) 274 ok = 0 275 if ok: 276 result.addSuccess(self) 277 finally: 278 result.stopTest(self) 279 else: 280 def __call__(self, result=None): 281 if result is None: 282 result = self.defaultTestResult() 283 result.startTest(self) 284 testMethod = getattr(self, self._TestCase__testMethodName) 222 285 try: 223 testMethod() 224 ok = 1 225 except self.failureException: 226 result.addFailure(self, self._TestCase__exc_info()) 227 except (KeyboardInterrupt, SystemExit): 228 raise 229 except: 230 result.addError(self, self._TestCase__exc_info()) 231 232 try: 233 self.tearDown() 234 except (KeyboardInterrupt, SystemExit): 235 raise 236 except: 237 result.addError(self, self._TestCase__exc_info()) 286 try: 287 self.setUp() 288 except (KeyboardInterrupt, SystemExit): 289 raise 290 except: 291 result.addError(self, self._TestCase__exc_info()) 292 return 293 238 294 ok = 0 239 if ok: 240 result.addSuccess(self) 241 finally: 242 result.stopTest(self) 295 try: 296 testMethod() 297 ok = 1 298 except self.failureException: 299 result.addFailure(self, self._TestCase__exc_info()) 300 except (KeyboardInterrupt, SystemExit): 301 raise 302 except: 303 result.addError(self, self._TestCase__exc_info()) 304 305 try: 306 self.tearDown() 307 except (KeyboardInterrupt, SystemExit): 308 raise 309 except: 310 result.addError(self, self._TestCase__exc_info()) 311 ok = 0 312 if ok: 313 result.addSuccess(self) 314 finally: 315 result.stopTest(self) 243 316 244 317 def assertStatus(self, status, msg=None): … … 277 350 if k.lower() == lowkey: 278 351 if value is None or str(value) == v: 279 return 352 return v 280 353 281 354 if msg is None: … … 324 397 325 398 399 methods_with_bodies = ("POST", "PUT") 326 400 327 401 def cleanHeaders(headers, method, body, host, port): … … 338 412 break 339 413 if not found: 340 headers.append(("Host", "%s:%s" % (host, port))) 341 342 if method in ("POST", "PUT"): 414 if port == 80: 415 headers.append(("Host", host)) 416 else: 417 headers.append(("Host", "%s:%s" % (host, port))) 418 419 if method in methods_with_bodies: 343 420 # Stick in default type and length headers if not present 344 421 found = False … … 354 431 355 432 433 def shb(response): 434 """Return status, headers, body the way we like from a response.""" 435 h = [] 436 key, value = None, None 437 for line in response.msg.headers: 438 if line: 439 if line[0] in " \t": 440 value += line.strip() 441 else: 442 if key and value: 443 h.append((key, value)) 444 key, value = line.split(":", 1) 445 key = key.strip() 446 value = value.strip() 447 if key and value: 448 h.append((key, value)) 449 450 return "%s %s" % (response.status, response.reason), h, response.read() 451 452 356 453 def openURL(url, headers=None, method="GET", body=None, 357 454 host="127.0.0.1", port=8000, http_conn=httplib.HTTPConnection, … … 366 463 while trial < 10: 367 464 try: 368 conn = http_conn(host, port) 465 # Allow http_conn to be a class or an instance 466 if hasattr(http_conn, "host"): 467 conn = http_conn 468 else: 469 conn = http_conn(host, port) 470 369 471 conn._http_vsn_str = protocol 370 472 conn._http_vsn = int("".join([x for x in protocol if x.isdigit()])) … … 372 474 # skip_accept_encoding argument added in python version 2.4 373 475 if sys.version_info < (2, 4): 476 def putheader(self, header, value): 477 if header == 'Accept-Encoding' and value == 'identity': 478 return 479 self.__class__.putheader(self, header, value) 480 import new 481 conn.putheader = new.instancemethod(putheader, conn, conn.__class__) 374 482 conn.putrequest(method.upper(), url, skip_host=True) 375 483 else: … … 387 495 response = conn.getresponse() 388 496 389 status = "%s %s" % (response.status, response.reason) 390 391 outheaders = [] 392 for line in response.msg.headers: 393 key, value = line.split(":", 1) 394 outheaders.append((key.strip(), value.strip())) 395 396 outbody = response.read() 397 398 conn.close() 399 return status, outheaders, outbody 497 s, h, b = shb(response) 498 499 if not hasattr(http_conn, "host"): 500 # We made our own conn instance. Close it. 501 conn.close() 502 503 return s, h, b 400 504 except socket.error: 401 505 trial += 1

