Download Install Tutorial Docs FAQ Tools Wiki License Team IRC Planet Involvement Shop Book

root/branches/cherrypy-2.x/cherrypy/_cpwsgiserver.py

Revision 2660 (checked in by fs, 5 months ago)

bump the version number to 2.3.1 to make sure this won't be forgotten

  • Property svn:eol-style set to native
Line 
1 """A high-speed, production ready, thread pooled, generic WSGI server."""
2
3 import mimetools # todo: use email
4 import Queue
5 import re
# Matches an escaped slash ("%2F" or "%2f") so paths can be split on it
# before the remaining escapes are decoded.
quoted_slash = re.compile("%2F", re.IGNORECASE)
7 import rfc822
8 import socket
9 import sys
10 import threading
11 import time
12 import traceback
13 from urllib import unquote
14 from urlparse import urlparse
15
16 import errno
# Error codes that are looked up when a socket.error is caught; codes
# listed here are dropped without a traceback. Not every symbolic name
# exists on every platform, so only the ones this errno module defines
# are collected. dict.fromkeys() removes codes that appear under two names.
socket_errors_to_ignore = dict.fromkeys(
    [getattr(errno, _)
     for _ in ("EPIPE", "ETIMEDOUT", "ECONNREFUSED", "ECONNRESET",
               "EHOSTDOWN", "EHOSTUNREACH",
               "WSAECONNABORTED", "WSAECONNREFUSED", "WSAECONNRESET",
               "WSAENETRESET", "WSAETIMEDOUT")
     if hasattr(errno, _)]
).keys()
27
# Request headers whose repeated occurrences are folded into a single
# comma-separated value. Names are lowercase because mimetools.Message
# uses lowercase keys.
comma_separated_headers = (
    "accept accept-charset accept-encoding accept-language "
    "accept-ranges allow cache-control connection content-encoding "
    "content-language expect if-match if-none-match pragma "
    "proxy-authenticate te trailer transfer-encoding upgrade "
    "vary via warning www-authenticate"
).split()
36
37 class HTTPRequest(object):
38    
39     stderr = sys.stderr
40     bufsize = -1
41    
42     def __init__(self, socket, addr, server):
43         self.socket = socket
44         self.addr = addr
45         self.server = server
46         self.environ = {}
47         self.ready = False
48         self.started_response = False
49         self.status = ""
50         self.outheaders = []
51         self.outheaderkeys = []
52         self.rfile = self.socket.makefile("r", self.bufsize)
53         self.wfile = self.socket.makefile("w", self.bufsize)
54         self.sent_headers = False
55    
56     def parse_request(self):
57         self.sent_headers = False
58         self.environ = {}
59         self.environ["wsgi.version"] = (1,0)
60         self.environ["wsgi.url_scheme"] = "http"
61         self.environ["wsgi.input"] = self.rfile
62         self.environ["wsgi.errors"] = self.stderr
63         self.environ["wsgi.multithread"] = True
64         self.environ["wsgi.multiprocess"] = False
65         self.environ["wsgi.run_once"] = False
66         request_line = self.rfile.readline()
67         if not request_line:
68             self.ready = False
69             return
70        
71         if request_line == "\r\n":
72             # RFC 2616 sec 4.1: "...if the server is reading the protocol
73             # stream at the beginning of a message and receives a CRLF
74             # first, it should ignore the CRLF."
75             # But only ignore one leading line! else we enable a DoS.
76             request_line = self.rfile.readline()
77             if not request_line:
78                 self.ready = False
79                 return
80        
81         method, path, req_protocol = request_line.strip().split(" ", 2)
82         self.environ["REQUEST_METHOD"] = method
83        
84         # path may be an abs_path (including "http://host.domain.tld");
85         scheme, location, path, params, qs, frag = urlparse(path)
86         if scheme:
87             self.environ["wsgi.url_scheme"] = scheme
88         if params:
89             path = path + ";" + params
90        
91         # Unquote the path+params (e.g. "/this%20path" -> "this path").
92         # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
93         #
94         # But note that "...a URI must be separated into its components
95         # before the escaped characters within those components can be
96         # safely decoded." http://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2
97         atoms = [unquote(x) for x in quoted_slash.split(path)]
98         path = "%2F".join(atoms)
99        
100         for mount_point, wsgi_app in self.server.mount_points:
101             if path == "*":
102                 # This means, of course, that the first wsgi_app will
103                 # always handle a URI of "*".
104                 self.environ["SCRIPT_NAME"] = ""
105                 self.environ["PATH_INFO"] = "*"
106                 self.wsgi_app = wsgi_app
107                 break
108             # The mount_points list should be sorted by length, descending.
109             if path.startswith(mount_point):
110                 self.environ["SCRIPT_NAME"] = mount_point
111                 self.environ["PATH_INFO"] = path[len(mount_point):]
112                 self.wsgi_app = wsgi_app
113                 break
114         else:
115             self.abort("404 Not Found")
116             return
117        
118         # Note that, like wsgiref and most other WSGI servers,
119         # we unquote the path but not the query string.
120         self.environ["QUERY_STRING"] = qs
121         self.environ["SERVER_PROTOCOL"] = req_protocol
122         # If the Request-URI was an absoluteURI, use its location atom.
123         self.environ["SERVER_NAME"] = location or self.server.server_name
124        
125         if isinstance(self.server.bind_addr, basestring):
126             # AF_UNIX. This isn't really allowed by WSGI, which doesn't
127             # address unix domain sockets. But it's better than nothing.
128             self.environ["SERVER_PORT"] = ""
129         else:
130             self.environ["SERVER_PORT"] = str(self.server.bind_addr[1])
131             # optional values
132             # Until we do DNS lookups, don't include REMOTE_HOST
133             self.environ["REMOTE_ADDR"] = self.addr[0]
134             self.environ["REMOTE_PORT"] = str(self.addr[1])
135        
136         # then all the http headers
137         headers = mimetools.Message(self.rfile)
138         self.environ["CONTENT_TYPE"] = headers.getheader("Content-type", "")
139         cl = headers.getheader("Content-length")
140         if method in ("POST", "PUT") and cl is None:
141             # No Content-Length header supplied. This will hang
142             # cgi.FieldStorage, since it cannot determine when to
143             # stop reading from the socket. Until we handle chunked
144             # encoding, always respond with 411 Length Required.
145             # See http://www.cherrypy.org/ticket/493.
146             self.abort("411 Length Required")
147             return
148         self.environ["CONTENT_LENGTH"] = cl or ""
149        
150         for k in headers.keys():
151             envname = "HTTP_" + k.upper().replace("-", "_")
152             if k in comma_separated_headers:
153                 self.environ[envname] = ", ".join(headers.getheaders(k))
154             else:
155                 self.environ[envname] = headers[k]
156         self.ready = True
157    
158     def abort(self, status, msg=""):
159         """Write a simple error message back to the client."""
160         proto = self.environ.get("SERVER_PROTOCOL", "HTTP/1.0")
161         self.wfile.write("%s %s\r\n" % (proto, status))
162         self.wfile.write("Content-Length: %s\r\n\r\n" % len(msg))
163         if msg:
164             self.wfile.write(msg)
165         self.wfile.flush()
166         self.ready = False
167    
168     def start_response(self, status, headers, exc_info = None):
169         if self.started_response:
170             if not exc_info:
171                 assert False, "Already started response"
172             else:
173                 try:
174                     raise exc_info[0], exc_info[1], exc_info[2]
175                 finally:
176                     exc_info = None
177         self.started_response = True
178         self.status = status
179         self.outheaders = headers
180         self.outheaderkeys = [key.lower() for (key,value) in self.outheaders]
181         return self.write
182    
183     def write(self, d):
184         if not self.sent_headers:
185             self.sent_headers = True
186             self.send_headers()
187         self.wfile.write(d)
188         self.wfile.flush()
189    
190     def send_headers(self):
191         if "content-length" not in self.outheaderkeys:
192             self.close_at_end = True
193         if "date" not in self.outheaderkeys:
194             self.outheaders.append(("Date", rfc822.formatdate()))
195         if "server" not in self.outheaderkeys:
196             self.outheaders.append(("Server", self.server.version))
197         if (self.environ["SERVER_PROTOCOL"] == "HTTP/1.1"
198             and "connection" not in self.outheaderkeys):
199             self.outheaders.append(("Connection", "close"))
200         self.wfile.write(self.environ["SERVER_PROTOCOL"] + " " + self.status + "\r\n")
201         for (k,v) in self.outheaders:
202             self.wfile.write(k + ": " + v + "\r\n")
203         self.wfile.write("\r\n")
204         self.wfile.flush()
205    
206     def terminate(self):
207         if self.ready and not self.sent_headers and not self.server.interrupt:
208             self.sent_headers = True
209             self.send_headers()
210         self.rfile.close()
211         self.wfile.close()
212         self.socket.close()
213
214
# Sentinel placed on the request queue to tell a worker thread to exit.
_SHUTDOWNREQUEST = None


