1 from cherrypy.test import test
2 test.prefer_parent_path()
3
4 import os
5 localDir = os.path.dirname(__file__)
6 import sys
7 import threading
8 import time
9
10 import cherrypy
11 from cherrypy.lib import sessions
18
19 cherrypy.tools.allow = cherrypy.Tool('on_start_resource', http_methods_allowed)
23 class Root:
24
25 _cp_config = {'tools.sessions.on': True,
26 'tools.sessions.storage_type' : 'ram',
27 'tools.sessions.storage_path' : localDir,
28 'tools.sessions.timeout': (1.0 / 60),
29 'tools.sessions.clean_freq': (1.0 / 60),
30 }
31
32 def testGen(self):
33 counter = cherrypy.session.get('counter', 0) + 1
34 cherrypy.session['counter'] = counter
35 yield str(counter)
36 testGen.exposed = True
37
38 def testStr(self):
39 counter = cherrypy.session.get('counter', 0) + 1
40 cherrypy.session['counter'] = counter
41 return str(counter)
42 testStr.exposed = True
43
44 def setsessiontype(self, newtype):
45 self.__class__._cp_config.update({'tools.sessions.storage_type': newtype})
46 setsessiontype.exposed = True
47
48 def index(self):
49 sess = cherrypy.session
50 c = sess.get('counter', 0) + 1
51 time.sleep(0.01)
52 sess['counter'] = c
53 return str(c)
54 index.exposed = True
55
56 def keyin(self, key):
57 return str(key in cherrypy.session)
58 keyin.exposed = True
59
60 def delete(self):
61 cherrypy.session.delete()
62 sessions.expire()
63 return "done"
64 delete.exposed = True
65
66 def delkey(self, key):
67 del cherrypy.session[key]
68 return "OK"
69 delkey.exposed = True
70
71 def blah(self):
72 return self._cp_config['tools.sessions.storage_type']
73 blah.exposed = True
74
75 def iredir(self):
76 raise cherrypy.InternalRedirect('/blah')
77 iredir.exposed = True
78
79 @cherrypy.tools.allow(methods=['GET'])
80 def restricted(self):
81 return cherrypy.request.method
82 restricted.exposed = True
83
84 cherrypy.tree.mount(Root())
85 cherrypy.config.update({'environment': 'test_suite'})
86
87
88 from cherrypy.test import helper
91
97
99 self.getPage('/testStr')
100 self.assertBody('1')
101 self.getPage('/testGen', self.cookies)
102 self.assertBody('2')
103 self.getPage('/testStr', self.cookies)
104 self.assertBody('3')
105 self.getPage('/delkey?key=counter', self.cookies)
106 self.assertStatus(200)
107
108 self.getPage('/setsessiontype/file')
109 self.getPage('/testStr')
110 self.assertBody('1')
111 self.getPage('/testGen', self.cookies)
112 self.assertBody('2')
113 self.getPage('/testStr', self.cookies)
114 self.assertBody('3')
115 self.getPage('/delkey?key=counter', self.cookies)
116 self.assertStatus(200)
117
118
119 time.sleep(2)
120 self.getPage('/')
121 self.assertBody('1')
122
123
124 self.getPage('/keyin?key=counter', self.cookies)
125 self.assertBody("True")
126
127
128 self.getPage('/delete', self.cookies)
129 self.assertBody("done")
130 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')]
131 self.assertEqual(f(), [])
132
133
134 self.getPage('/')
135 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')]
136 self.assertNotEqual(f(), [])
137 time.sleep(2)
138 self.assertEqual(f(), [])
139
143
147
149 client_thread_count = 5
150 request_count = 30
151
152
153 self.getPage("/")
154 self.assertBody("1")
155 cookies = self.cookies
156
157 data_dict = {}
158
159 def request(index):
160 for i in xrange(request_count):
161 self.getPage("/", cookies)
162
163
164 data_dict[index] = v = int(self.body)
165
166
167
168 ts = []
169 for c in xrange(client_thread_count):
170 data_dict[c] = 0
171 t = threading.Thread(target=request, args=(c,))
172 ts.append(t)
173 t.start()
174
175 for t in ts:
176 t.join()
177
178 hitcount = max(data_dict.values())
179 expected = 1 + (client_thread_count * request_count)
180 self.assertEqual(hitcount, expected)
181
187
189
190 self.getPage('/testStr')
191
192 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
193 path = os.path.join(localDir, "session-" + id)
194 os.unlink(path)
195 self.getPage('/testStr', self.cookies)
196
198 self.getPage('/unknown/page')
199 self.assertErrorPage(404, "The path '/unknown/page' was not found.")
200
201
202
203
204
205
206 self.getPage('/restricted', self.cookies, method='POST')
207 self.assertErrorPage(405, "Specified method is invalid for this server.")
208
209
210 if __name__ == "__main__":
211 setup_server()
212 helper.testmain()
213