Package cherrypy :: Package test :: Module test_session
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.test_session

  1  from cherrypy.test import test 
  2  test.prefer_parent_path() 
  3   
  4  import os 
  5  localDir = os.path.dirname(__file__) 
  6  import sys 
  7  import threading 
  8  import time 
  9   
 10  import cherrypy 
 11  from cherrypy.lib import sessions 
def http_methods_allowed(methods=('GET', 'HEAD')):
    """Reject the current request unless its HTTP method is permitted.

    The request method is upper-cased and looked up in *methods*. When it
    is not found, the ``Allow`` response header is set to the permitted
    methods joined with ", " and a 405 error is raised.

    methods: sequence of permitted method names (strings). The default is
        an immutable tuple so that no mutable object is shared between
        calls.

    Raises cherrypy.HTTPError(405) when the request method is not listed.
    """
    method = cherrypy.request.method.upper()
    if method not in methods:
        # Tell the client which methods would have been accepted.
        cherrypy.response.headers['Allow'] = ", ".join(methods)
        raise cherrypy.HTTPError(405)
# Publish the method check as ``cherrypy.tools.allow``, attached to the
# 'on_start_resource' hook point.
cherrypy.tools.allow = cherrypy.Tool(
    'on_start_resource',
    http_methods_allowed,
)
def setup_server():
    """Mount the session test application and select the test environment.

    Defines a ``Root`` controller whose handlers read and write
    ``cherrypy.session``, mounts an instance of it on ``cherrypy.tree``
    and then updates the global config with
    ``{'environment': 'test_suite'}``.
    """
    # Shared by the session timeout and the cleanup frequency. The tests
    # wait on it as "1 second", so the unit is presumably minutes --
    # TODO confirm against the sessions tool documentation.
    brief_interval = 1.0 / 60

    def bump_counter():
        # Read the session's 'counter' (0 when absent), add one, store the
        # new value back and hand it to the caller.
        hits = cherrypy.session.get('counter', 0) + 1
        cherrypy.session['counter'] = hits
        return hits

    class Root:

        _cp_config = {
            'tools.sessions.on': True,
            'tools.sessions.storage_type': 'ram',
            'tools.sessions.storage_path': localDir,
            'tools.sessions.timeout': brief_interval,
            'tools.sessions.clean_freq': brief_interval,
        }

        def testGen(self):
            # Generator handler: the counter is only bumped once the
            # response body is iterated.
            yield str(bump_counter())
        testGen.exposed = True

        def testStr(self):
            # Plain-string variant of testGen.
            return str(bump_counter())
        testStr.exposed = True

        def setsessiontype(self, newtype):
            # Switch the storage type on the class-level config so it
            # applies to later requests as well.
            shared_config = self.__class__._cp_config
            shared_config['tools.sessions.storage_type'] = newtype
        setsessiontype.exposed = True

        def index(self):
            session = cherrypy.session
            hits = session.get('counter', 0) + 1
            # Pause between the read and the write; presumably this widens
            # the window in which concurrent requests could collide.
            time.sleep(0.01)
            session['counter'] = hits
            return str(hits)
        index.exposed = True

        def keyin(self, key):
            # Reports session membership as the text "True" or "False".
            present = key in cherrypy.session
            return str(present)
        keyin.exposed = True

        def delete(self):
            # Drop the stored session, then expire it via the sessions
            # module.
            cherrypy.session.delete()
            sessions.expire()
            return "done"
        delete.exposed = True

        def delkey(self, key):
            del cherrypy.session[key]
            return "OK"
        delkey.exposed = True

        def blah(self):
            # Echo the storage type currently configured.
            return self._cp_config['tools.sessions.storage_type']
        blah.exposed = True

        def iredir(self):
            # Internally redirect to the /blah handler.
            raise cherrypy.InternalRedirect('/blah')
        iredir.exposed = True

        @cherrypy.tools.allow(methods=['GET'])
        def restricted(self):
            # Only GET passes the allow tool; the body is the method name.
            return cherrypy.request.method
        restricted.exposed = True

    cherrypy.tree.mount(Root())
    cherrypy.config.update({'environment': 'test_suite'})


