Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_sip.py
1 # -*- test-case-name: twisted.test.test_sip -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """Session Initialization Protocol tests."""
7
8 from twisted.trial import unittest, util
9 from twisted.protocols import sip
10 from twisted.internet import defer, reactor, utils
11 from twisted.python.versions import Version
12
13 from twisted.test import proto_helpers
14
15 from twisted import cred
16 import twisted.cred.portal
17 import twisted.cred.checkers
18
19 from zope.interface import implements
20
21
# request, prefixed by random CRLFs
request1 = "\n\r\n\n\r" + "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "From: mo",
    "To: joe",
    "Content-Length: 4",
    "",
    "abcd",
])

# request, no content-length
request2 = "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "From: mo",
    "To: joe",
    "",
    "1234",
])

# request, with garbage after
request3 = "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "From: mo",
    "To: joe",
    "Content-Length: 4",
    "",
    "1234",
    "",
    "lalalal",
])

# three requests
request4 = "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "From: mo",
    "To: joe",
    "Content-Length: 0",
    "",
    "INVITE sip:loop SIP/2.0",
    "From: foo",
    "To: bar",
    "Content-Length: 4",
    "",
    # The second body runs straight into the third request line.
    "abcdINVITE sip:loop SIP/2.0",
    "From: foo",
    "To: bar",
    "Content-Length: 4",
    "",
    "1234",
])

# response, no content
response1 = "\r\n".join([
    "SIP/2.0 200 OK",
    "From:  foo",
    "To:bar",
    "Content-Length: 0",
    "",
    "",
])

# short header version
request_short = "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "f: mo",
    "t: joe",
    "l: 4",
    "",
    "abcd",
])

