Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / words / test / test_msn.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test cases for L{twisted.words.protocols.msn}.
6 """
7
8 # System imports
9 import StringIO
10
11 # Twisted imports
12
13 # t.w.p.msn requires an HTTP client
14 try:
15     # So try to get one - do it directly instead of catching an ImportError
16     # from t.w.p.msn so that other problems which cause that module to fail
17     # to import don't cause the tests to be skipped.
18     from twisted.web import client
19 except ImportError:
20     # If there isn't one, we're going to skip all the tests.
21     msn = None
22 else:
23     # Otherwise importing it should work, so do it.
24     from twisted.words.protocols import msn
25
26
27 from twisted.python.hashlib import md5
28 from twisted.protocols import loopback
29 from twisted.internet.defer import Deferred
30 from twisted.trial import unittest
31 from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing
32
33 def printError(f):
34     print f
35
36
37 class PassportTests(unittest.TestCase):
38
39     def setUp(self):
40         self.result = []
41         self.deferred = Deferred()
42         self.deferred.addCallback(lambda r: self.result.append(r))
43         self.deferred.addErrback(printError)
44
45     def test_nexus(self):
46         """
47         When L{msn.PassportNexus} receives enough information to identify the
48         address of the login server, it fires the L{Deferred} passed to its
49         initializer with that address.
50         """
51         protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
52         headers = {
53             'Content-Length' : '0',
54             'Content-Type'   : 'text/html',
55             'PassportURLs'   : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
56         }
57         transport = StringTransport()
58         protocol.makeConnection(transport)
59         protocol.dataReceived('HTTP/1.0 200 OK\r\n')
60         for (h, v) in headers.items():
61             protocol.dataReceived('%s: %s\r\n' % (h,v))
62         protocol.dataReceived('\r\n')
63         self.assertEqual(self.result[0], "https://login.myserver.com/")
64
65
66     def _doLoginTest(self, response, headers):
67         protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
68         protocol.makeConnection(StringTransport())
69         protocol.dataReceived(response)
70         for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
71         protocol.dataReceived('\r\n')
72
73     def testPassportLoginSuccess(self):
74         headers = {
75             'Content-Length'      : '0',
76             'Content-Type'        : 'text/html',
77             'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," +
78                                     "tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
79                                     "ru=http://messenger.msn.com"
80         }
81         self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
82         self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))
83
84     def testPassportLoginFailure(self):
85         headers = {
86             'Content-Type'     : 'text/html',
87             'WWW-Authenticate' : 'Passport1.4 da-status=failed,' +
88                                  'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
89                                  'cbtxt=the%20error%20message'
90         }
91         self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
92         self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))
93
94     def testPassportLoginRedirect(self):
95         headers = {
96             'Content-Type'        : 'text/html',
97             'Authentication-Info' : 'Passport1.4 da-status=redir',
98             'Location'            : 'https://newlogin.host.com/'
99         }
100         self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
101         self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
102
103
104 if msn is not None:
105     class DummySwitchboardClient(msn.SwitchboardClient):
106         def userTyping(self, message):
107             self.state = 'TYPING'
108
109         def gotSendRequest(self, fileName, fileSize, cookie, message):
110             if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
111
112
113     class DummyNotificationClient(msn.NotificationClient):
114         def loggedIn(self, userHandle, screenName, verified):
115             if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified:
116                 self.state = 'LOGIN'
117
118         def gotProfile(self, message):
119             self.state = 'PROFILE'
120
121         def gotContactStatus(self, code, userHandle, screenName):
122             if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
123                 self.state = 'INITSTATUS'
124
125         def contactStatusChanged(self, code, userHandle, screenName):
126             if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
127                 self.state = 'NEWSTATUS'
128
129         def contactOffline(self, userHandle):
130             if userHandle == "foo@bar.com": self.state = 'OFFLINE'
131
132         def statusChanged(self, code):
133             if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS'
134
135         def listSynchronized(self, *args):
136             self.state = 'GOTLIST'
137
138         def gotPhoneNumber(self, listVersion, userHandle, phoneType, number):
139             msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number)
140             self.state = 'GOTPHONE'
141
142         def userRemovedMe(self, userHandle, listVersion):
143             msn.NotificationClient.userRemovedMe(self, userHandle, listVersion)
144             c = self.factory.contacts.getContact(userHandle)
145             if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME'
146
147         def userAddedMe(self, userHandle, screenName, listVersion):
148             msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion)
149             c = self.factory.contacts.getContact(userHandle)
150             if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \
151                (screenName == 'Screen Name'):
152                 self.state = 'USERADDEDME'
153
154         def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
155             if sessionID == 1234 and \
156                host == '192.168.1.1' and \
157                port == 1863 and \
158                key == '123.456' and \
159                userHandle == 'foo@foo.com' and \
160                screenName == 'Screen Name':
161                 self.state = 'SBINVITED'
162
163
164
165 class DispatchTests(unittest.TestCase):
166     """
167     Tests for L{DispatchClient}.
168     """
169     def _versionTest(self, serverVersionResponse):
170         """
171         Test L{DispatchClient} version negotiation.
172         """
173         client = msn.DispatchClient()
174         client.userHandle = "foo"
175
176         transport = StringTransport()
177         client.makeConnection(transport)
178         self.assertEqual(
179             transport.value(), "VER 1 MSNP8 CVR0\r\n")
180         transport.clear()
181
182         client.dataReceived(serverVersionResponse)
183         self.assertEqual(
184             transport.value(),
185             "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n")
186
187
188     def test_version(self):
189         """
190         L{DispatchClient.connectionMade} greets the server with a I{VER}
191         (version) message and then L{NotificationClient.dataReceived}
192         handles the server's I{VER} response by sending a I{CVR} (client
193         version) message.
194         """
195         self._versionTest("VER 1 MSNP8 CVR0\r\n")
196
197
198     def test_versionWithoutCVR0(self):
199         """
200         If the server responds to a I{VER} command without including the
201         I{CVR0} protocol, L{DispatchClient} behaves in the same way as if
202         that protocol were included.
203
204         Starting in August 2008, CVR0 disappeared from the I{VER} response.
205         """
206         self._versionTest("VER 1 MSNP8\r\n")
207
208
209
210 class NotificationTests(unittest.TestCase):
211     """ testing the various events in NotificationClient """
212
213     def setUp(self):
214         self.client = DummyNotificationClient()
215         self.client.factory = msn.NotificationFactory()
216         self.client.state = 'START'
217
218
219     def tearDown(self):
220         self.client = None
221
222
223     def _versionTest(self, serverVersionResponse):
224         """
225         Test L{NotificationClient} version negotiation.
226         """
227         self.client.factory.userHandle = "foo"
228
229         transport = StringTransport()
230         self.client.makeConnection(transport)
231         self.assertEqual(
232             transport.value(), "VER 1 MSNP8 CVR0\r\n")
233         transport.clear()
234
235         self.client.dataReceived(serverVersionResponse)
236         self.assertEqual(
237             transport.value(),
238             "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n")
239
240
241     def test_version(self):
242         """
243         L{NotificationClient.connectionMade} greets the server with a I{VER}
244         (version) message and then L{NotificationClient.dataReceived}
245         handles the server's I{VER} response by sending a I{CVR} (client
246         version) message.
247         """
248         self._versionTest("VER 1 MSNP8 CVR0\r\n")
249
250
251     def test_versionWithoutCVR0(self):
252         """
253         If the server responds to a I{VER} command without including the
254         I{CVR0} protocol, L{NotificationClient} behaves in the same way as
255         if that protocol were included.
256
257         Starting in August 2008, CVR0 disappeared from the I{VER} response.
258         """
259         self._versionTest("VER 1 MSNP8\r\n")
260
261
262     def test_challenge(self):
263         """
264         L{NotificationClient} responds to a I{CHL} message by sending a I{QRY}
265         back which included a hash based on the parameters of the I{CHL}.
266         """
267         transport = StringTransport()
268         self.client.makeConnection(transport)
269         transport.clear()
270
271         challenge = "15570131571988941333"
272         self.client.dataReceived('CHL 0 ' + challenge + '\r\n')
273         # md5 of the challenge and a magic string defined by the protocol
274         response = "8f2f5a91b72102cd28355e9fc9000d6e"
275         # Sanity check - the response is what the comment above says it is.
276         self.assertEqual(
277             response, md5(challenge + "Q1P7W2E4J9R8U3S5").hexdigest())
278         self.assertEqual(
279             transport.value(),
280             # 2 is the next transaction identifier.  32 is the length of the
281             # response.
282             "QRY 2 msmsgs@msnmsgr.com 32\r\n" + response)
283
284
285     def testLogin(self):
286         self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0')
287         self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login')
288
289
290     def test_loginWithoutSSLFailure(self):
291         """
292         L{NotificationClient.loginFailure} is called if the necessary SSL APIs
293         are unavailable.
294         """
295         self.patch(msn, 'ClientContextFactory', None)
296         success = []
297         self.client.loggedIn = lambda *args: success.append(args)
298         failure = []
299         self.client.loginFailure = failure.append
300
301         self.client.lineReceived('USR 6 TWN S opaque-string-goes-here')
302         self.assertEqual(success, [])
303         self.assertEqual(
304             failure,
305             ["Exception while authenticating: "
306              "Connecting to the Passport server requires SSL, but SSL is "
307              "unavailable."])
308
309
310     def testProfile(self):
311         m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
312         m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
313         m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
314         m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
315         map(self.client.lineReceived, m.split('\r\n')[:-1])
316         self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile')
317
318     def testStatus(self):
319         t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'),
320              ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
321              ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
322              ('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')]
323         for i in t:
324             self.client.lineReceived(i[0])
325             self.failUnless((self.client.state == i[1]), msg=i[2])
326
327     def testListSync(self):
328         # currently this test does not take into account the fact
329         # that BPRs sent as part of the SYN reply may not be interpreted
330         # as such if they are for the last LST -- maybe I should
331         # factor this in later.
332         self.client.makeConnection(StringTransport())
333         msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1)
334         lines = [
335             "SYN %s 100 1 1" % self.client.currentID,
336             "GTC A",
337             "BLP AL",
338             "LSG 0 Other%20Contacts 0",
339             "LST userHandle@email.com Some%20Name 11 0"
340         ]
341         map(self.client.lineReceived, lines)
342         contacts = self.client.factory.contacts
343         contact = contacts.getContact('userHandle@email.com')
344         self.failUnless(contacts.version == 100, "Invalid contact list version")
345         self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
346         self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list")
347         self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info")
348         self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
349
350     def testAsyncPhoneChange(self):
351         c = msn.MSNContact(userHandle='userHandle@email.com')
352         self.client.factory.contacts = msn.MSNContactList()
353         self.client.factory.contacts.addContact(c)
354         self.client.makeConnection(StringTransport())
355         self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
356         c = self.client.factory.contacts.getContact('userHandle@email.com')
357         self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
358         self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
359         self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")
360
361     def testLateBPR(self):
362         """
363         This test makes sure that if a BPR response that was meant
364         to be part of a SYN response (but came after the last LST)
365         is received, the correct contact is updated and all is well
366         """
367         self.client.makeConnection(StringTransport())
368         msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1)
369         lines = [
370             "SYN %s 100 1 1" % self.client.currentID,
371             "GTC A",
372             "BLP AL",
373             "LSG 0 Other%20Contacts 0",
374             "LST userHandle@email.com Some%20Name 11 0",
375             "BPR PHH 123%20456"
376         ]
377         map(self.client.lineReceived, lines)
378         contact = self.client.factory.contacts.getContact('userHandle@email.com')
379         self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
380
381     def testUserRemovedMe(self):
382         self.client.factory.contacts = msn.MSNContactList()
383         contact = msn.MSNContact(userHandle='foo@foo.com')
384         contact.addToList(msn.REVERSE_LIST)
385         self.client.factory.contacts.addContact(contact)
386         self.client.lineReceived("REM 0 RL 100 foo@foo.com")
387         self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
388
389     def testUserAddedMe(self):
390         self.client.factory.contacts = msn.MSNContactList()
391         self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name")
392         self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")
393
394     def testAsyncSwitchboardInvitation(self):
395         self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
396         self.failUnless(self.client.state == "SBINVITED")
397
398     def testCommandFailed(self):
399         """
400         Ensures that error responses from the server fires an errback with
401         MSNCommandFailed.
402         """
403         id, d = self.client._createIDMapping()
404         self.client.lineReceived("201 %s" % id)
405         d = self.assertFailure(d, msn.MSNCommandFailed)
406         def assertErrorCode(exception):
407             self.assertEqual(201, exception.errorCode)
408         return d.addCallback(assertErrorCode)
409
410
411 class MessageHandlingTests(unittest.TestCase):
412     """ testing various message handling methods from SwichboardClient """
413
414     def setUp(self):
415         self.client = DummySwitchboardClient()
416         self.client.state = 'START'
417
418     def tearDown(self):
419         self.client = None
420
421     def testClientCapabilitiesCheck(self):
422         m = msn.MSNMessage()
423         m.setHeader('Content-Type', 'text/x-clientcaps')
424         self.assertEqual(self.client.checkMessage(m), 0, 'Failed to detect client capability message')
425
426     def testTypingCheck(self):
427         m = msn.MSNMessage()
428         m.setHeader('Content-Type', 'text/x-msmsgscontrol')
429         m.setHeader('TypingUser', 'foo@bar')
430         self.client.checkMessage(m)
431         self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification')
432
433     def testFileInvitation(self, lazyClient=False):
434         m = msn.MSNMessage()
435         m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
436         m.message += 'Application-Name: File Transfer\r\n'
437         if not lazyClient:
438             m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
439         m.message += 'Invitation-Command: Invite\r\n'
440         m.message += 'Invitation-Cookie: 1234\r\n'
441         m.message += 'Application-File: foobar.ext\r\n'
442         m.message += 'Application-FileSize: 31337\r\n\r\n'
443         self.client.checkMessage(m)
444         self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation')
445
446     def testFileInvitationMissingGUID(self):
447         return self.testFileInvitation(True)
448
449     def testFileResponse(self):
450         d = Deferred()
451         d.addCallback(self.fileResponse)
452         self.client.cookies['iCookies'][1234] = (d, None)
453         m = msn.MSNMessage()
454         m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
455         m.message += 'Invitation-Command: ACCEPT\r\n'
456         m.message += 'Invitation-Cookie: 1234\r\n\r\n'
457         self.client.checkMessage(m)
458         self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response')
459
460     def testFileInfo(self):
461         d = Deferred()
462         d.addCallback(self.fileInfo)
463         self.client.cookies['external'][1234] = (d, None)
464         m = msn.MSNMessage()
465         m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
466         m.message += 'Invitation-Command: ACCEPT\r\n'
467         m.message += 'Invitation-Cookie: 1234\r\n'
468         m.message += 'IP-Address: 192.168.0.1\r\n'
469         m.message += 'Port: 6891\r\n'
470         m.message += 'AuthCookie: 4321\r\n\r\n'
471         self.client.checkMessage(m)
472         self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info')
473
474     def fileResponse(self, (accept, cookie, info)):
475         if accept and cookie == 1234: self.client.state = 'RESPONSE'
476
477     def fileInfo(self, (accept, ip, port, aCookie, info)):
478         if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
479
480
481 class FileTransferTestCase(unittest.TestCase):
482     """
483     test FileSend against FileReceive
484     """
485
486     def setUp(self):
487         self.input = 'a' * 7000
488         self.output = StringIOWithoutClosing()
489
490
491     def tearDown(self):
492         self.input = None
493         self.output = None
494
495
496     def test_fileTransfer(self):
497         """
498         Test L{FileSend} against L{FileReceive} using a loopback transport.
499         """
500         auth = 1234
501         sender = msn.FileSend(StringIO.StringIO(self.input))
502         sender.auth = auth
503         sender.fileSize = 7000
504         client = msn.FileReceive(auth, "foo@bar.com", self.output)
505         client.fileSize = 7000
506         def check(ignored):
507             self.assertTrue(
508                 client.completed and sender.completed,
509                 msg="send failed to complete")
510             self.assertEqual(
511                 self.input, self.output.getvalue(),
512                 msg="saved file does not match original")
513         d = loopback.loopbackAsync(sender, client)
514         d.addCallback(check)
515         return d
516
517 if msn is None:
518     for testClass in [DispatchTests, PassportTests, NotificationTests,
519                       MessageHandlingTests, FileTransferTestCase]:
520         testClass.skip = (
521             "MSN requires an HTTP client but none is available, "
522             "skipping tests.")