Download Install Tutorial Docs FAQ Tools Wiki License Team IRC Planet Involvement Shop Book

root/trunk/cherrypy/process/wspbus.py

Revision 2037 (checked in by fumanchu, 1 week ago)

Bus.wait may now take a list or tuple for the 'state' arg, to wait on any of multiple states.

  • Property svn:eol-style set to native
Line 
1 """An implementation of the Web Site Process Bus.
2
3 This module is completely standalone, depending only on the stdlib.
4
5 Web Site Process Bus
6 --------------------
7
8 A Bus object is used to contain and manage site-wide behavior:
9 daemonization, HTTP server start/stop, process reload, signal handling,
10 privilege dropping, PID file management, logging for all of these,
11 and many more.
12
13 In addition, a Bus object provides a place for each web framework
14 to register code that runs in response to site-wide events (like
15 process start and stop), or which controls or otherwise interacts with
16 the site-wide components mentioned above. For example, a framework which
17 uses file-based templates would add known template filenames to an
18 autoreload component.
19
20 Ideally, a Bus object will be flexible enough to be useful in a variety
21 of invocation scenarios:
22
23  1. The deployer starts a site from the command line via a framework-
24      neutral deployment script; applications from multiple frameworks
25      are mixed in a single site. Command-line arguments and configuration
26      files are used to define site-wide components such as the HTTP server,
27      WSGI component graph, autoreload behavior, signal handling, etc.
28  2. The deployer starts a site via some other process, such as Apache;
29      applications from multiple frameworks are mixed in a single site.
30      Autoreload and signal handling (from Python at least) are disabled.
31  3. The deployer starts a site via a framework-specific mechanism;
32      for example, when running tests, exploring tutorials, or deploying
33      single applications from a single framework. The framework controls
34      which site-wide components are enabled as it sees fit.
35
36 The Bus object in this package uses topic-based publish-subscribe
37 messaging to accomplish all this. A few topic channels are built in
38 ('start', 'stop', 'exit', and 'graceful'). Frameworks and site containers
39 are free to define their own. If a message is sent to a channel that has
40 not been defined or has no listeners, there is no effect.
41
42 In general, there should only ever be a single Bus object per process.
43 Frameworks and site containers share a single Bus object by publishing
44 messages and subscribing listeners.
45
46 The Bus object works as a finite state machine which models the current
47 state of the process. Bus methods move it from one state to another;
48 those methods then publish to subscribed listeners on the channel for
49 the new state.
50
51                         O
52                         |
53                         V
54        STOPPING --> STOPPED --> EXITING -> X
55           A   A         |
56           |    \___     |
57           |        \    |
58           |         V   V
59         STARTED <-- STARTING
60
61 """
62
63 import atexit
64 import os
65 try:
66     set
67 except NameError:
68     from sets import Set as set
69 import sys
70 import threading
71 import time
72 import traceback as _traceback
73 import warnings
74
75
76 # Use a flag to indicate the state of the bus.
77 class _StateEnum(object):
78     class State(object):
79         name = None
80         def __repr__(self):
81             return "states.%s" % self.name
82    
83     def __setattr__(self, key, value):
84         if isinstance(value, self.State):
85             value.name = key
86         object.__setattr__(self, key, value)
# The enumeration of bus states. Each State picks up its name from the
# attribute it is assigned to (see _StateEnum.__setattr__).
states = _StateEnum()
for _state_name in ('STOPPED', 'STARTING', 'STARTED',
                    'STOPPING', 'EXITING'):
    setattr(states, _state_name, states.State())