class WorkerThread(threading.Thread):
    """Thread that takes requests from server.requests and serves them.

    server: the owning server. Its `requests` queue is read, and its
        `interrupt` attribute is set when KeyboardInterrupt or SystemExit
        is raised while serving.

    `ready` becomes True once run() has started.
    """

    def __init__(self, server):
        self.ready = False
        self.server = server
        threading.Thread.__init__(self)

    def run(self):
        """Serve queued requests until the shutdown sentinel is received."""
        try:
            self.ready = True
            while True:
                request = self.server.requests.get()
                if request is _SHUTDOWNREQUEST:
                    return

                try:
                    self._respond(request)
                finally:
                    # A failure while closing the connection (for example
                    # a broken pipe on the final flush) must not escape
                    # the loop, or this worker would die and the pool
                    # would shrink.
                    try:
                        request.terminate()
                    except socket.error:
                        self._report_socket_error()
                    except (KeyboardInterrupt, SystemExit):
                        raise
                    except:
                        traceback.print_exc()
        except (KeyboardInterrupt, SystemExit):
            self.server.interrupt = sys.exc_info()[1]

    def _respond(self, request):
        """Parse one request and stream the application's response."""
        try:
            request.parse_request()
            if request.ready:
                response = request.wsgi_app(request.environ,
                                            request.start_response)
                try:
                    for line in response:
                        request.write(line)
                finally:
                    if hasattr(response, "close"):
                        response.close()
        except socket.error:
            self._report_socket_error()
        except (KeyboardInterrupt, SystemExit):
            # Record it for the server; this thread keeps serving.
            self.server.interrupt = sys.exc_info()[1]
        except:
            traceback.print_exc()

    def _report_socket_error(self):
        """Print the current socket.error unless its code is ignorable."""
        args = sys.exc_info()[1].args
        code = None
        if args:
            code = args[0]
        if code not in socket_errors_to_ignore:
            traceback.print_exc()
