Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / test_nmea.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3  
4
5 """Test cases for the NMEA GPS protocol"""
6
7 import StringIO
8
9 from twisted.trial import unittest
10 from twisted.internet import reactor, protocol
11 from twisted.python import reflect
12
13 from twisted.protocols.gps import nmea
14
15 class StringIOWithNoClose(StringIO.StringIO):
16     def close(self):
17         pass
18
19 class ResultHarvester:
20     def __init__(self):
21         self.results = []
22
23     def __call__(self, *args):
24         self.results.append(args)
25
26     def performTest(self, function, *args, **kwargs):
27         l = len(self.results)
28         try:
29             function(*args, **kwargs)
30         except Exception, e:
31             self.results.append(e)
32         if l == len(self.results):
33             self.results.append(NotImplementedError())
34
35 class NMEATester(nmea.NMEAReceiver):
36     ignore_invalid_sentence = 0
37     ignore_checksum_mismatch = 0
38     ignore_unknown_sentencetypes = 0
39     convert_dates_before_y2k = 1
40
41     def connectionMade(self):
42         self.resultHarvester = ResultHarvester()
43         for fn in reflect.prefixedMethodNames(self.__class__, 'decode_'):
44             setattr(self, 'handle_' + fn, self.resultHarvester)
45         
46 class NMEAReceiverTestCase(unittest.TestCase):
47     messages = (
48         # fix - signal acquired
49         "$GPGGA,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*69",
50         # fix - signal not acquired
51         "$GPGGA,235947.000,0000.0000,N,00000.0000,E,0,00,0.0,0.0,M,,,,0000*00",
52         # junk
53         "lkjasdfkl!@#(*$!@(*#(ASDkfjasdfLMASDCVKAW!@#($)!(@#)(*",
54         # fix - signal acquired (invalid checksum)
55         "$GPGGA,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*68",
56         # invalid sentence
57         "$GPGGX,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*68",
58         # position acquired
59         "$GPGLL,4250.5589,S,14718.5084,E,092204.999,A*2D",
60         # position not acquired
61         "$GPGLL,0000.0000,N,00000.0000,E,235947.000,V*2D",
62         # active satellites (no fix)
63         "$GPGSA,A,1,,,,,,,,,,,,,0.0,0.0,0.0*30",
64         # active satellites
65         "$GPGSA,A,3,01,20,19,13,,,,,,,,,40.4,24.4,32.2*0A",
66         # positiontime (no fix)
67         "$GPRMC,235947.000,V,0000.0000,N,00000.0000,E,,,041299,,*1D",
68         # positiontime
69         "$GPRMC,092204.999,A,4250.5589,S,14718.5084,E,0.00,89.68,211200,,*25",
70         # course over ground (no fix - not implemented)
71         "$GPVTG,,T,,M,,N,,K*4E",
72         # course over ground (not implemented)
73         "$GPVTG,89.68,T,,M,0.00,N,0.0,K*5F",
74     )
75     results = (
76         (83833.0, 39.17355, -76.6999, nmea.POSFIX_SPS, 5, 1.35, (44.0, 'M'), (-33.0, 'M'), None),
77         (86387.0, 0.0, 0.0, 0, 0, 0.0, (0.0, 'M'), None, None),
78         nmea.InvalidSentence(),
79         nmea.InvalidChecksum(),
80         nmea.InvalidSentence(),
81         (-42.842648333333337, 147.30847333333332, 33724.999000000003, 1),
82         (0.0, 0.0, 86387.0, 0),
83         ((None, None, None, None, None, None, None, None, None, None, None, None), (nmea.MODE_AUTO, nmea.MODE_NOFIX), 0.0, 0.0, 0.0),
84         ((1, 20, 19, 13, None, None, None, None, None, None, None, None), (nmea.MODE_AUTO, nmea.MODE_3D), 40.4, 24.4, 32.2),
85         (0.0, 0.0, None, None, 86387.0, (1999, 12, 4), None),
86         (-42.842648333333337, 147.30847333333332, 0.0, 89.68, 33724.999, (2000, 12, 21), None),
87         NotImplementedError(),
88         NotImplementedError(),
89     )
90     def testGPSMessages(self):
91         dummy = NMEATester()
92         dummy.makeConnection(protocol.FileWrapper(StringIOWithNoClose()))
93         for line in self.messages:
94             dummy.resultHarvester.performTest(dummy.lineReceived, line) 
95         def munge(myTuple):
96             if type(myTuple) != type(()):
97                 return
98             newTuple = []
99             for v in myTuple:
100                 if type(v) == type(1.1):
101                     v = float(int(v * 10000.0)) * 0.0001
102                 newTuple.append(v)
103             return tuple(newTuple)
104         for (message, expectedResult, actualResult) in zip(self.messages, self.results, dummy.resultHarvester.results):
105             expectedResult = munge(expectedResult)
106             actualResult = munge(actualResult)
107             if isinstance(expectedResult, Exception):
108                 if isinstance(actualResult, Exception):
109                     self.assertEqual(expectedResult.__class__, actualResult.__class__, "\nInput:\n%s\nExpected:\n%s.%s\nResults:\n%s.%s\n" % (message, expectedResult.__class__.__module__, expectedResult.__class__.__name__, actualResult.__class__.__module__, actualResult.__class__.__name__))
110                 else:
111                     self.assertEqual(1, 0, "\nInput:\n%s\nExpected:\n%s.%s\nResults:\n%r\n" % (message, expectedResult.__class__.__module__, expectedResult.__class__.__name__, actualResult))
112             else:
113               self.assertEqual(expectedResult, actualResult, "\nInput:\n%s\nExpected: %r\nResults: %r\n" % (message, expectedResult, actualResult))
114
115 testCases = [NMEAReceiverTestCase]