"Initial commit to Gerrit"
[profile/ivi/gpsd.git] / gpsfake
1 #!/usr/bin/env python
2 #
3 # gpsfake -- test harness for gpsd
4 #
5 # Simulates a GPS, playing back a logfile
6 # Most of the logic for this now lives in gps.fake,
7 # factored out so we can write other test programs with it.
8 #
9 # This file is Copyright (c) 2010 by the GPSD project
10 # BSD terms apply: see the file COPYING in the distribution root for details.
11
12 import sys, os, signal, time, getopt, socket, random
13 import gps.fake
14
15 class Baton:
16     "Ship progress indications to stderr."
17     def __init__(self, prompt, endmsg=None):
18         self.stream = sys.stderr
19         self.stream.write(prompt + "...")
20         if os.isatty(self.stream.fileno()):
21             self.stream.write(" \010")
22         self.stream.flush()
23         self.count = 0
24         self.endmsg = endmsg
25         self.time = time.time()
26         return
27
28     def twirl(self, ch=None):
29         if self.stream is None:
30             return
31         if os.isatty(self.stream.fileno()):
32             if ch:
33                 self.stream.write(ch)
34             else:
35                 self.stream.write("-/|\\"[self.count % 4])
36                 self.stream.write("\010")
37             self.stream.flush()
38         self.count = self.count + 1
39         return
40
41     def end(self, msg=None):
42         if msg == None:
43             msg = self.endmsg
44         if self.stream:
45             self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg))
46         return
47
48 def fakeport():
49     "Find a port that isn't in use to bind to."
50     # Grab a random port from IANA's unassigned/private range. Not perfect,
51     # but at least less than in 16000 chance of a pair collision. It would
52     # be very hard to get this deterministically correct, since there would
53     # always be a window between port allocation and when the daemon picked
54     # it up.  We'd need to do some kind of semaphore, etc., and even that
55     # wouldn't prevent collisions with other apps using the range.
56     return random.randint(49152, 65535)
57
58 def hexdump(s):
59     rep = ""
60     for c in s:
61         rep += "%02x" % ord(c)
62     return rep
63
64 def fakehook(linenumber, fakegps):
65     if len(fakegps.testload.sentences) == 0:
66         print >>sys.stderr, "fakegps: no sentences in test load."
67         raise SystemExit, 1
68     if linenumber % len(fakegps.testload.sentences) == 0:
69         if singleshot and linenumber > 0:
70             return False
71         if progress:
72             baton.twirl('*\010')
73         elif not singleshot:
74             sys.stderr.write("gpsfake: log cycle of %s begins.\n" % fakegps.testload.name)
75     time.sleep(cycle)
76     if linedump and fakegps.testload.legend:
77         ml = fakegps.testload.sentences[linenumber % len(fakegps.testload.sentences)].strip()
78         if not fakegps.testload.textual:
79             ml = hexdump(ml)
80         announce = fakegps.testload.legend % (linenumber % len(fakegps.testload.sentences) + 1) + ml
81         if promptme:
82             raw_input(announce + "? ")
83         else:
84             print announce
85     if progress:
86         baton.twirl()
87     return True
88
89 if __name__ == '__main__':
90     try:
91         (options, arguments) = getopt.getopt(sys.argv[1:], "1bc:D:fghilm:no:pr:s:uvx")
92     except getopt.GetoptError, msg:
93         print "gpsfake: " + str(msg)
94         raise SystemExit, 1
95
96     if not arguments:
97         print >>sys.stderr, "gpsfake: requires at least one logfile argument."
98         raise SystemExit, 1
99
100     port = None
101     progress = False
102     cycle = 0
103     monitor = ""
104     speed = 4800
105     linedump = False
106     predump = False
107     pipe = False
108     singleshot = False
109     promptme = False
110     client_init = '?WATCH={"json":true,"nmea":true}'
111     doptions = ""
112     udp = False
113     verbose = 0
114     for (switch, val) in options:
115         if (switch == '-1'):
116             singleshot = True
117             port = fakeport()
118         elif (switch == '-b'):
119             progress = True
120         elif (switch == '-c'):
121             cycle = float(val)
122         elif (switch == '-D'):
123             doptions += " -D " + val
124         elif (switch == '-g'):
125             monitor = "xterm -e gdb -tui --args "
126         elif (switch == '-i'):
127             linedump = promptme = True
128         elif (switch == '-l'):
129             linedump = True
130         elif (switch == '-m'):
131             monitor = val + " "
132         elif (switch == '-n'):
133             doptions += " -n"
134         elif (switch == '-x'):
135             predump = True
136         elif (switch == '-o'):
137             doptions = val
138         elif (switch == '-p'):
139             pipe = True
140         elif (switch == '-r'):
141             client_init = val
142         elif (switch == '-s'):
143             speed = int(val)
144         elif (switch == '-u'):
145             udp = True
146         elif (switch == '-v'):
147             verbose += 1
148         elif (switch == '-h'):
149             sys.stderr.write("usage: gpsfake [-h] [-l] [-m monitor] [--D debug] [-o options] [-p] [-s speed] [-c cycle] [-b] logfile\n")
150             raise SystemExit,0
151
152     if progress:
153         baton = Baton("Processing %s" % ",".join(arguments), "done")
154     else:
155         print >>sys.stderr, "Processing %s" % ",".join(arguments)
156
157     test = gps.fake.TestSession(prefix=monitor, port=port, options=doptions, udp=udp, verbose=verbose, predump=predump)
158
159     if pipe:
160         test.reporter = sys.stdout.write
161         if verbose:
162             progress = False
163             test.progress = sys.stdout.write
164     test.spawn()
165     try:
166         for logfile in arguments:
167             try:
168                 test.gps_add(logfile, speed=speed, pred=fakehook)
169             except gps.fake.TestLoadError, e:
170                 sys.stderr.write("gpsfake: " + e.msg + "\n")
171                 raise SystemExit, 1
172             except gps.fake.PacketError, e:
173                 sys.stderr.write("gpsfake: " + e.msg + "\n")
174                 raise SystemExit, 1
175             except gps.fake.DaemonError, e:
176                 sys.stderr.write("gpsfake: " + e.msg + "\n")
177                 raise SystemExit, 1
178             except IOError, e:
179                 sys.stderr.write("gpsfake: no such file as %s or file unreadable\n"%e.filename)
180                 raise SystemExit, 1
181             except OSError:
182                 sys.stderr.write("gpsfake: can't open pty.\n")
183                 raise SystemExit, 1
184
185         try:
186             if pipe:
187                 test.client_add(client_init + "\n")
188                 # Give daemon time to get ready for the feeds.
189                 # Without a delay here there's a window for test
190                 # sentences to arrive before the watch takes effect.
191                 # This needs to increase if leading sentences in
192                 # test loads aren't being processed.
193                 time.sleep(1)
194             test.run()
195         except socket.error, msg:
196             sys.stderr.write("gpsfake: socket error %s.\n" % msg)
197             raise SystemExit, 1
198     finally:
199         test.cleanup();
200
201     if progress:
202         baton.end()
203
204 # The following sets edit modes for GNU EMACS
205 # Local Variables:
206 # mode:python
207 # End: