Download Install Tutorial Docs FAQ Tools Wiki License Team IRC Planet Involvement Shop Book

root/branches/cp3-wsgi-remix/_cpwsgiserver.py

Revision 1244 (checked in by dowski, 2 years ago)

Created a branch for some WSGI related ideas that I have implemented.

  • Property svn:eol-style set to native
Line 
1 """A high-speed, production ready, thread pooled, generic WSGI server."""
2
import errno
import mimetools # todo: use email
import os
import Queue
import re
import rfc822
import socket
import sys
import threading
import time
import traceback
from urllib import unquote
from urlparse import urlparse

quoted_slash = re.compile("(?i)%2F")

# errno codes of socket errors that are not worth reporting. The codes are
# gathered as dict keys so that names aliased to the same number on a given
# platform collapse into a single entry.
socket_errors_to_ignore = {}
for _ in ("EPIPE", "ETIMEDOUT", "ECONNREFUSED", "ECONNRESET",
          "EHOSTDOWN", "EHOSTUNREACH",
          "WSAECONNABORTED", "WSAECONNREFUSED", "WSAECONNRESET",
          "WSAENETRESET", "WSAETIMEDOUT"):
    # Not all of these names will be defined for every platform.
    if _ in dir(errno):
        socket_errors_to_ignore[getattr(errno, _)] = None
# Only the (now unique) codes themselves are of interest.
socket_errors_to_ignore = socket_errors_to_ignore.keys()
27
# Request headers whose repeated occurrences are folded into one
# comma-separated value. The names are spelled in lowercase because
# mimetools.Message uses lowercase keys.
comma_separated_headers = [
    'accept', 'accept-charset', 'accept-encoding', 'accept-language',
    'accept-ranges',
    'allow',
    'cache-control',
    'connection',
    'content-encoding', 'content-language',
    'expect',
    'if-match', 'if-none-match',
    'pragma',
    'proxy-authenticate',
    'te', 'trailer', 'transfer-encoding',
    'upgrade',
    'vary', 'via',
    'warning',
    'www-authenticate',
    ]