256
257
class CherryPyWSGIServer(object):
    """An HTTP server for WSGI.

    bind_addr: a (host, port) tuple if TCP sockets are desired;
        for UNIX sockets, supply the filename as a string.
    wsgi_app: the WSGI 'application callable'; multiple WSGI applications
        may be passed as (script_name, callable) pairs.
    numthreads: the number of worker threads to create (default 10).
    server_name: the string to set for WSGI's SERVER_NAME environ entry.
        Defaults to socket.gethostname().
    max: the maximum number of queued requests (defaults to -1 = no limit).
    request_queue_size: the 'backlog' argument to socket.listen();
        specifies the maximum number of queued connections (default 5).
    timeout: the timeout in seconds for accepted connections (default 10).
    """

    version = "CherryPy/2.3.1"
    protocol = "HTTP/1.0"
    # True while the accept loop in start() should keep running.
    ready = False
    # Exception recorded by a worker thread; re-raised from start().
    interrupt = None
    RequestHandlerClass = HTTPRequest

    def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
                 max=-1, request_queue_size=5, timeout=10):
        self.requests = Queue.Queue(max)

        if callable(wsgi_app):
            # We've been handed a single wsgi_app, in CP-2.1 style.
            # Assume it's mounted at "".
            self.mount_points = [("", wsgi_app)]
        else:
            # We've been handed a list of (mount_point, wsgi_app) tuples,
            # so that the server can call different wsgi_apps, and also
            # correctly set SCRIPT_NAME. Work on a copy so the caller's
            # sequence is not reordered behind its back.
            self.mount_points = list(wsgi_app)
        # Reverse-sorted, so a longer mount point is tried before any
        # shorter mount point that is a prefix of it.
        self.mount_points.sort()
        self.mount_points.reverse()

        self.bind_addr = bind_addr
        self.numthreads = numthreads or 1
        if not server_name:
            server_name = socket.gethostname()
        self.server_name = server_name
        self.request_queue_size = request_queue_size
        self._workerThreads = []

        self.timeout = timeout

    def start(self):
        """Run the server forever.

        Binds the listening socket, starts the worker threads, then loops
        on tick() until stop() is called. An exception recorded in
        self.interrupt by a worker is re-raised here.
        """
        # We don't have to trap KeyboardInterrupt or SystemExit here,
        # because cherrypy.server already does so, calling self.stop() for us.
        # If you're using this server with another framework, you should
        # trap those exceptions in whatever code block calls start().

        # Select the appropriate socket
        if isinstance(self.bind_addr, basestring):
            self._bind_unix()
        else:
            self._bind_inet()

        # Timeout so KeyboardInterrupt can be caught on Win32
        self.socket.settimeout(1)
        self.socket.listen(self.request_queue_size)

        # Create worker threads
        for i in xrange(self.numthreads):
            self._workerThreads.append(WorkerThread(self))
        for worker in self._workerThreads:
            worker.setName("CP WSGIServer " + worker.getName())
            worker.start()
        for worker in self._workerThreads:
            while not worker.ready:
                time.sleep(.1)

        self.ready = True
        while self.ready:
            self.tick()
            if self.interrupt:
                raise self.interrupt

    def _bind(self, family, type, proto=0):
        """Create (or recreate) self.socket and bind it to self.bind_addr."""
        self.socket = socket.socket(family, type, proto)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.bind_addr)

    def _bind_unix(self):
        """Bind an AF_UNIX socket at the path given in self.bind_addr."""
        # Imported here because the module-level imports do not include them.
        import os
        import stat

        # So we can reuse the socket...
        try:
            os.unlink(self.bind_addr)
        except OSError:
            # Nothing to remove (or not removable); bind() will report
            # any real problem.
            pass

        self._bind(socket.AF_UNIX, socket.SOCK_STREAM)

        # So everyone can access the socket... This has to follow bind(),
        # which is what creates the file. The mode is 0777.
        try:
            os.chmod(self.bind_addr,
                     stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        except OSError:
            pass

    def _bind_inet(self):
        """Bind an AF_INET or AF_INET6 socket for a (host, port) bind_addr.

        Raises socket.error if no address returned by getaddrinfo() could
        be bound.
        """
        # Get the correct address family for our host (allows IPv6 addresses)
        host, port = self.bind_addr
        flags = 0
        if host == '':
            # Despite the socket module docs, using '' does not
            # allow AI_PASSIVE to work. Passing None instead
            # returns '0.0.0.0' like we want.
            host = None
            flags = socket.AI_PASSIVE
        try:
            info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                      socket.SOCK_STREAM, 0, flags)
        except socket.gaierror:
            # Probably a DNS issue. Assume IPv4.
            self._bind(socket.AF_INET, socket.SOCK_STREAM)
            return

        self.socket = None
        failure = "No socket could be created"
        for af, socktype, proto, canonname, sa in info:
            try:
                self._bind(af, socktype, proto)
            except socket.error:
                failure = sys.exc_info()[1]
                if self.socket:
                    self.socket.close()
                self.socket = None
                continue
            break
        if not self.socket:
            if isinstance(failure, socket.error):
                raise failure
            raise socket.error(failure)

    def tick(self):
        """Accept one connection, if any, and queue it for a worker."""
        try:
            s, addr = self.socket.accept()
            if not self.ready:
                # We are shutting down; don't leak the accepted socket.
                s.close()
                return
            if hasattr(s, 'settimeout'):
                s.settimeout(self.timeout)
            request = self.RequestHandlerClass(s, addr, self)
            self.requests.put(request)
        except socket.timeout:
            # The only reason for the timeout in start() is so we can
            # notice keyboard interrupts on Win32, which don't interrupt
            # accept() by default
            return
        except socket.error:
            # args is normally (errno, message), but may hold a single
            # item, so never index it blindly.
            args = sys.exc_info()[1].args
            code = None
            msg = None
            if len(args) > 1:
                code, msg = args[0], args[1]
            elif args:
                msg = args[0]
            if code == errno.EBADF or msg == "Bad file descriptor":
                # Our socket was closed
                return
            if code == errno.EAGAIN or msg == "Resource temporarily unavailable":
                # Just try again. See http://www.cherrypy.org/ticket/479.
                return
            raise

    def stop(self):
        """Gracefully shutdown a server that is serving forever."""
        self.ready = False
        s = getattr(self, "socket", None)
        if s and hasattr(s, "close"):
            s.close()

        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        for worker in self._workerThreads:
            self.requests.put(_SHUTDOWNREQUEST)

        # Don't join currentThread (when stop is called inside a request).
        current = threading.currentThread()
        while self._workerThreads:
            worker = self._workerThreads.pop()
            if worker is not current and worker.isAlive():
                try:
                    worker.join()
                except AssertionError:
                    pass
437
438
Note: See TracBrowser for help on using the browser.

Hosted by WebFaction

Log in as guest/cpguest to create tickets