# Do not leave the loop variable behind in the module namespace.
del _state_name
93
94
class Bus(object):
    """Process state-machine and messenger for HTTP site deployment.

    All listeners for a given channel are guaranteed to be called even
    if others at the same channel fail. Each failure is logged, but
    execution proceeds on to the next listener. The only way to stop all
    processing from inside a listener is to raise SystemExit (or
    KeyboardInterrupt) and stop the whole server.
    """

    states = states
    state = states.STOPPED
    execv = False

    def __init__(self):
        """Create a STOPPED bus with the built-in channels and no listeners."""
        self.execv = False
        self.state = states.STOPPED
        # Maps channel name -> set of callbacks subscribed to that channel.
        self.listeners = dict(
            [(channel, set()) for channel
             in ('start', 'stop', 'exit', 'graceful', 'log')])
        # Maps (channel, callback) -> priority used to order publish().
        self._priorities = {}

    def subscribe(self, channel, callback, priority=None):
        """Add the given callback at the given channel (if not present).

        If priority is None, the callback's own 'priority' attribute is
        used, or 50 if it has none. Listeners with lower priority values
        are called first by publish(). Subscribing a callback again
        replaces its recorded priority.
        """
        if channel not in self.listeners:
            self.listeners[channel] = set()
        self.listeners[channel].add(callback)

        if priority is None:
            priority = getattr(callback, 'priority', 50)
        self._priorities[(channel, callback)] = priority

    def unsubscribe(self, channel, callback):
        """Discard the given callback (if present)."""
        listeners = self.listeners.get(channel)
        if listeners and callback in listeners:
            listeners.discard(callback)
            del self._priorities[(channel, callback)]

    def publish(self, channel, *args, **kwargs):
        """Return output of all subscribers for the given channel.

        Listeners are called in ascending priority order with the given
        args and kwargs. If a listener raises KeyboardInterrupt or
        SystemExit, that exception propagates immediately. Any other
        error is logged (except on the 'log' channel itself) and the
        remaining listeners are still called; once they have all run,
        the last such error is re-raised.

        An unknown channel yields an empty list.
        """
        if channel not in self.listeners:
            return []

        exc = None
        output = []

        # Decorate with an index so that a tie on priority is broken by
        # the index and the listeners themselves are never compared
        # (callables are not orderable in general).
        items = [(self._priorities[(channel, listener)], index, listener)
                 for index, listener in enumerate(self.listeners[channel])]
        items.sort()
        for priority, index, listener in items:
            try:
                output.append(listener(*args, **kwargs))
            except KeyboardInterrupt:
                raise
            except SystemExit:
                e = sys.exc_info()[1]
                # If we have previous errors ensure the exit code is
                # non-zero. A code of None (a bare sys.exit()) also means
                # a zero exit status, so it is treated like 0 here.
                if exc is not None and e.code in (0, None):
                    e.code = 1
                raise
            except:
                exc = sys.exc_info()[1]
                if channel == 'log':
                    # Assume any further messages to 'log' will fail.
                    pass
                else:
                    self.log("Error in %r listener %r" % (channel, listener),
                             level=40, traceback=True)
        if exc is not None:
            # Re-raise the saved exception object explicitly; a bare
            # 'raise' is not valid here because no handler is active.
            raise exc
        return output

    def _clean_exit(self):
        """An atexit handler which asserts the Bus is not running."""
        if self.state != states.EXITING:
            warnings.warn(
                "The main thread is exiting, but the Bus is in the %r state; "
                "shutting it down automatically now. You must either call "
                "bus.block() after start(), or call bus.exit() before the "
                "main thread exits." % self.state, RuntimeWarning)
            self.exit()

    def _exit_quietly(self):
        """Call exit(), swallowing whatever it raises.

        Kept as a separate method so that start() can re-raise its own
        error with a bare 'raise' after this returns.
        """
        try:
            self.exit()
        except:
            # Any stop/exit errors will be logged inside publish().
            pass

    def start(self):
        """Start all services.

        Moves the bus to STARTING, publishes to the 'start' channel and
        then moves to STARTED. If a start listener fails, the error is
        logged, the bus is exited, and the original error is re-raised.
        KeyboardInterrupt and SystemExit propagate without that cleanup.
        """
        atexit.register(self._clean_exit)

        self.state = states.STARTING
        self.log('Bus STARTING')
        try:
            self.publish('start')
            self.state = states.STARTED
            self.log('Bus STARTED')
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.log("Shutting down due to error in start listener:",
                     level=40, traceback=True)
            self._exit_quietly()
            # Re-raise the start error itself (with its traceback), not
            # anything that exit() may have raised and had swallowed.
            raise

    def exit(self):
        """Stop all services and prepare to exit the process."""
        self.stop()

        self.state = states.EXITING
        self.log('Bus EXITING')
        self.publish('exit')
        # This isn't strictly necessary, but it's better than seeing
        # "Waiting for child threads to terminate..." and then nothing.
        self.log('Bus EXITED')

    def restart(self):
        """Restart the process (may close connections).

        This method does not restart the process from the calling thread;
        instead, it stops the bus and asks the main thread to call execv.
        """
        # block() checks this flag after the bus has reached EXITING.
        self.execv = True
        self.exit()

    def graceful(self):
        """Advise all services to reload."""
        self.log('Bus graceful')
        self.publish('graceful')

    def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        'interval' is the polling period, in seconds, passed on to wait().
        On KeyboardInterrupt (or IOError) the bus is exited and the
        exception is swallowed; on SystemExit the bus is exited and the
        exception is re-raised. Afterwards, all other live non-daemon
        threads are joined, and the process is re-executed if restart()
        requested it.
        """
        try:
            self.wait(states.EXITING, interval=interval)
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See http://www.cherrypy.org/ticket/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See http://www.cherrypy.org/ticket/751.
        self.log("Waiting for child threads to terminate...")
        for t in threading.enumerate():
            if (t != threading.currentThread() and t.isAlive()
                # Note that any dummy (external) threads are always daemonic.
                and not t.isDaemon()):
                t.join()

        if self.execv:
            self._do_execv()

    def wait(self, state, interval=0.1):
        """Wait for the given state(s).

        'state' may be a single state, or a list or tuple of states, in
        which case this returns as soon as the bus is in any of them.
        The bus state is polled every 'interval' seconds.
        """
        if isinstance(state, (tuple, list)):
            targets = state
        else:
            targets = [state]

        def _wait():
            while self.state not in targets:
                time.sleep(interval)

        # From http://psyco.sourceforge.net/psycoguide/bugs.html:
        # "The compiled machine code does not include the regular polling
        # done by Python, meaning that a KeyboardInterrupt will not be
        # detected before execution comes back to the regular Python
        # interpreter. Your program cannot be interrupted if caught
        # into an infinite Psyco-compiled loop."
        try:
            sys.modules['psyco'].cannotcompile(_wait)
        except (KeyError, AttributeError):
            # psyco is not loaded, or has no cannotcompile().
            pass

        _wait()

    def _do_execv(self):
        """Re-execute the current process.

        This must be called from the main thread, because certain platforms
        (OS X) don't allow execv to be called in a child thread very well.
        """
        args = sys.argv[:]
        self.log('Re-spawning %s' % ' '.join(args))
        args.insert(0, sys.executable)
        if sys.platform == 'win32':
            # NOTE(review): presumably quoted so that arguments containing
            # spaces survive on Windows -- confirm.
            args = ['"%s"' % arg for arg in args]

        os.execv(sys.executable, args)

    def stop(self):
        """Stop all services."""
        self.state = states.STOPPING
        self.log('Bus STOPPING')
        self.publish('stop')
        self.state = states.STOPPED
        self.log('Bus STOPPED')

    def start_with_callback(self, func, args=None, kwargs=None):
        """Start 'func' in a new thread T, then start self (and return T).

        The thread waits for the bus to reach STARTED before calling
        func(*args, **kwargs). 'args' may be any sequence (default: no
        arguments); 'kwargs' is a dict (default: empty).
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        # tuple() lets callers pass a list as well as a tuple.
        args = (func,) + tuple(args)

        def _callback(func, *a, **kw):
            self.wait(states.STARTED)
            func(*a, **kw)
        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
        t.setName('Bus Callback ' + t.getName())
        t.start()

        self.start()

        return t

    def log(self, msg="", level=20, traceback=False):
        """Log the given message. Append the last traceback if requested.

        The message and level are published to the 'log' channel.
        """
        if traceback:
            exc = sys.exc_info()
            msg += "\n" + "".join(_traceback.format_exception(*exc))
        self.publish('log', msg, level)
# Module-level Bus instance, created at import time. Per the module
# docstring, there should only ever be a single Bus object per process.
bus = Bus()
Note: See TracBrowser for help on using the browser.

Hosted by WebFaction

Log in as guest/cpguest to create tickets