36
37 class HTTPRequest(object):
38    
39     stderr = sys.stderr
40     bufsize = -1
41    
42     def __init__(self, socket, addr, server):
43         self.socket = socket
44         self.addr = addr
45         self.server = server
46         self.environ = {}
47         self.ready = False
48         self.started_response = False
49         self.status = ""
50         self.outheaders = []
51         self.outheaderkeys = []
52         self.rfile = self.socket.makefile("r", self.bufsize)
53         self.wfile = self.socket.makefile("w", self.bufsize)
54         self.sent_headers = False
55    
56     def parse_request(self):
57         self.sent_headers = False
58         self.environ = {}
59         self.environ["wsgi.version"] = (1,0)
60         self.environ["wsgi.url_scheme"] = "http"
61         self.environ["wsgi.input"] = self.rfile
62         self.environ["wsgi.errors"] = self.stderr
63         self.environ["wsgi.multithread"] = True
64         self.environ["wsgi.multiprocess"] = False
65         self.environ["wsgi.run_once"] = False
66         request_line = self.rfile.readline()
67         if not request_line:
68             self.ready = False
69             return
70        
71         method, path, req_protocol = request_line.strip().split(" ", 2)
72         self.environ["REQUEST_METHOD"] = method
73        
74         # path may be an abs_path (including "http://host.domain.tld");
75         scheme, location, path, params, qs, frag = urlparse(path)
76         if scheme:
77             self.environ["wsgi.url_scheme"] = scheme
78         if params:
79             path = path + ";" + params
80        
81         # Unquote the path+params (e.g. "/this%20path" -> "this path").
82         # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
83         #
84         # But note that "...a URI must be separated into its components
85         # before the escaped characters within those components can be
86         # safely decoded." http://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2
87         atoms = [unquote(x) for x in quoted_slash.split(path)]
88         path = "%2F".join(atoms)
89        
90         self.wsgi_app = self.server.app
91         self.environ['SCRIPT_NAME'] = ''
92         self.environ['PATH_INFO'] = path
93        
94         # Note that, like wsgiref and most other WSGI servers,
95         # we unquote the path but not the query string.
96         self.environ["QUERY_STRING"] = qs
97        
98         # Compare request and server HTTP protocol versions, in case our
99         # server does not support the requested protocol. Limit our output
100         # to min(req, server). We want the following output:
101         #     request    server     actual written   supported response
102         #     protocol   protocol  response protocol feature set (SERVER_PROTOCOL)
103         # a     1.0        1.0           1.0                1.0
104         # b     1.0        1.1           1.1                1.0
105         # c     1.1        1.0           1.0                1.0
106         # d     1.1        1.1           1.1                1.1
107         # Notice that, in (b), the response will be "HTTP/1.1" even though
108         # the client only understands 1.0. RFC 2616 10.5.6 says we should
109         # only return 505 if the _major_ version is different.
110         rp = int(req_protocol[5]), int(req_protocol[7])
111         sp = int(self.server.protocol[5]), int(self.server.protocol[7])
112         if sp[0] != rp[0]:
113             self.abort("505 HTTP Version Not Supported")
114             return
115         self.environ["SERVER_PROTOCOL"] = "HTTP/%s.%s" % min(rp, sp)
116        
117         # If the Request-URI was an absoluteURI, use its location atom.
118         self.environ["SERVER_NAME"] = location or self.server.server_name
119        
120         if isinstance(self.server.bind_addr, basestring):
121             # AF_UNIX. This isn't really allowed by WSGI, which doesn't
122             # address unix domain sockets. But it's better than nothing.
123             self.environ["SERVER_PORT"] = ""
124         else:
125             self.environ["SERVER_PORT"] = str(self.server.bind_addr[1])
126             # optional values
127             self.environ["REMOTE_HOST"] = self.addr[0]
128             self.environ["REMOTE_ADDR"] = self.addr[0]
129             self.environ["REMOTE_PORT"] = str(self.addr[1])
130        
131         # then all the http headers
132         headers = mimetools.Message(self.rfile)
133         self.environ["CONTENT_TYPE"] = headers.getheader("Content-type", "")
134         cl = headers.getheader("Content-length")
135         if method in ("POST", "PUT") and cl is None:
136             # No Content-Length header supplied. This will hang
137             # cgi.FieldStorage, since it cannot determine when to
138             # stop reading from the socket. Until we handle chunked
139             # encoding, always respond with 411 Length Required.
140             # See http://www.cherrypy.org/ticket/493.
141             self.abort("411 Length Required")
142             return
143         self.environ["CONTENT_LENGTH"] = cl or ""
144        
145         for k in headers:
146             envname = "HTTP_" + k.upper().replace("-", "_")
147             if k in comma_separated_headers:
148                 self.environ[envname] = ", ".join(headers.getheaders(k))
149             else:
150                 self.environ[envname] = headers[k]
151         self.ready = True
152    
153     def abort(self, status, msg=""):
154         """Write a simple error message back to the client."""
155         self.wfile.write("%s %s\r\n" % (self.server.protocol, status))
156         self.wfile.write("Content-Length: %s\r\n\r\n" % len(msg))
157         if msg:
158             self.wfile.write(msg)
159         self.wfile.flush()
160         self.ready = False
161    
162     def start_response(self, status, headers, exc_info = None):
163         if self.started_response:
164             if not exc_info:
165                 assert False, "Already started response"
166             else:
167                 try:
168                     raise exc_info[0], exc_info[1], exc_info[2]
169                 finally:
170                     exc_info = None
171         self.started_response = True
172         self.status = status
173         self.outheaders = headers
174         self.outheaderkeys = [key.lower() for (key,value) in self.outheaders]
175         return self.write
176    
177     def write(self, d):
178         if not self.sent_headers:
179             self.sent_headers = True
180             self.send_headers()
181         self.wfile.write(d)
182         self.wfile.flush()
183    
184     def send_headers(self):
185         if "content-length" not in self.outheaderkeys:
186             self.close_at_end = True
187         if "date" not in self.outheaderkeys:
188             self.outheaders.append(("Date", rfc822.formatdate()))
189         if "server" not in self.outheaderkeys:
190             self.outheaders.append(("Server", self.server.version))
191         if (self.server.protocol == "HTTP/1.1"
192             and "connection" not in self.outheaderkeys):
193             self.outheaders.append(("Connection", "close"))
194         self.wfile.write(self.server.protocol + " " + self.status + "\r\n")
195         for (k,v) in self.outheaders:
196             self.wfile.write(k + ": " + v + "\r\n")
197         self.wfile.write("\r\n")
198         self.wfile.flush()
199    
200     def terminate(self):
201         if self.ready and not self.sent_headers and not self.server.interrupt:
202             self.sent_headers = True
203             self.send_headers()
204         self.rfile.close()
205         self.wfile.close()
206         self.socket.close()
207
208
209 _SHUTDOWNREQUEST = None
210
211 class WorkerThread(threading.Thread):
212    
213     def __init__(self, server):
214         self.ready = False
215         self.server = server
216         threading.Thread.__init__(self)
217    
218     def run(self):
219         try:
220             self.ready = True
221             while True:
222                 request = self.server.requests.get()
223                 if request is _SHUTDOWNREQUEST:
224                     return
225                
226                 try:
227                     try:
228                         request.parse_request()
229                         if request.ready:
230                             response = request.wsgi_app(request.environ,
231                                                         request.start_response)
232                             for line in response:
233                                 request.write(line)
234                             if hasattr(response, "close"):
235                                 response.close()
236                     except socket.error, e:
237                         errno = e.args[0]
238                         if errno not in socket_errors_to_ignore:
239                             traceback.print_exc()
240                     except (KeyboardInterrupt, SystemExit), exc:
241                         self.server.interrupt = exc
242                     except:
243                         traceback.print_exc()
244                 finally:
245                     request.terminate()
246         except (KeyboardInterrupt, SystemExit), exc:
247             self.server.interrupt = exc
248
249
250 class CherryPyWSGIServer(object):
251     """An HTTP server for WSGI.
252     
253     bind_addr: a (host, port) tuple if TCP sockets are desired;
254         for UNIX sockets, supply the filename as a string.
255     wsgi_app: the WSGI 'application callable'; multiple WSGI applications
256         may be passed as (script_name, callable) pairs.
257     numthreads: the number of worker threads to create (default 10).
258     server_name: the string to set for WSGI's SERVER_NAME environ entry.
259         Defaults to socket.gethostname().
260     max: the maximum number of queued requests (defaults to -1 = no limit).
261     request_queue_size: the 'backlog' argument to socket.listen();
262         specifies the maximum number of queued connections (default 5).
263     timeout: the timeout in seconds for accepted connections (default 10).
264     """
265    
266     protocol = "HTTP/1.0"
267     version = "CherryPy/3.0.0alpha"
268     ready = False
269     _interrupt = None
270     RequestHandlerClass = HTTPRequest
271    
272     def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
273                  max=-1, request_queue_size=5, timeout=10):
274         self.requests = Queue.Queue(max)
275        
276         self.app = wsgi_app
277        
278         self.bind_addr = bind_addr
279         self.numthreads = numthreads or 1
280         if not server_name:
281             server_name = socket.gethostname()
282         self.server_name = server_name
283         self.request_queue_size = request_queue_size
284         self._workerThreads = []
285        
286         self.timeout = timeout
287    
288     def start(self):
289         """Run the server forever."""
290         # We don't have to trap KeyboardInterrupt or SystemExit here,
291         # because cherrpy.server already does so, calling self.stop() for us.
292         # If you're using this server with another framework, you should
293         # trap those exceptions in whatever code block calls start().
294         self._interrupt = None
295        
296         def bind(family, type, proto=0):
297             """Create (or recreate) the actual socket object."""
298             self.socket = socket.socket(family, type, proto)
299             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
300             self.socket.bind(self.bind_addr)
301        
302         # Select the appropriate socket
303         if isinstance(self.bind_addr, basestring):
304             # AF_UNIX socket
305            
306             # So we can reuse the socket...
307             try: os.unlink(self.bind_addr)
308             except: pass
309            
310             # So everyone can access the socket...
311             try: os.chmod(self.bind_addr, 0777)
312             except: pass
313            
314             info = [(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_addr)]
315         else:
316             # AF_INET or AF_INET6 socket
317             # Get the correct address family for our host (allows IPv6 addresses)
318             host, port = self.bind_addr
319             try:
320                 info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
321                                           socket.SOCK_STREAM)
322             except socket.gaierror:
323                 # Probably a DNS issue. Assume IPv4.
324                 info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", self.bind_addr)]
325        
326         self.socket = None
327         msg = "No socket could be created"
328         for res in info:
329             af, socktype, proto, canonname, sa = res
330             try:
331                 bind(af, socktype, proto)
332             except socket.error, msg:
333                 if self.socket:
334                     self.socket.close()
335                 self.socket = None
336                 continue
337             break
338         if not self.socket:
339             raise socket.error, msg
340        
341         # Timeout so KeyboardInterrupt can be caught on Win32
342         self.socket.settimeout(1)
343         self.socket.listen(self.request_queue_size)
344        
345         # Create worker threads
346         for i in xrange(self.numthreads):
347             self._workerThreads.append(WorkerThread(self))
348         for worker in self._workerThreads:
349             worker.setName("CP WSGIServer " + worker.getName())
350             worker.start()
351         for worker in self._workerThreads:
352             while not worker.ready:
353                 time.sleep(.1)
354        
355         self.ready = True
356         while self.ready:
357             self.tick()
358             if self.interrupt:
359                 while self.interrupt is True:
360                     # Wait for self.stop() to complete
361                     time.sleep(0.1)
362                 raise self.interrupt
363    
364     def tick(self):
365         try:
366             s, addr = self.socket.accept()
367             if not self.ready:
368                 return
369             if hasattr(s, 'settimeout'):
370                 s.settimeout(self.timeout)
371             request = self.RequestHandlerClass(s, addr, self)
372             self.requests.put(request)
373         except socket.timeout:
374             # The only reason for the timeout in start() is so we can
375             # notice keyboard interrupts on Win32, which don't interrupt
376             # accept() by default
377             return
378         except socket.error, x:
379             if x.args[1] == "Bad file descriptor":
380                 # Our socket was closed
381                 return
382             raise
383    
384     def _get_interrupt(self):
385         return self._interrupt
386     def _set_interrupt(self, interrupt):
387         self._interrupt = True
388         self.stop()
389         self._interrupt = interrupt
390     interrupt = property(_get_interrupt, _set_interrupt)
391    
392     def stop(self):
393         """Gracefully shutdown a server that is serving forever."""
394         self.ready = False
395        
396         sock = getattr(self, "socket", None)
397         if sock:
398             if not isinstance(self.bind_addr, basestring):
399                 # Touch our own socket to make accept() return immediately.
400                 try:
401                     host, port = sock.getsockname()[:2]
402                 except socket.error, x:
403                     if x.args[1] != "Bad file descriptor":
404                         raise
405                 else:
406                     for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
407                                                   socket.SOCK_STREAM):
408                         af, socktype, proto, canonname, sa = res
409                         s = None
410                         try:
411                             s = socket.socket(af, socktype, proto)
412                             # See http://groups.google.com/group/cherrypy-users/
413                             #        browse_frm/thread/bbfe5eb39c904fe0
414                             s.settimeout(1.0)
415                             s.connect((host, port))
416                             s.close()
417                         except socket.error:
418                             if s:
419                                 s.close()
420             if hasattr(sock, "close"):
421                 sock.close()
422             self.socket = None
423        
424         # Must shut down threads here so the code that calls
425         # this method can know when all threads are stopped.
426         for worker in self._workerThreads:
427             self.requests.put(_SHUTDOWNREQUEST)
428        
429         # Don't join currentThread (when stop is called inside a request).
430         current = threading.currentThread()
431         while self._workerThreads:
432             worker = self._workerThreads.pop()
433             if worker is not current and worker.isAlive:
434                 try:
435                     worker.join()
436                 except AssertionError:
437                     pass
438