3 # Copyright 2011, Google Inc.
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
16 # * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 """Tests for dispatch module."""
39 import set_sys_path # Update sys.path to locate mod_pywebsocket module.
41 from mod_pywebsocket import dispatch
42 from mod_pywebsocket import handshake
# Root of the handler fixtures used by these tests: the 'testdata/handlers'
# directory located next to this test module.
_TEST_HANDLERS_DIR = os.path.join(
    os.path.dirname(__file__), 'testdata', 'handlers')

# The 'sub' directory directly beneath the handler fixture root.
_TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
52 class DispatcherTest(unittest.TestCase):
53 """A unittest for dispatch module."""
def test_normalize_path(self):
    """Check dispatch._normalize_path against os.path.abspath results.

    Each raw path must normalize to the absolute form of its reference
    path with every backslash replaced by a forward slash.
    """
    # (reference path fed to os.path.abspath, raw path fed to the
    # function under test)
    cases = [
        ('/a/b', '/a/b'),
        ('/a/b', '\\a\\b'),
        ('/a/b', '/a/c/../b'),
        ('abc', 'abc'),
    ]
    for reference, raw in cases:
        wanted = os.path.abspath(reference).replace('\\', '/')
        self.assertEqual(wanted, dispatch._normalize_path(raw))
# Exercises the callable returned by
# dispatch._create_path_to_resource_converter for several spellings of the
# root directory, asserting the resource path (or None) it produces for
# handler file paths.
65 def test_converter(self):
66 converter = dispatch._create_path_to_resource_converter('/a/b')
67 # Python built by MSC inserts a drive name like 'C:\' via realpath().
68 # Converter Generator expands provided path using realpath() and uses
69 # the path including a drive name to verify the prefix.
70 os_root = os.path.realpath('/')
# Files ending in '_wsh.py' under the root map to a resource path; a file
# without that suffix and a relative path are both expected to give None.
71 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
72 self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py'))
73 self.assertEqual(None, converter(os_root + 'a/b/h.py'))
74 self.assertEqual(None, converter('a/b/h_wsh.py'))
# Relative root directory.
76 converter = dispatch._create_path_to_resource_converter('a/b')
77 self.assertEqual('/h', converter(dispatch._normalize_path(
# NOTE(review): the call opened on the previous line is not closed in this
# listing (embedded numbering skips 78) -- confirm the argument against the
# original file.
# Root directory with redundant trailing slashes.
80 converter = dispatch._create_path_to_resource_converter('/a/b///')
81 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
82 self.assertEqual('/h', converter(dispatch._normalize_path(
83 '/a/b/../b/h_wsh.py')))
85 converter = dispatch._create_path_to_resource_converter(
# NOTE(review): the argument of the call above is not visible (embedded
# numbering skips 86) -- confirm against the original file.
87 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
# Root directory written with backslashes; both separator styles in the file
# path are expected to resolve to the same resource.
89 converter = dispatch._create_path_to_resource_converter(r'\a\b')
90 self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py'))
91 self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py'))
# Checks the result of dispatch._enumerate_handler_file_paths for
# _TEST_HANDLERS_DIR: eight paths are expected, compared pairwise against
# the os.path.join entries listed below.
93 def test_enumerate_handler_file_paths(self):
# NOTE(review): the statement that binds `paths` is only partly visible
# (embedded numbering skips 94 and 96) -- confirm against the original file.
95 dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR))
97 self.assertEqual(8, len(paths))
# NOTE(review): the opening of the `expected_paths` sequence (98) and its
# closing bracket (110) are not visible in this listing.
99 os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'),
100 os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
101 os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
102 os.path.join(_TEST_HANDLERS_DIR, 'sub',
103 'exception_in_transfer_wsh.py'),
104 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
105 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
106 os.path.join(_TEST_HANDLERS_DIR, 'sub',
107 'wrong_handshake_sig_wsh.py'),
108 os.path.join(_TEST_HANDLERS_DIR, 'sub',
109 'wrong_transfer_sig_wsh.py'),
# Pairwise comparison; zip() stops at the shorter sequence, so the length
# assertion above is what guards against extra or missing entries.
111 for expected, actual in zip(expected_paths, paths):
112 self.assertEqual(expected, actual)
# Feeds handler source strings to dispatch._source_handler_file: the inputs
# '', 'def' and '1/0' are paired with dispatch.DispatchException, and a source
# defining both web_socket_do_extra_handshake and web_socket_transfer_data
# must yield a truthy result.
114 def test_source_handler_file(self):
# NOTE(review): the call openers for the next three argument lines are not
# visible (embedded numbering skips 115, 117 and 119) -- confirm against the
# original file.
116 dispatch.DispatchException, dispatch._source_handler_file, '')
118 dispatch.DispatchException, dispatch._source_handler_file, 'def')
120 dispatch.DispatchException, dispatch._source_handler_file, '1/0')
121 self.failUnless(dispatch._source_handler_file(
122 'def web_socket_do_extra_handshake(request):pass\n'
123 'def web_socket_transfer_data(request):pass\n'))
# Builds a Dispatcher over _TEST_HANDLERS_DIR and checks source_warnings():
# four warnings are expected, each being the real path of a handler file
# followed by a message naming the handler function that is missing or not
# callable.
125 def test_source_warnings(self):
126 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
127 warnings = dispatcher.source_warnings()
# NOTE(review): embedded numbering skips 128 here; a statement may be missing
# from this listing -- confirm against the original file.
129 expected_warnings = [
130 (os.path.realpath(os.path.join(
131 _TEST_HANDLERS_DIR, 'blank_wsh.py')) +
132 ': web_socket_do_extra_handshake is not defined.'),
133 (os.path.realpath(os.path.join(
134 _TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py')) +
135 ': web_socket_do_extra_handshake is not callable.'),
136 (os.path.realpath(os.path.join(
137 _TEST_HANDLERS_DIR, 'sub', 'wrong_handshake_sig_wsh.py')) +
138 ': web_socket_do_extra_handshake is not defined.'),
139 (os.path.realpath(os.path.join(
140 _TEST_HANDLERS_DIR, 'sub', 'wrong_transfer_sig_wsh.py')) +
141 ': web_socket_transfer_data is not defined.'),
# NOTE(review): the closing bracket of expected_warnings (142) is not visible
# in this listing.
143 self.assertEquals(4, len(warnings))
# Pairwise comparison of expected and actual warning strings.
144 for expected, actual in zip(expected_warnings, warnings):
145 self.assertEquals(expected, actual)
# Runs do_extra_handshake for the '/origin_check' resource twice: with origin
# 'http://example.com' the call must not raise; with
# 'http://bad.example.com' a handshake.HandshakeException whose status is 403
# is expected.
147 def test_do_extra_handshake(self):
148 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
149 request = mock.MockRequest()
150 request.ws_resource = '/origin_check'
151 request.ws_origin = 'http://example.com'
152 dispatcher.do_extra_handshake(request) # Must not raise exception.
154 request.ws_origin = 'http://bad.example.com'
# NOTE(review): no `try:` is visible before the next call although `except`
# follows (embedded numbering skips 155) -- confirm against the original
# file.
156 dispatcher.do_extra_handshake(request)
# Reached only when the call above did not raise.
157 self.fail('Could not catch HandshakeException with 403 status')
158 except handshake.HandshakeException, e:
159 self.assertEquals(403, e.status)
# NOTE(review): the clause that binds `e` for the failure message below is
# not visible (embedded numbering skips 160).
161 self.fail('Unexpected exception: %r' % e)
def test_abort_extra_handshake(self):
    """Expect AbortedByUserException from the '/abort_by_user' handshake.

    Dispatches do_extra_handshake for a mock request whose resource is
    '/abort_by_user' and asserts that handshake.AbortedByUserException
    is raised.
    """
    handler_dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    aborting_request = mock.MockRequest()
    aborting_request.ws_resource = '/abort_by_user'
    self.assertRaises(
        handshake.AbortedByUserException,
        handler_dispatcher.do_extra_handshake,
        aborting_request)
# Calls Dispatcher.transfer_data for several resources (including resources
# carrying a query part) and compares what was written to the mock connection
# with the message expected from the selected handler.
170 def test_transfer_data(self):
171 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
# Case 1: '/origin_check' with protocol 'p1'.
173 request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
174 request.ws_resource = '/origin_check'
175 request.ws_protocol = 'p1'
176 dispatcher.transfer_data(request)
# NOTE(review): in each assertEqual below a line between the first string
# literal and the written_data() argument is not visible (embedded numbering
# skips 178, 186, 194 and 202) -- confirm the full expected value against
# the original file.
177 self.assertEqual('origin_check_wsh.py is called for /origin_check, p1'
179 request.connection.written_data())
# Case 2: '/sub/plain' with no protocol.
181 request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
182 request.ws_resource = '/sub/plain'
183 request.ws_protocol = None
184 dispatcher.transfer_data(request)
185 self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None'
187 request.connection.written_data())
# Case 3: same handler, resource ending in a bare '?'.
189 request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
190 request.ws_resource = '/sub/plain?'
191 request.ws_protocol = None
192 dispatcher.transfer_data(request)
193 self.assertEqual('sub/plain_wsh.py is called for /sub/plain?, None'
195 request.connection.written_data())
# Case 4: same handler, resource with a query string.
197 request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
198 request.ws_resource = '/sub/plain?q=v'
199 request.ws_protocol = None
200 dispatcher.transfer_data(request)
201 self.assertEqual('sub/plain_wsh.py is called for /sub/plain?q=v, None'
203 request.connection.written_data())
# For each listed resource, calls transfer_data and expects a
# dispatch.DispatchException whose message contains 'No handler'.
205 def test_transfer_data_no_handler(self):
206 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
207 for resource in ['/blank', '/sub/non_callable',
208 '/sub/no_wsh_at_the_end', '/does/not/exist']:
209 request = mock.MockRequest(connection=mock.MockConn(''))
210 request.ws_resource = resource
211 request.ws_protocol = 'p2'
# NOTE(review): no `try:` is visible before the next call although `except`
# follows; the embedded numbering skips 212, 214, 217 and 218 -- confirm
# against the original file.
213 dispatcher.transfer_data(request)
215 except dispatch.DispatchException, e:
216 self.failUnless(str(e).find('No handler') != -1)
# Calls transfer_data for '/sub/exception_in_transfer' and checks that the
# message of the exception bound to `e` contains 'Intentional'.
220 def test_transfer_data_handler_exception(self):
221 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
222 request = mock.MockRequest(connection=mock.MockConn(''))
223 request.ws_resource = '/sub/exception_in_transfer'
224 request.ws_protocol = 'p3'
# NOTE(review): the `try:` and the `except` clause that binds `e` are not
# visible (embedded numbering skips 225, 227 and 228) -- confirm against the
# original file.
226 dispatcher.transfer_data(request)
# The second argument is the failure message shown when the text is absent.
229 self.failUnless(str(e).find('Intentional') != -1,
230 'Unexpected exception: %s' % e)
def test_abort_transfer_data(self):
    """Expect AbortedByUserException from the '/abort_by_user' transfer.

    Dispatches transfer_data for a mock request whose resource is
    '/abort_by_user' and asserts that handshake.AbortedByUserException
    is raised.
    """
    handler_dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    aborting_request = mock.MockRequest()
    aborting_request.ws_resource = '/abort_by_user'
    self.assertRaises(
        handshake.AbortedByUserException,
        handler_dispatcher.transfer_data,
        aborting_request)
# Builds a Dispatcher over _TEST_HANDLERS_DIR (second argument None) and
# inspects its _handler_suite_map: four entries are expected, including
# '/origin_check' and '/sub/plain'.
239 def test_scan_dir(self):
240 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
241 self.assertEqual(4, len(disp._handler_suite_map))
242 self.failUnless('/origin_check' in disp._handler_suite_map)
# NOTE(review): the assertion call wrapping the next expression is not
# visible (embedded numbering skips 243) -- confirm against the original
# file.
244 '/sub/exception_in_transfer' in disp._handler_suite_map)
245 self.failUnless('/sub/plain' in disp._handler_suite_map)
# Builds a Dispatcher with _TEST_HANDLERS_DIR as first argument and
# _TEST_HANDLERS_SUB_DIR as second, then inspects _handler_suite_map: two
# entries are expected, '/origin_check' must be absent and '/sub/plain'
# present.
247 def test_scan_sub_dir(self):
248 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
249 self.assertEqual(2, len(disp._handler_suite_map))
250 self.failIf('/origin_check' in disp._handler_suite_map)
# NOTE(review): the assertion call wrapping the next expression is not
# visible (embedded numbering skips 251) -- confirm against the original
# file.
252 '/sub/exception_in_transfer' in disp._handler_suite_map)
253 self.failUnless('/sub/plain' in disp._handler_suite_map)
def test_scan_sub_dir_as_root(self):
    """Check the handler map when the sub directory is passed twice.

    With _TEST_HANDLERS_SUB_DIR given as both Dispatcher arguments, the
    map must hold exactly two entries, keyed without the '/sub' prefix.
    """
    disp = dispatch.Dispatcher(
        _TEST_HANDLERS_SUB_DIR, _TEST_HANDLERS_SUB_DIR)
    suite_map = disp._handler_suite_map
    self.assertEqual(2, len(suite_map))
    # Keys that must not be registered.
    for absent in ('/origin_check',
                   '/sub/exception_in_transfer',
                   '/sub/plain'):
        self.failIf(absent in suite_map)
    # Keys that must be registered.
    for present in ('/exception_in_transfer', '/plain'):
        self.failUnless(present in suite_map)
