| | 709 | class ThreadPool(object): |
|---|
| | 710 | """A Request Queue for the CherryPyWSGIServer which pools threads.""" |
|---|
| | 711 | |
|---|
| | 712 | def __init__(self, server, min=10, max=-1): |
|---|
| | 713 | self.server = server |
|---|
| | 714 | self.min = min |
|---|
| | 715 | self.max = max |
|---|
| | 716 | self._threads = [] |
|---|
| | 717 | self._queue = Queue.Queue() |
|---|
| | 718 | self.get = self._queue.get |
|---|
| | 719 | |
|---|
| | 720 | def start(self): |
|---|
| | 721 | """Start the pool of threads.""" |
|---|
| | 722 | for i in xrange(self.min): |
|---|
| | 723 | self._threads.append(WorkerThread(self.server)) |
|---|
| | 724 | for worker in self._threads: |
|---|
| | 725 | worker.setName("CP WSGIServer " + worker.getName()) |
|---|
| | 726 | worker.start() |
|---|
| | 727 | for worker in self._threads: |
|---|
| | 728 | while not worker.ready: |
|---|
| | 729 | time.sleep(.1) |
|---|
| | 730 | |
|---|
| | 731 | def _get_idle(self): |
|---|
| | 732 | """Number of worker threads which are idle. Read-only.""" |
|---|
| | 733 | return len([t for t in self._threads if t.conn is None]) |
|---|
| | 734 | idle = property(_get_idle, doc=_get_idle.__doc__) |
|---|
| | 735 | |
|---|
| | 736 | def put(self, obj): |
|---|
| | 737 | self._queue.put(obj) |
|---|
| | 738 | if obj is _SHUTDOWNREQUEST: |
|---|
| | 739 | return |
|---|
| | 740 | |
|---|
| | 741 | # Grow/shrink the pool if necessary. |
|---|
| | 742 | # Remove any dead threads from our list |
|---|
| | 743 | for t in self._threads: |
|---|
| | 744 | if not t.isAlive(): |
|---|
| | 745 | self._threads.remove(t) |
|---|
| | 746 | |
|---|
| | 747 | def grow(self, amount): |
|---|
| | 748 | """Spawn new worker threads (not above self.max).""" |
|---|
| | 749 | for i in xrange(amount): |
|---|
| | 750 | if self.max > 0 and len(self._threads) >= self.max: |
|---|
| | 751 | break |
|---|
| | 752 | worker = WorkerThread(self.server) |
|---|
| | 753 | worker.setName("CP WSGIServer " + worker.getName()) |
|---|
| | 754 | self._threads.append(worker) |
|---|
| | 755 | worker.start() |
|---|
| | 756 | |
|---|
| | 757 | def shrink(self, amount): |
|---|
| | 758 | """Kill off worker threads (not below self.min).""" |
|---|
| | 759 | for i in xrange(min(amount, len(self._threads) - self.min)): |
|---|
| | 760 | # Put a number of shutdown requests on the queue equal |
|---|
| | 761 | # to 'amount'. Once each of those is processed by a worker, |
|---|
| | 762 | # that worker will terminate and be culled from our list |
|---|
| | 763 | # in self.put. |
|---|
| | 764 | self._queue.put(_SHUTDOWNREQUEST) |
|---|
| | 765 | |
|---|
| | 766 | def stop(self, timeout=5): |
|---|
| | 767 | # Must shut down threads here so the code that calls |
|---|
| | 768 | # this method can know when all threads are stopped. |
|---|
| | 769 | for worker in self._threads: |
|---|
| | 770 | self._queue.put(_SHUTDOWNREQUEST) |
|---|
| | 771 | |
|---|
| | 772 | # Don't join currentThread (when stop is called inside a request). |
|---|
| | 773 | current = threading.currentThread() |
|---|
| | 774 | while self._threads: |
|---|
| | 775 | worker = self._threads.pop() |
|---|
| | 776 | if worker is not current and worker.isAlive(): |
|---|
| | 777 | try: |
|---|
| | 778 | if timeout is None or timeout < 0: |
|---|
| | 779 | worker.join() |
|---|
| | 780 | else: |
|---|
| | 781 | worker.join(timeout) |
|---|
| | 782 | if worker.isAlive(): |
|---|
| | 783 | # We exhausted the timeout. |
|---|
| | 784 | # Forcibly shut down the socket. |
|---|
| | 785 | c = worker.conn |
|---|
| | 786 | if c and not c.rfile.closed: |
|---|
| | 787 | if SSL and isinstance(c.socket, SSL.ConnectionType): |
|---|
| | 788 | # pyOpenSSL.socket.shutdown takes no args |
|---|
| | 789 | c.socket.shutdown() |
|---|
| | 790 | else: |
|---|
| | 791 | c.socket.shutdown(socket.SHUT_RD) |
|---|
| | 792 | worker.join() |
|---|
| | 793 | except (AssertionError, |
|---|
| | 794 | # Ignore repeated Ctrl-C. |
|---|
| | 795 | # See http://www.cherrypy.org/ticket/691. |
|---|
| | 796 | KeyboardInterrupt), exc1: |
|---|
| | 797 | pass |
|---|
| | 798 | |
|---|
| | 799 | |
|---|
| | 800 | |
|---|
| 1028 | | # Must shut down threads here so the code that calls |
|---|
| 1029 | | # this method can know when all threads are stopped. |
|---|
| 1030 | | for worker in self._workerThreads: |
|---|
| 1031 | | self.requests.put(_SHUTDOWNREQUEST) |
|---|
| 1032 | | |
|---|
| 1033 | | # Don't join currentThread (when stop is called inside a request). |
|---|
| 1034 | | current = threading.currentThread() |
|---|
| 1035 | | timeout = self.shutdown_timeout |
|---|
| 1036 | | while self._workerThreads: |
|---|
| 1037 | | worker = self._workerThreads.pop() |
|---|
| 1038 | | if worker is not current and worker.isAlive(): |
|---|
| 1039 | | try: |
|---|
| 1040 | | if timeout is None or timeout < 0: |
|---|
| 1041 | | worker.join() |
|---|
| 1042 | | else: |
|---|
| 1043 | | worker.join(timeout) |
|---|
| 1044 | | if worker.isAlive(): |
|---|
| 1045 | | # We exhausted the timeout. |
|---|
| 1046 | | # Forcibly shut down the socket. |
|---|
| 1047 | | c = worker.conn |
|---|
| 1048 | | if c and not c.rfile.closed: |
|---|
| 1049 | | if SSL and isinstance(c.socket, SSL.ConnectionType): |
|---|
| 1050 | | # pyOpenSSL.socket.shutdown takes no args |
|---|
| 1051 | | c.socket.shutdown() |
|---|
| 1052 | | else: |
|---|
| 1053 | | c.socket.shutdown(socket.SHUT_RD) |
|---|
| 1054 | | worker.join() |
|---|
| 1055 | | except (AssertionError, |
|---|
| 1056 | | # Ignore repeated Ctrl-C. |
|---|
| 1057 | | # See http://www.cherrypy.org/ticket/691. |
|---|
| 1058 | | KeyboardInterrupt), exc1: |
|---|
| 1059 | | pass |
|---|
| | 1118 | self.requests.stop(self.shutdown_timeout) |
|---|