from cherrypy.test import helper
class SessionTest(helper.CPWebCase):
    """Web test case exercising the handlers mounted by ``setup_server``.

    The tests drive the application through ``getPage`` and check the
    session counter, session file handling under ``localDir``, concurrent
    access and two error paths.
    """

    def tearDown(self):
        # Clean up sessions.
        for fname in os.listdir(localDir):
            if fname.startswith(sessions.FileSession.SESSION_PREFIX):
                os.unlink(os.path.join(localDir, fname))

    def test_0_Session(self):
        def session_files():
            # Names in localDir that look like session files.
            return [x for x in os.listdir(localDir)
                    if x.startswith('session-')]

        # Counter increments across requests that carry the same cookie.
        self.getPage('/testStr')
        self.assertBody('1')
        self.getPage('/testGen', self.cookies)
        self.assertBody('2')
        self.getPage('/testStr', self.cookies)
        self.assertBody('3')
        self.getPage('/delkey?key=counter', self.cookies)
        self.assertStatus(200)

        # Same sequence again after switching the storage type to 'file'.
        self.getPage('/setsessiontype/file')
        self.getPage('/testStr')
        self.assertBody('1')
        self.getPage('/testGen', self.cookies)
        self.assertBody('2')
        self.getPage('/testStr', self.cookies)
        self.assertBody('3')
        self.getPage('/delkey?key=counter', self.cookies)
        self.assertStatus(200)

        # Wait for the session.timeout (1 second)
        time.sleep(2)
        self.getPage('/')
        self.assertBody('1')

        # Test session __contains__
        self.getPage('/keyin?key=counter', self.cookies)
        self.assertBody("True")

        # Test session delete
        self.getPage('/delete', self.cookies)
        self.assertBody("done")
        self.assertEqual(session_files(), [])

        # Wait for the cleanup thread to delete remaining session files
        self.getPage('/')
        self.assertNotEqual(session_files(), [])
        time.sleep(2)
        self.assertEqual(session_files(), [])

    def test_1_Ram_Concurrency(self):
        self.getPage('/setsessiontype/ram')
        self._test_Concurrency()

    def test_2_File_Concurrency(self):
        self.getPage('/setsessiontype/file')
        self._test_Concurrency()

    def _test_Concurrency(self):
        # Shared body of the two concurrency tests: several client threads
        # hit "/" with the same cookie and the final counter must account
        # for every request.
        client_thread_count = 5
        request_count = 30

        # Get initial cookie
        self.getPage("/")
        self.assertBody("1")
        cookies = self.cookies

        data_dict = {}

        def request(index):
            for i in xrange(request_count):
                self.getPage("/", cookies)
                # Uncomment the following line to prove threads overlap.
                ## print index,
            # Record the counter value seen after this client's last
            # request.
            data_dict[index] = int(self.body)

        # Start <request_count> concurrent requests from
        # each of <client_thread_count> clients
        ts = []
        for c in xrange(client_thread_count):
            data_dict[c] = 0
            t = threading.Thread(target=request, args=(c,))
            ts.append(t)
            t.start()

        for t in ts:
            t.join()

        hitcount = max(data_dict.values())
        # One initial request plus every request made by every client.
        expected = 1 + (client_thread_count * request_count)
        self.assertEqual(hitcount, expected)

    def test_3_Redirect(self):
        # Start a new session
        self.getPage('/testStr')
        self.getPage('/iredir', self.cookies)
        self.assertBody("file")

    def test_4_File_deletion(self):
        # Start a new session
        self.getPage('/testStr')
        # Delete the session file manually and retry.
        # The id is the value of the first cookie: the text before the
        # first ';', after the first '='.
        session_id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
        path = os.path.join(localDir, "session-" + session_id)
        os.unlink(path)
        self.getPage('/testStr', self.cookies)

    def test_5_Error_paths(self):
        self.getPage('/unknown/page')
        self.assertErrorPage(404, "The path '/unknown/page' was not found.")

        # Note: this path is *not* the same as above. The above
        # takes a normal route through the session code; this one
        # skips the session code's before_handler and only calls
        # before_finalize (save) and on_end (close). So the session
        # code has to survive calling save/close without init.
        self.getPage('/restricted', self.cookies, method='POST')
        self.assertErrorPage(405, "Specified method is invalid for this server.")
if __name__ == "__main__":
    # Script entry point: mount the test application, then run the tests
    # through the helper module's testmain().
    setup_server()
    helper.testmain()