def test_scan_dir_must_under_root(self):
    """Check which Dispatcher directory pairs are accepted or rejected.

    The first two argument pairs must construct without raising; the
    pair ('a/b/c', 'a/b') must raise dispatch.DispatchException.
    """
    accepted_pairs = (
        ('a/b', 'a/b/c'),
        ('a/b///', 'a/b'),
    )
    for first_dir, second_dir in accepted_pairs:
        dispatch.Dispatcher(first_dir, second_dir)  # OK

    rejected_pair = ('a/b/c', 'a/b')
    self.assertRaises(
        dispatch.DispatchException, dispatch.Dispatcher, *rejected_pair)
# Registers '/' as an alias of '/origin_check' and checks that
# _handler_suite_map then holds five entries including '/'; aliasing to
# '/not-exist' must raise dispatch.DispatchException.
271 def test_resource_path_alias(self):
272 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
273 disp.add_resource_path_alias('/', '/origin_check')
274 self.assertEqual(5, len(disp._handler_suite_map))
275 self.failUnless('/origin_check' in disp._handler_suite_map)
# NOTE(review): the assertion call wrapping the next expression is not
# visible (embedded numbering skips 276) -- confirm against the original
# file.
277 '/sub/exception_in_transfer' in disp._handler_suite_map)
278 self.failUnless('/sub/plain' in disp._handler_suite_map)
279 self.failUnless('/' in disp._handler_suite_map)
280 self.assertRaises(dispatch.DispatchException,
281 disp.add_resource_path_alias, '/alias', '/not-exist')
284 if __name__ == '__main__':