Download Install Tutorial Docs FAQ Tools WikiLicense Team IRC Planet Involvement Shop Book

Changeset 1734

Show
Ignore:
Timestamp:
10/02/07 11:51:35
Author:
fumanchu
Message:

Fix for #539 (Dynamic spare thread creation and destruction).

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/cherrypy/wsgiserver/__init__.py

    r1733 r1734  
    707707 
    708708 
     709class ThreadPool(object): 
     710    """A Request Queue for the CherryPyWSGIServer which pools threads.""" 
     711     
     712    def __init__(self, server, min=10, max=-1): 
     713        self.server = server 
     714        self.min = min 
     715        self.max = max 
     716        self._threads = [] 
     717        self._queue = Queue.Queue() 
     718        self.get = self._queue.get 
     719     
     720    def start(self): 
     721        """Start the pool of threads.""" 
     722        for i in xrange(self.min): 
     723            self._threads.append(WorkerThread(self.server)) 
     724        for worker in self._threads: 
     725            worker.setName("CP WSGIServer " + worker.getName()) 
     726            worker.start() 
     727        for worker in self._threads: 
     728            while not worker.ready: 
     729                time.sleep(.1) 
     730     
     731    def _get_idle(self): 
     732        """Number of worker threads which are idle. Read-only.""" 
     733        return len([t for t in self._threads if t.conn is None]) 
     734    idle = property(_get_idle, doc=_get_idle.__doc__) 
     735     
     736    def put(self, obj): 
     737        self._queue.put(obj) 
     738        if obj is _SHUTDOWNREQUEST: 
     739            return 
     740         
     741        # Grow/shrink the pool if necessary. 
     742        # Remove any dead threads from our list 
     743        for t in self._threads: 
     744            if not t.isAlive(): 
     745                self._threads.remove(t) 
     746     
     747    def grow(self, amount): 
     748        """Spawn new worker threads (not above self.max).""" 
     749        for i in xrange(amount): 
     750            if self.max > 0 and len(self._threads) >= self.max: 
     751                break 
     752            worker = WorkerThread(self.server) 
     753            worker.setName("CP WSGIServer " + worker.getName()) 
     754            self._threads.append(worker) 
     755            worker.start() 
     756     
     757    def shrink(self, amount): 
     758        """Kill off worker threads (not below self.min).""" 
     759        for i in xrange(min(amount, len(self._threads) - self.min)): 
     760            # Put a number of shutdown requests on the queue equal 
     761            # to 'amount'. Once each of those is processed by a worker, 
     762            # that worker will terminate and be culled from our list 
     763            # in self.put. 
     764            self._queue.put(_SHUTDOWNREQUEST) 
     765     
     766    def stop(self, timeout=5): 
     767        # Must shut down threads here so the code that calls 
     768        # this method can know when all threads are stopped. 
     769        for worker in self._threads: 
     770            self._queue.put(_SHUTDOWNREQUEST) 
     771         
     772        # Don't join currentThread (when stop is called inside a request). 
     773        current = threading.currentThread() 
     774        while self._threads: 
     775            worker = self._threads.pop() 
     776            if worker is not current and worker.isAlive(): 
     777                try: 
     778                    if timeout is None or timeout < 0: 
     779                        worker.join() 
     780                    else: 
     781                        worker.join(timeout) 
     782                        if worker.isAlive(): 
     783                            # We exhausted the timeout. 
     784                            # Forcibly shut down the socket. 
     785                            c = worker.conn 
     786                            if c and not c.rfile.closed: 
     787                                if SSL and isinstance(c.socket, SSL.ConnectionType): 
     788                                    # pyOpenSSL.socket.shutdown takes no args 
     789                                    c.socket.shutdown() 
     790                                else: 
     791                                    c.socket.shutdown(socket.SHUT_RD) 
     792                            worker.join() 
     793                except (AssertionError, 
     794                        # Ignore repeated Ctrl-C. 
     795                        # See http://www.cherrypy.org/ticket/691. 
     796                        KeyboardInterrupt), exc1: 
     797                    pass 
     798 
     799 
     800 
    709801class SSLConnection: 
    710802    """A thread-safe wrapper for an SSL.Connection. 
     
    734826 
    735827 
     828 
    736829class CherryPyWSGIServer(object): 
    737830    """An HTTP server for WSGI. 
     
    788881    def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None, 
    789882                 max=-1, request_queue_size=5, timeout=10, shutdown_timeout=5): 
    790         self.requests = Queue.Queue(max) 
     883        self.requests = ThreadPool(self, min=numthreads or 1, max=max) 
    791884         
    792885        if callable(wsgi_app): 
     
    801894         
    802895        self.bind_addr = bind_addr 
    803         self.numthreads = numthreads or 1 
    804896        if not server_name: 
    805897            server_name = socket.gethostname() 
    806898        self.server_name = server_name 
    807899        self.request_queue_size = request_queue_size 
    808         self._workerThreads = [] 
    809900         
    810901        self.timeout = timeout 
    811902        self.shutdown_timeout = shutdown_timeout 
     903     
     904    def _get_numthreads(self): 
     905        return self.requests.min 
     906    def _set_numthreads(self, value): 
     907        self.requests.min = value 
     908    numthreads = property(_get_numthreads, _set_numthreads) 
    812909     
    813910    def __str__(self): 
     
    897994         
    898995        # Create worker threads 
    899         for i in xrange(self.numthreads): 
    900             self._workerThreads.append(WorkerThread(self)) 
    901         for worker in self._workerThreads: 
    902             worker.setName("CP WSGIServer " + worker.getName()) 
    903             worker.start() 
    904         for worker in self._workerThreads: 
    905             while not worker.ready: 
    906                 time.sleep(.1) 
     996        self.requests.start() 
    907997         
    908998        self.ready = True 
     
    10261116            self.socket = None 
    10271117         
    1028         # Must shut down threads here so the code that calls 
    1029         # this method can know when all threads are stopped. 
    1030         for worker in self._workerThreads: 
    1031             self.requests.put(_SHUTDOWNREQUEST) 
    1032          
    1033         # Don't join currentThread (when stop is called inside a request). 
    1034         current = threading.currentThread() 
    1035         timeout = self.shutdown_timeout 
    1036         while self._workerThreads: 
    1037             worker = self._workerThreads.pop() 
    1038             if worker is not current and worker.isAlive(): 
    1039                 try: 
    1040                     if timeout is None or timeout < 0: 
    1041                         worker.join() 
    1042                     else: 
    1043                         worker.join(timeout) 
    1044                         if worker.isAlive(): 
    1045                             # We exhausted the timeout. 
    1046                             # Forcibly shut down the socket. 
    1047                             c = worker.conn 
    1048                             if c and not c.rfile.closed: 
    1049                                 if SSL and isinstance(c.socket, SSL.ConnectionType): 
    1050                                     # pyOpenSSL.socket.shutdown takes no args 
    1051                                     c.socket.shutdown() 
    1052                                 else: 
    1053                                     c.socket.shutdown(socket.SHUT_RD) 
    1054                             worker.join() 
    1055                 except (AssertionError, 
    1056                         # Ignore repeated Ctrl-C. 
    1057                         # See http://www.cherrypy.org/ticket/691. 
    1058                         KeyboardInterrupt), exc1: 
    1059                     pass 
     1118        self.requests.stop(self.shutdown_timeout) 
    10601119     
    10611120    def populate_ssl_environ(self): 

Hosted by WebFaction

Log in as guest/cpguest to create tickets