# request whose Via asks for rport, with no body
request_natted = "\r\n".join([
    "INVITE sip:foo SIP/2.0",
    "Via: SIP/2.0/UDP 10.0.0.1:5060;rport",
    "",
    "",
])
88
class TestRealm:
    """
    Realm stub whose C{requestAvatar} ignores its arguments and always
    answers with the same triple.
    """

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return a three-tuple of L{sip.IContact}, C{None} and a callable
        that does nothing.
        """
        interface = sip.IContact
        avatar = None
        logout = lambda: None
        return interface, avatar, logout
92
class MessageParsingTestCase(unittest.TestCase):
    """
    Feed whole SIP messages to L{sip.MessagesParser} and inspect the
    message objects it hands to its callback.
    """

    def setUp(self):
        # Every message the parser completes is appended to self.l.
        self.l = []
        self.parser = sip.MessagesParser(self.l.append)

    def feedMessage(self, message):
        """
        Deliver C{message} to the parser in a single call, then signal the
        end of input.
        """
        parser = self.parser
        parser.dataReceived(message)
        parser.dataDone()

    def validateMessage(self, m, method, uri, headers, body):
        """Validate Requests."""
        check = self.assertEqual
        check(m.method, method)
        check(m.uri.toString(), uri)
        check(m.headers, headers)
        check(m.body, body)
        check(m.finished, 1)

    def testSimple(self):
        """
        A request preceded by stray CR/LF characters is parsed as one
        message.
        """
        self.feedMessage(request1)
        parsed = self.l
        self.assertEqual(len(parsed), 1)
        expectedHeaders = {
            "from": ["mo"], "to": ["joe"], "content-length": ["4"]}
        self.validateMessage(
            parsed[0], "INVITE", "sip:foo", expectedHeaders, "abcd")

    def testTwoMessages(self):
        """
        Two messages fed one after the other are both reported, including
        the one that has no Content-Length header.
        """
        self.feedMessage(request1)
        self.feedMessage(request2)
        parsed = self.l
        self.assertEqual(len(parsed), 2)
        first, second = parsed[0], parsed[1]
        self.validateMessage(
            first, "INVITE", "sip:foo",
            {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
            "abcd")
        self.validateMessage(
            second, "INVITE", "sip:foo",
            {"from": ["mo"], "to": ["joe"]},
            "1234")

    def testGarbage(self):
        """
        Data following the declared body length does not produce a second
        message.
        """
        self.feedMessage(request3)
        parsed = self.l
        self.assertEqual(len(parsed), 1)
        expectedHeaders = {
            "from": ["mo"], "to": ["joe"], "content-length": ["4"]}
        self.validateMessage(
            parsed[0], "INVITE", "sip:foo", expectedHeaders, "1234")

    def testThreeInOne(self):
        """
        Three requests concatenated into one input are split into three
        messages.
        """
        self.feedMessage(request4)
        parsed = self.l
        self.assertEqual(len(parsed), 3)
        loopHeaders = {
            "from": ["foo"], "to": ["bar"], "content-length": ["4"]}
        self.validateMessage(
            parsed[0], "INVITE", "sip:foo",
            {"from": ["mo"], "to": ["joe"], "content-length": ["0"]},
            "")
        self.validateMessage(
            parsed[1], "INVITE", "sip:loop", loopHeaders, "abcd")
        self.validateMessage(
            parsed[2], "INVITE", "sip:loop", loopHeaders, "1234")

    def testShort(self):
        """
        The compact header names C{f}, C{t} and C{l} are reported under
        their long names.
        """
        self.feedMessage(request_short)
        parsed = self.l
        self.assertEqual(len(parsed), 1)
        expectedHeaders = {
            "from": ["mo"], "to": ["joe"], "content-length": ["4"]}
        self.validateMessage(
            parsed[0], "INVITE", "sip:foo", expectedHeaders, "abcd")

    def testSimpleResponse(self):
        """
        A bodiless 200 response is parsed into code, phrase and headers.
        """
        self.feedMessage(response1)
        parsed = self.l
        self.assertEqual(len(parsed), 1)
        response = parsed[0]
        self.assertEqual(response.code, 200)
        self.assertEqual(response.phrase, "OK")
        self.assertEqual(
            response.headers,
            {"from": ["foo"], "to": ["bar"], "content-length": ["0"]})
        self.assertEqual(response.body, "")
        self.assertEqual(response.finished, 1)
179
180
class MessageParsingTestCase2(MessageParsingTestCase):
    """Same as base class, but feed data char by char."""

    def feedMessage(self, message):
        """
        Hand C{message} to the parser one element at a time, then signal
        the end of input.
        """
        deliver = self.parser.dataReceived
        for piece in message:
            deliver(piece)
        self.parser.dataDone()
188
189
class MakeMessageTestCase(unittest.TestCase):
    """
    Tests for serializing L{sip.Request} and L{sip.Response} with
    C{toString}.
    """

    def testRequest(self):
        """
        A request with one header serializes to request line, header and
        blank line.
        """
        request = sip.Request("INVITE", "sip:foo")
        request.addHeader("foo", "bar")
        expected = "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\r\n"
        self.assertEqual(request.toString(), expected)

    def testResponse(self):
        """
        A response with headers and body data serializes with the body
        after the blank line.
        """
        response = sip.Response(200, "OK")
        response.addHeader("foo", "bar")
        response.addHeader("Content-Length", "4")
        response.bodyDataReceived("1234")
        expected = (
            "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-Length: 4\r\n\r\n1234")
        self.assertEqual(response.toString(), expected)

    def testStatusCode(self):
        """
        A response created from just the code 200 serializes with the
        phrase C{OK}.
        """
        response = sip.Response(200)
        self.assertEqual(response.toString(), "SIP/2.0 200 OK\r\n\r\n")
211
212
class ViaTestCase(unittest.TestCase):
    """
    Tests for building, parsing and serializing Via headers with
    L{sip.Via} and L{sip.parseViaHeader}.
    """

    def checkRoundtrip(self, v):
        """
        Assert that serializing C{v}, parsing that string and serializing
        the result again gives the same string.
        """
        s = v.toString()
        self.assertEqual(s, sip.parseViaHeader(s).toString())

    def testExtraWhitespace(self):
        """
        Extra spaces between the protocol and the host do not change the
        parsed transport, host or port.
        """
        v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
        v2 = sip.parseViaHeader('SIP/2.0/UDP     192.168.1.1:5060')
        self.assertEqual(v1.transport, v2.transport)
        self.assertEqual(v1.host, v2.host)
        self.assertEqual(v1.port, v2.port)

    def test_complex(self):
        """
        Test parsing a Via header with one of everything.
        """
        s = ("SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1"
             " ;branch=a7c6a8dlze (Example)")
        v = sip.parseViaHeader(s)
        self.assertEqual(v.transport, "UDP")
        self.assertEqual(v.host, "first.example.com")
        self.assertEqual(v.port, 4000)
        self.assertEqual(v.rport, None)
        self.assertEqual(v.rportValue, None)
        self.assertEqual(v.rportRequested, False)
        self.assertEqual(v.ttl, 16)
        self.assertEqual(v.maddr, "224.2.0.1")
        self.assertEqual(v.branch, "a7c6a8dlze")
        self.assertEqual(v.hidden, 0)
        self.assertEqual(v.toString(),
                          "SIP/2.0/UDP first.example.com:4000"
                          ";ttl=16;branch=a7c6a8dlze;maddr=224.2.0.1")
        self.checkRoundtrip(v)

    def test_simple(self):
        """
        Test parsing a simple Via header.
        """
        s = "SIP/2.0/UDP example.com;hidden"
        v = sip.parseViaHeader(s)
        self.assertEqual(v.transport, "UDP")
        self.assertEqual(v.host, "example.com")
        self.assertEqual(v.port, 5060)
        self.assertEqual(v.rport, None)
        self.assertEqual(v.rportValue, None)
        self.assertEqual(v.rportRequested, False)
        self.assertEqual(v.ttl, None)
        self.assertEqual(v.maddr, None)
        self.assertEqual(v.branch, None)
        self.assertEqual(v.hidden, True)
        self.assertEqual(v.toString(),
                          "SIP/2.0/UDP example.com:5060;hidden")
        self.checkRoundtrip(v)

    def testSimpler(self):
        """
        A Via built from only a host name survives a serialize/parse
        round trip.
        """
        v = sip.Via("example.com")
        self.checkRoundtrip(v)


    def test_deprecatedRPort(self):
        """
        Setting rport to True is deprecated, but still produces a Via header
        with the expected properties.
        """
        v = sip.Via("foo.bar", rport=True)

        warnings = self.flushWarnings(
            offendingFunctions=[self.test_deprecatedRPort])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(
            warnings[0]['message'],
            'rport=True is deprecated since Twisted 9.0.')
        self.assertEqual(
            warnings[0]['category'],
            DeprecationWarning)

        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
        self.assertEqual(v.rport, True)
        self.assertEqual(v.rportRequested, True)
        self.assertEqual(v.rportValue, None)


    def test_rport(self):
        """
        An rport setting of None should insert the parameter with no value.
        """
        v = sip.Via("foo.bar", rport=None)
        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
        self.assertEqual(v.rportRequested, True)
        self.assertEqual(v.rportValue, None)


    def test_rportValue(self):
        """
        An rport numeric setting should insert the parameter with the number
        value given.
        """
        v = sip.Via("foo.bar", rport=1)
        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport=1")
        self.assertEqual(v.rportRequested, False)
        self.assertEqual(v.rportValue, 1)
        self.assertEqual(v.rport, 1)


    def testNAT(self):
        """
        The C{received} and C{rport} parameters are parsed, and the rport
        value appears in the serialized header.
        """
        s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345"
        v = sip.parseViaHeader(s)
        self.assertEqual(v.transport, "UDP")
        self.assertEqual(v.host, "10.0.0.1")
        self.assertEqual(v.port, 5060)
        self.assertEqual(v.received, "22.13.1.5")
        self.assertEqual(v.rport, 12345)

        # Containment check instead of the deprecated assertNotEquals alias
        # applied to a find() result.
        self.assertIn("rport=12345", v.toString())


    def test_unknownParams(self):
        """
        Parsing and serializing Via headers with unknown parameters should work.
        """
        s = "SIP/2.0/UDP example.com:5060;branch=a12345b;bogus;pie=delicious"
        v = sip.parseViaHeader(s)
        self.assertEqual(v.toString(), s)
337
338
339
class URLTestCase(unittest.TestCase):
    """
    Tests for L{sip.parseURL} and the C{toString} method of its result.
    """

    def testRoundtrip(self):
        """
        Parsing each sample URL and serializing it again gives back the
        original text.
        """
        samples = [
            "sip:j.doe@big.com",
            "sip:j.doe:secret@big.com;transport=tcp",
            "sip:j.doe@big.com?subject=project",
            "sip:example.com",
        ]
        for text in samples:
            parsed = sip.parseURL(text)
            self.assertEqual(parsed.toString(), text)

    def testComplex(self):
        """
        Every component of a URL that uses all the parts is exposed as the
        matching attribute of the parsed object.
        """
        text = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;"
                "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d")
        url = sip.parseURL(text)
        expected = [
            ("username", "user"),
            ("password", "pass"),
            ("host", "hosta"),
            ("port", 123),
            ("transport", "udp"),
            ("usertype", "phone"),
            ("method", "foo"),
            ("ttl", 12),
            ("maddr", "1.2.3.4"),
            ("other", ["blah", "goo=bar"]),
            ("headers", {"a": "b", "c": "d"}),
        ]
        for attribute, value in expected:
            self.assertEqual(getattr(url, attribute), value)
362
363
class ParseTestCase(unittest.TestCase):
    """
    Tests for L{sip.parseAddress}.
    """

    def testParseAddress(self):
        """
        Each sample address is split into the expected display name, URL
        and parameter dictionary.
        """
        plainURL = "sip:foo@example.com"
        cases = [
            ('"A. G. Bell" <sip:foo@example.com>',
             "A. G. Bell", plainURL, {}),
            ("Anon <sip:foo@example.com>", "Anon", plainURL, {}),
            ("sip:foo@example.com", "", plainURL, {}),
            ("<sip:foo@example.com>", "", plainURL, {}),
            ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo",
             plainURL, {"tag": "bar", "foo": "baz"}),
        ]
        for address, name, urls, params in cases:
            parsed = sip.parseAddress(address)
            gname, gurl, gparams = parsed
            self.assertEqual(name, gname)
            self.assertEqual(gurl.toString(), urls)
            self.assertEqual(gparams, params)
380
381
class DummyLocator:
    """
    L{sip.ILocator} stub that answers every lookup with
    C{server.com}, port 5060.
    """
    implements(sip.ILocator)

    def getAddress(self, logicalURL):
        """
        Ignore C{logicalURL} and return an already-fired Deferred holding
        the fixed destination URL.
        """
        destination = sip.URL("server.com", port=5060)
        return defer.succeed(destination)
386
class FailingLocator:
    """
    L{sip.ILocator} stub whose lookups always fail with L{LookupError}.
    """
    implements(sip.ILocator)

    def getAddress(self, logicalURL):
        """
        Ignore C{logicalURL} and return an already-failed Deferred.
        """
        error = LookupError()
        return defer.fail(error)
391
392
class ProxyTestCase(unittest.TestCase):
    """
    Tests for L{sip.Proxy} in which C{sendMessage} is replaced by a
    recorder and the locator by a stub.
    """

    def setUp(self):
        self.sent = []
        self.proxy = sip.Proxy("127.0.0.1")
        self.proxy.locator = DummyLocator()

        def capture(dest, msg):
            # Record instead of sending so tests can inspect the output.
            self.sent.append((dest, msg))
        self.proxy.sendMessage = capture

    def testRequestForward(self):
        """
        A request is forwarded to the located address with the proxy's own
        Via header placed first.
        """
        request = sip.Request("INVITE", "sip:foo")
        for hop in ("1.2.3.4", "1.2.3.5"):
            request.addHeader("via", sip.Via(hop).toString())
        request.addHeader("foo", "bar")
        request.addHeader("to", "<sip:joe@server.com>")
        request.addHeader("contact", "<sip:joe@1.2.3.5>")
        self.proxy.datagramReceived(request.toString(), ("1.2.3.4", 5060))
        self.assertEqual(len(self.sent), 1)
        dest, forwarded = self.sent[0]
        self.assertEqual(dest.port, 5060)
        self.assertEqual(dest.host, "server.com")
        self.assertEqual(forwarded.uri.toString(), "sip:foo")
        self.assertEqual(forwarded.method, "INVITE")
        expectedVias = [
            "SIP/2.0/UDP 127.0.0.1:5060",
            "SIP/2.0/UDP 1.2.3.4:5060",
            "SIP/2.0/UDP 1.2.3.5:5060",
        ]
        self.assertEqual(forwarded.headers["via"], expectedVias)


    def testReceivedRequestForward(self):
        """
        When the datagram's source differs from the top Via host, the
        forwarded Via gains a C{received} parameter with the source.
        """
        request = sip.Request("INVITE", "sip:foo")
        request.addHeader("via", sip.Via("1.2.3.4").toString())
        request.addHeader("foo", "bar")
        request.addHeader("to", "<sip:joe@server.com>")
        request.addHeader("contact", "<sip:joe@1.2.3.4>")
        self.proxy.datagramReceived(request.toString(), ("1.1.1.1", 5060))
        dest, forwarded = self.sent[0]
        expectedVias = [
            "SIP/2.0/UDP 127.0.0.1:5060",
            "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1",
        ]
        self.assertEqual(forwarded.headers["via"], expectedVias)


    def testResponseWrongVia(self):
        """
        A response whose first Via is not the proxy's address is not
        forwarded.
        """
        # first via must match proxy's address
        response = sip.Response(200)
        response.addHeader("via", sip.Via("foo.com").toString())
        self.proxy.datagramReceived(response.toString(), ("1.1.1.1", 5060))
        self.assertEqual(len(self.sent), 0)

    def testResponseForward(self):
        """
        A response is sent to the host and port of the second Via, with
        the proxy's own Via removed.
        """
        response = sip.Response(200)
        response.addHeader("via", sip.Via("127.0.0.1").toString())
        response.addHeader(
            "via", sip.Via("client.com", port=1234).toString())
        self.proxy.datagramReceived(response.toString(), ("1.1.1.1", 5060))
        self.assertEqual(len(self.sent), 1)
        dest, forwarded = self.sent[0]
        self.assertEqual((dest.host, dest.port), ("client.com", 1234))
        self.assertEqual(forwarded.code, 200)
        self.assertEqual(
            forwarded.headers["via"], ["SIP/2.0/UDP client.com:1234"])

    def testReceivedResponseForward(self):
        """
        A response is sent to the C{received} address of the second Via
        rather than to its host.
        """
        response = sip.Response(200)
        response.addHeader("via", sip.Via("127.0.0.1").toString())
        clientVia = sip.Via("10.0.0.1", received="client.com")
        response.addHeader("via", clientVia.toString())
        self.proxy.datagramReceived(response.toString(), ("1.1.1.1", 5060))
        self.assertEqual(len(self.sent), 1)
        dest, forwarded = self.sent[0]
        self.assertEqual((dest.host, dest.port), ("client.com", 5060))

    def testResponseToUs(self):
        """
        A response carrying only the proxy's Via is passed to
        C{gotResponse} with that Via stripped.
        """
        response = sip.Response(200)
        response.addHeader("via", sip.Via("127.0.0.1").toString())
        calls = []

        def gotResponse(*args):
            calls.append(args)
        self.proxy.gotResponse = gotResponse
        self.proxy.datagramReceived(response.toString(), ("1.1.1.1", 5060))
        self.assertEqual(len(calls), 1)
        message, addr = calls[0]
        self.assertEqual(len(message.headers.get("via", [])), 0)
        self.assertEqual(message.code, 200)

    def testLoop(self):
        """
        A request that already lists the proxy in its Via headers is not
        forwarded.
        """
        request = sip.Request("INVITE", "sip:foo")
        for hop in ("1.2.3.4", "127.0.0.1"):
            request.addHeader("via", sip.Via(hop).toString())
        self.proxy.datagramReceived(request.toString(), ("client.com", 5060))
        self.assertEqual(self.sent, [])

    def testCantForwardRequest(self):
        """
        When the locator fails, a 404 response is sent back to the
        request's originator.
        """
        request = sip.Request("INVITE", "sip:foo")
        request.addHeader("via", sip.Via("1.2.3.4").toString())
        request.addHeader("to", "<sip:joe@server.com>")
        self.proxy.locator = FailingLocator()
        self.proxy.datagramReceived(request.toString(), ("1.2.3.4", 5060))
        self.assertEqual(len(self.sent), 1)
        dest, reply = self.sent[0]
        self.assertEqual((dest.host, dest.port), ("1.2.3.4", 5060))
        self.assertEqual(reply.code, 404)
        self.assertEqual(reply.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"])

    def testCantForwardResponse(self):
        # Placeholder: has no body, so it currently always passes.
        pass

    #testCantForwardResponse.skip = "not implemented yet"
497
498
class RegistrationTestCase(unittest.TestCase):
    """
    Tests for REGISTER handling by L{sip.RegisterProxy} backed by an
    L{sip.InMemoryRegistry}, with C{sendMessage} replaced by a recorder.
    """

    def setUp(self):
        self.proxy = sip.RegisterProxy(host="127.0.0.1")
        self.registry = sip.InMemoryRegistry("bell.example.com")
        self.proxy.registry = self.proxy.locator = self.registry
        self.sent = []
        self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
    setUp = utils.suppressWarnings(setUp,
        util.suppress(category=DeprecationWarning,
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))

    def tearDown(self):
        # Cancel the first element of each registry entry so nothing is
        # left pending once the test finishes.
        for d, uri in self.registry.users.values():
            d.cancel()
        del self.proxy

    def register(self, authorization=None):
        """
        Deliver a REGISTER request for C{joe@bell.example.com} to the proxy
        as if it came from C{client.com:5060}.

        @param authorization: if not C{None}, the value of an
            C{authorization} header added after the other headers.
        """
        r = sip.Request("REGISTER", "sip:bell.example.com")
        r.addHeader("to", "sip:joe@bell.example.com")
        r.addHeader("contact", "sip:joe@client.com:1234")
        r.addHeader("via", sip.Via("client.com").toString())
        if authorization is not None:
            r.addHeader("authorization", authorization)
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))

    def unregister(self):
        """
        Deliver a REGISTER request with a C{*} contact and C{Expires: 0}
        for C{joe@bell.example.com} to the proxy.
        """
        r = sip.Request("REGISTER", "sip:bell.example.com")
        r.addHeader("to", "sip:joe@bell.example.com")
        r.addHeader("contact", "*")
        r.addHeader("via", sip.Via("client.com").toString())
        r.addHeader("expires", "0")
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))

    def _basicAuthorization(self, credentials):
        """
        Build a C{Basic} authorization header value for the
        C{"user:password"} string C{credentials}.

        C{base64.b64encode} is used rather than the C{'base64'} string
        codec because the codec appends a newline, which would end up
        inside the header value.
        """
        import base64
        return "Basic " + base64.b64encode(credentials)

    def _assertBasicAuthorizerDeprecated(self, offender):
        """
        Assert that exactly one L{sip.BasicAuthorizer} deprecation warning
        was emitted from the test method C{offender}.
        """
        warnings = self.flushWarnings(offendingFunctions=[offender])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(
            warnings[0]['message'],
            "twisted.protocols.sip.BasicAuthorizer was deprecated in "
            "Twisted 9.0.0")
        self.assertEqual(
            warnings[0]['category'],
            DeprecationWarning)

    def testRegister(self):
        """
        A REGISTER is answered with a 200 response and the contact is
        stored so that the locator can resolve the user afterwards.
        """
        self.register()
        dest, m = self.sent[0]
        self.assertEqual((dest.host, dest.port), ("client.com", 5060))
        self.assertEqual(m.code, 200)
        self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
        self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
        self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
        # A few seconds of slack around one hour are tolerated.
        self.assertIn(
            int(m.headers["expires"][0]), (3600, 3601, 3599, 3598))
        self.assertEqual(len(self.registry.users), 1)
        dc, uri = self.registry.users["joe"]
        self.assertEqual(uri.toString(), "sip:joe@client.com:5060")
        d = self.proxy.locator.getAddress(sip.URL(username="joe",
                                                  host="bell.example.com"))
        d.addCallback(lambda desturl : (desturl.host, desturl.port))
        d.addCallback(self.assertEqual, ('client.com', 5060))
        return d

    def testUnregister(self):
        """
        Unregistering after registering is answered with a 200 response
        whose C{Expires} is 0, and empties the registry.
        """
        self.register()
        self.unregister()
        dest, m = self.sent[1]
        self.assertEqual((dest.host, dest.port), ("client.com", 5060))
        self.assertEqual(m.code, 200)
        self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
        self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
        self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
        self.assertEqual(m.headers["expires"], ["0"])
        self.assertEqual(self.registry.users, {})


    def addPortal(self):
        """
        Give the proxy a portal whose in-memory checker knows the single
        user C{userXname@127.0.0.1} with password C{passXword}.
        """
        r = TestRealm()
        p = cred.portal.Portal(r)
        c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
        c.addUser('userXname@127.0.0.1', 'passXword')
        p.registerChecker(c)
        self.proxy.portal = p

    def testFailedAuthentication(self):
        """
        With a portal installed, a REGISTER without credentials gets a 401
        response and registers nothing.
        """
        self.addPortal()
        self.register()

        self.assertEqual(len(self.registry.users), 0)
        self.assertEqual(len(self.sent), 1)
        dest, m = self.sent[0]
        self.assertEqual(m.code, 401)


    def test_basicAuthentication(self):
        """
        Test that registration with basic authentication succeeds.
        """
        self.addPortal()
        self.proxy.authorizers = self.proxy.authorizers.copy()
        # Must be instantiated in this method: the deprecation warning is
        # looked up by the function that triggered it.
        self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
        self._assertBasicAuthorizerDeprecated(self.test_basicAuthentication)
        self.register(self._basicAuthorization("userXname:passXword"))

        self.assertEqual(len(self.registry.users), 1)
        self.assertEqual(len(self.sent), 1)
        dest, m = self.sent[0]
        self.assertEqual(m.code, 200)


    def test_failedBasicAuthentication(self):
        """
        Failed registration with basic authentication results in an
        unauthorized error response.
        """
        self.addPortal()
        self.proxy.authorizers = self.proxy.authorizers.copy()
        # Must be instantiated in this method: the deprecation warning is
        # looked up by the function that triggered it.
        self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
        self._assertBasicAuthorizerDeprecated(
            self.test_failedBasicAuthentication)
        self.register(self._basicAuthorization("userXname:password"))

        self.assertEqual(len(self.registry.users), 0)
        self.assertEqual(len(self.sent), 1)
        dest, m = self.sent[0]
        self.assertEqual(m.code, 401)


    def testWrongDomainRegister(self):
        """
        A REGISTER whose request URI names another domain gets no reply.
        """
        r = sip.Request("REGISTER", "sip:wrong.com")
        r.addHeader("to", "sip:joe@bell.example.com")
        r.addHeader("contact", "sip:joe@client.com:1234")
        r.addHeader("via", sip.Via("client.com").toString())
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
        self.assertEqual(len(self.sent), 0)

    def testWrongToDomainRegister(self):
        """
        A REGISTER whose C{To} header names another domain gets no reply.
        """
        r = sip.Request("REGISTER", "sip:bell.example.com")
        r.addHeader("to", "sip:joe@foo.com")
        r.addHeader("contact", "sip:joe@client.com:1234")
        r.addHeader("via", sip.Via("client.com").toString())
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
        self.assertEqual(len(self.sent), 0)

    def testWrongDomainLookup(self):
        """
        Looking up a registered user name under a different domain fails
        with L{LookupError}.
        """
        self.register()
        url = sip.URL(username="joe", host="foo.com")
        d = self.proxy.locator.getAddress(url)
        self.assertFailure(d, LookupError)
        return d

    def testNoContactLookup(self):
        """
        Looking up a user that never registered fails with
        L{LookupError}.
        """
        self.register()
        url = sip.URL(username="jane", host="bell.example.com")
        d = self.proxy.locator.getAddress(url)
        self.assertFailure(d, LookupError)
        return d
673
674
class Client(sip.Base):
    """
    L{sip.Base} subclass that stores each response it handles and fires
    C{self.deferred} with the list of responses stored so far.
    """

    def __init__(self):
        sip.Base.__init__(self)
        self.deferred = defer.Deferred()
        self.received = []

    def handle_response(self, response, addr):
        """
        Store C{response} and call back C{self.deferred} with the list of
        stored responses; C{addr} is not used.
        """
        responses = self.received
        responses.append(response)
        self.deferred.callback(responses)
685
686
class LiveTest(unittest.TestCase):
    """
    Registration tests in which a L{Client} and a L{sip.RegisterProxy}
    exchange datagrams over UDP ports bound on 127.0.0.1.
    """

    def setUp(self):
        self.proxy = sip.RegisterProxy(host="127.0.0.1")
        self.registry = sip.InMemoryRegistry("bell.example.com")
        self.proxy.registry = self.proxy.locator = self.registry
        # Port 0 lets the operating system choose a free port.
        self.serverPort = reactor.listenUDP(
            0, self.proxy, interface="127.0.0.1")
        self.client = Client()
        self.clientPort = reactor.listenUDP(
            0, self.client, interface="127.0.0.1")
        self.serverAddress = (self.serverPort.getHost().host,
                              self.serverPort.getHost().port)
    setUp = utils.suppressWarnings(setUp,
        util.suppress(category=DeprecationWarning,
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))

    def tearDown(self):
        """
        Cancel the first element of every registry entry, then stop both
        listening ports and wait for them.
        """
        for pending, _uri in self.registry.users.values():
            pending.cancel()
        stopping = [
            defer.maybeDeferred(self.clientPort.stopListening),
            defer.maybeDeferred(self.serverPort.stopListening),
        ]
        return defer.gatherResults(stopping)

    def testRegister(self):
        """
        A REGISTER sent over UDP is answered with a single 200 response.
        """
        clientPortNumber = self.clientPort.getHost().port
        request = sip.Request("REGISTER", "sip:bell.example.com")
        request.addHeader("to", "sip:joe@bell.example.com")
        request.addHeader(
            "contact", "sip:joe@127.0.0.1:%d" % clientPortNumber)
        via = sip.Via("127.0.0.1", port=clientPortNumber)
        request.addHeader("via", via.toString())
        serverURL = sip.URL(host="127.0.0.1", port=self.serverAddress[1])
        self.client.sendMessage(serverURL, request)

        def checkResponses(received):
            self.assertEqual(len(received), 1)
            response = received[0]
            self.assertEqual(response.code, 200)

        result = self.client.deferred
        result.addCallback(checkResponses)
        return result

    def test_amoralRPort(self):
        """
        A Via header built with the deprecated C{rport=True} emits one
        deprecation warning, and the REGISTER carrying it is still
        answered with a single 200 response.
        """
        clientPortNumber = self.clientPort.getHost().port
        request = sip.Request("REGISTER", "sip:bell.example.com")
        request.addHeader("to", "sip:joe@bell.example.com")
        request.addHeader(
            "contact", "sip:joe@127.0.0.1:%d" % clientPortNumber)
        # Built in this method so the warning is attributed to it.
        via = sip.Via("127.0.0.1", port=clientPortNumber, rport=True)
        request.addHeader("via", via.toString())
        warnings = self.flushWarnings(
            offendingFunctions=[self.test_amoralRPort])
        self.assertEqual(len(warnings), 1)
        first = warnings[0]
        self.assertEqual(
            first['message'],
            'rport=True is deprecated since Twisted 9.0.')
        self.assertEqual(first['category'], DeprecationWarning)
        serverURL = sip.URL(host="127.0.0.1", port=self.serverAddress[1])
        self.client.sendMessage(serverURL, request)

        def checkResponses(received):
            self.assertEqual(len(received), 1)
            response = received[0]
            self.assertEqual(response.code, 200)

        result = self.client.deferred
        result.addCallback(checkResponses)
        return result
760
761
762
# Raw SIP messages used as fixtures.  Every line ends with an explicit "\r"
# escape followed by the literal newline, so lines are CRLF-terminated.

# REGISTER request without credentials.  The opening quotes are not followed
# by a backslash, so this string starts with a newline before the request
# line.
registerRequest = """
REGISTER sip:intarweb.us SIP/2.0\r
Via: SIP/2.0/UDP 192.168.1.100:50609\r
From: <sip:exarkun@intarweb.us:50609>\r
To: <sip:exarkun@intarweb.us:50609>\r
Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
CSeq: 9898 REGISTER\r
Expires: 500\r
User-Agent: X-Lite build 1061\r
Content-Length: 0\r
\r
"""

# 401 response carrying a Digest challenge.  The nonce and opaque values are
# the ones FakeDigestAuthorizer returns.
# NOTE(review): presumably the reply expected for registerRequest - confirm
# against the tests that use it.
challengeResponse = """\
SIP/2.0 401 Unauthorized\r
Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
To: <sip:exarkun@intarweb.us:50609>\r
From: <sip:exarkun@intarweb.us:50609>\r
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
CSeq: 9898 REGISTER\r
WWW-Authenticate: Digest nonce="92956076410767313901322208775",opaque="1674186428",qop-options="auth",algorithm="MD5",realm="intarweb.us"\r
\r
"""

# The same REGISTER with the next CSeq (9899) and an Authorization: Digest
# header that uses the nonce and opaque from challengeResponse.
authRequest = """\
REGISTER sip:intarweb.us SIP/2.0\r
Via: SIP/2.0/UDP 192.168.1.100:50609\r
From: <sip:exarkun@intarweb.us:50609>\r
To: <sip:exarkun@intarweb.us:50609>\r
Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
CSeq: 9899 REGISTER\r
Expires: 500\r
Authorization: Digest username="exarkun",realm="intarweb.us",nonce="92956076410767313901322208775",response="4a47980eea31694f997369214292374b",uri="sip:intarweb.us",algorithm=MD5,opaque="1674186428"\r
User-Agent: X-Lite build 1061\r
Content-Length: 0\r
\r
"""

# 200 OK response for CSeq 9899 with an Expires value of 3600.
# NOTE(review): presumably the reply expected for authRequest - confirm
# against the tests that use it.
okResponse = """\
SIP/2.0 200 OK\r
Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
To: <sip:exarkun@intarweb.us:50609>\r
From: <sip:exarkun@intarweb.us:50609>\r
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
CSeq: 9899 REGISTER\r
Contact: sip:exarkun@127.0.0.1:5632\r
Expires: 3600\r
Content-Length: 0\r
\r
"""
815
class FakeDigestAuthorizer(sip.DigestAuthorizer):
    """
    L{sip.DigestAuthorizer} whose nonce and opaque values are fixed
    strings instead of generated ones.
    """

    def generateNonce(self):
        """Return the fixed nonce string."""
        nonce = '92956076410767313901322208775'
        return nonce

    def generateOpaque(self):
        """Return the fixed opaque string."""
        opaque = '1674186428'
        return opaque
821
822
class FakeRegistry(sip.InMemoryRegistry):
    """Make sure expiration is always seen to be 3600.

    Otherwise slow reactors fail tests incorrectly.
    """

    def _cbReg(self, reg):
        """
        Check that C{reg.secondsToExpiry} lies between 3598 and 3600
        inclusive, then set it to 3600 and return C{reg}.

        @raise RuntimeError: if the value is outside that range.
        """
        remaining = reg.secondsToExpiry
        if not (3598 <= remaining <= 3600):
            raise RuntimeError(
                "bad seconds to expire: %s" % remaining)
        reg.secondsToExpiry = 3600
        return reg

    def getRegistrationInfo(self, uri):
        """
        Call the base implementation and pass its result through
        L{_cbReg}.
        """
        d = sip.InMemoryRegistry.getRegistrationInfo(self, uri)
        d.addCallback(self._cbReg)
        return d

    def registerAddress(self, domainURL, logicalURL, physicalURL):
        """
        Call the base implementation and pass its result through
        L{_cbReg}.
        """
        d = sip.InMemoryRegistry.registerAddress(
            self, domainURL, logicalURL, physicalURL)
        d.addCallback(self._cbReg)
        return d
844
class AuthorizationTestCase(unittest.TestCase):
    """
    Drive a L{sip.RegisterProxy} through a digest challenge/response
    REGISTER exchange using the canned datagrams defined in this module.
    """

    def setUp(self):
        """
        Build a proxy for C{intarweb.us} wired to a L{FakeDigestAuthorizer},
        a L{FakeRegistry}, a fake datagram transport and a portal holding
        one in-memory user.
        """
        proxy = sip.RegisterProxy(host="intarweb.us")
        # Work on a copy rather than mutating the dict the proxy started
        # with.
        authorizers = proxy.authorizers.copy()
        authorizers['digest'] = FakeDigestAuthorizer()
        proxy.authorizers = authorizers
        self.proxy = proxy

        registry = FakeRegistry("intarweb.us")
        proxy.registry = proxy.locator = registry
        self.registry = registry

        transport = proto_helpers.FakeDatagramTransport()
        proxy.transport = transport
        self.transport = transport

        realm = TestRealm()
        portal = cred.portal.Portal(realm)
        checker = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
        checker.addUser('exarkun@intarweb.us', 'password')
        portal.registerChecker(checker)
        proxy.portal = portal
    setUp = utils.suppressWarnings(
        setUp,
        util.suppress(
            category=DeprecationWarning,
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))

    def tearDown(self):
        """
        Cancel the first element of every entry left in the registry's
        C{users} mapping and drop the proxy.
        """
        # NOTE(review): the cancelled objects are presumably the pending
        # expiry timers -- confirm against sip.InMemoryRegistry.
        for pending, _ in self.registry.users.values():
            pending.cancel()
        del self.proxy

    def testChallenge(self):
        """
        An unauthenticated REGISTER is answered with the expected 401
        challenge, and the follow-up REGISTER carrying digest credentials is
        answered with the expected 200 response.
        """
        peer = ("127.0.0.1", 5632)

        self.proxy.datagramReceived(registerRequest, peer)
        self.assertEqual(
            self.transport.written[-1], (challengeResponse, peer))

        # Start from an empty log so the next check only sees the reply to
        # the authenticated request.
        self.transport.written = []

        self.proxy.datagramReceived(authRequest, peer)
        self.assertEqual(
            self.transport.written[-1], (okResponse, peer))
    testChallenge.suppress = [
        util.suppress(
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated',
            category=DeprecationWarning),
        util.suppress(
            message=r'twisted.protocols.sip.DigestedCredentials was deprecated',
            category=DeprecationWarning),
        util.suppress(
            message=r'twisted.protocols.sip.DigestCalcHA1 was deprecated',
            category=DeprecationWarning),
        util.suppress(
            message=r'twisted.protocols.sip.DigestCalcResponse was deprecated',
            category=DeprecationWarning),
    ]
899
900
901
class DeprecationTests(unittest.TestCase):
    """
    Tests for deprecation of obsolete components of L{twisted.protocols.sip}.
    """

    def test_deprecatedDigestCalcHA1(self):
        """
        Calling L{sip.DigestCalcHA1} is reported as deprecated since
        Twisted 9.0.0.
        """
        since = Version("Twisted", 9, 0, 0)
        emptyArgs = ('',) * 6
        self.callDeprecated(since, sip.DigestCalcHA1, *emptyArgs)


    def test_deprecatedDigestCalcResponse(self):
        """
        Calling L{sip.DigestCalcResponse} is reported as deprecated since
        Twisted 9.0.0.
        """
        since = Version("Twisted", 9, 0, 0)
        emptyArgs = ('',) * 8
        self.callDeprecated(since, sip.DigestCalcResponse, *emptyArgs)


    def test_deprecatedBasicAuthorizer(self):
        """
        Instantiating L{sip.BasicAuthorizer} is reported as deprecated since
        Twisted 9.0.0.
        """
        since = Version("Twisted", 9, 0, 0)
        self.callDeprecated(since, sip.BasicAuthorizer)


    def test_deprecatedDigestAuthorizer(self):
        """
        Instantiating L{sip.DigestAuthorizer} is reported as deprecated since
        Twisted 9.0.0.
        """
        since = Version("Twisted", 9, 0, 0)
        self.callDeprecated(since, sip.DigestAuthorizer)


    def test_deprecatedDigestedCredentials(self):
        """
        Instantiating L{sip.DigestedCredentials} is reported as deprecated
        since Twisted 9.0.0.
        """
        since = Version("Twisted", 9, 0, 0)
        self.callDeprecated(since, sip.DigestedCredentials, '', {}, {})