"Initial commit to Gerrit"
[profile/ivi/gpsd.git] / gps / fake.py
1 # This file is Copyright (c) 2010 by the GPSD project
2 # BSD terms apply: see the file COPYING in the distribution root for details.
3 """
4 gpsfake.py -- classes for creating a controlled test environment around gpsd.
5
6 The gpsfake(1) regression tester shipped with gpsd is a trivial wrapper
7 around this code.  For a more interesting usage example, see the
8 valgrind-audit script shipped with the gpsd code.
9
10 To use this code, start by instantiating a TestSession class.  Use the
11 prefix argument if you want to run the daemon under some kind of run-time
12 monitor like valgrind or gdb.  Here are some particularly useful possibilities:
13
14 valgrind --tool=memcheck --gen-suppressions=yes --leak-check=yes
15     Run under Valgrind, checking for malloc errors and memory leaks.
16
17 xterm -e gdb -tui --args
18     Run under gdb, controlled from a new xterm.
19
20 You can use the options argument to pass in daemon options; normally you will
21 use this to set the debug-logging level.
22
23 On initialization, the test object spawns an instance of gpsd with no
24 devices or clients attached, connected to a control socket.
25
26 TestSession has methods to attach and detch fake GPSes. The
27 TestSession class simulates GPS devices for you with objects composed
28 from a pty and a class instance that cycles sentences into the master side
29 from some specified logfile; gpsd reads the slave side.  A fake GPS is
30 identified by the string naming its slave device.
31
32 TestSession also has methods to start and end client sessions.  Daemon
33 responses to a client are fed to a hook function which, by default,
34 discards them.  You can change the hook to sys.stdout.write() to dump
35 responses to standard output (this is what the gpsfake executable
36 does) or do something more exotic. A client session is identified by a
37 small integer that counts the number of client session starts.
38
39 There are a couple of convenience methods.  TestSession.wait() does nothing,
40 allowing a specified number of seconds to elapse.  TestSession.send()
41 ships commands to an open client session.
42
43 TestSession does not currently capture the daemon's log output.  It is
44 run with -N, so the output will go to stderr (along with, for example,
45 Valgrind notifications).
46
47 Each FakeGPS instance tries to packetize the data from the logfile it
48 is initialized with. It uses the same packet-getter as the daeomon.
49
50 The TestSession code maintains a run queue of FakeGPS and gps.gs (client-
51 session) objects. It repeatedly cycles through the run queue.  For each
52 client session object in the queue, it tries to read data from gpsd.  For
53 each fake GPS, it sends one line of stored data.  When a fake-GPS's
54 go predicate becomes false, the fake GPS is removed from the run queue.
55
56 There are two ways to use this code.  The more deterministic is
57 non-threaded mode: set up your client sessions and fake GPS devices,
58 then call the run() method.  The run() method will terminate when
59 there are no more objects in the run queue.  Note, you must have
60 created at least one fake client or fake GPS before calling run(),
61 otherwise it will terminate immediately.
62
63 To allow for adding and removing clients while the test is running,
64 run in threaded mode by calling the start() method.  This simply calls
65 the run method in a subthread, with locking of critical regions.
66 """
67 import sys, os, time, signal, pty, termios # fcntl, array, struct
68 import exceptions, threading, socket
69 import gps
70 import packet as sniffer
71
72 # The two magic numbers below have to be derived from observation.  If
73 # they're too high you'll slow the tests down a lot.  If they're too low
74 # you'll get random spurious regression failures that usually look
75 # like lines missing from the end of the test output relative to the
76 # check file.  These numbers might have to be adjusted upward on faster
77 # machines.  The need for them may be symnptomatic of race conditions
78 # in the pty layer or elsewhere.
79
80 # Define a per-line delay on writes so we won't spam the buffers in
81 # the pty layer or gpsd itself.  Removing this entirely was tried but
82 # caused failures under NetBSD.  Values smaller than the system timer
83 # tick don't make any difference here.
84 WRITE_PAD = 0.001
85
86 # We delay briefly after a GPS source is exhausted before removing it.
87 # This should give its subscribers time to get gpsd's response before
88 # we call the cleanup code. Note that using fractional seconds in
89 # CLOSE_DELAY may have no effect; Python time.time() returns a float
90 # value, but it is not guaranteed by Python that the C implementation
91 # underneath will return with precision finer than 1 second. (Linux
92 # and *BSD return full precision.)
93 CLOSE_DELAY = 1
94
95 class TestLoadError(exceptions.Exception):
96     def __init__(self, msg):
97         self.msg = msg
98
99 class TestLoad:
100     "Digest a logfile into a list of sentences we can cycle through."
101     def __init__(self, logfp, predump=False):
102         self.sentences = []     # This is the interesting part
103         if type(logfp) == type(""):
104             logfp = open(logfp, "r");            
105         self.name = logfp.name
106         self.logfp = logfp
107         self.predump = predump
108         self.logfile = logfp.name
109         self.type = None
110         self.sourcetype = "pty"
111         self.serial = None
112         # Grab the packets
113         getter = sniffer.new()
114         #gps.packet.register_report(reporter)
115         type_latch = None
116         while True:
117             (len, ptype, packet) = getter.get(logfp.fileno())
118             if len <= 0:
119                 break
120             elif ptype == sniffer.COMMENT_PACKET:
121                 # Some comments are magic
122                 if "Serial:" in packet:
123                     # Change serial parameters
124                     packet = packet[1:].strip()
125                     try:
126                         (xx, baud, params) = packet.split()
127                         baud = int(baud)
128                         if params[0] in ('7', '8'):
129                             databits = int(params[0])
130                         else:
131                             raise ValueError
132                         if params[1] in ('N', 'O', 'E'):
133                             parity = params[1]
134                         else:
135                             raise ValueError
136                         if params[2] in ('1', '2'):
137                             stopbits = int(params[2])
138                         else:
139                             raise ValueError
140                     except (ValueError, IndexError):
141                         raise TestLoadError("bad serial-parameter spec in %s"%\
142                                             logfp.name)                    
143                     self.serial = (baud, databits, parity, stopbits)
144                 elif "UDP" in packet:
145                     self.sourcetype = "UDP"
146                 elif "%" in packet:
147                     # Pass through for later interpretation 
148                     self.sentences.append(packet)
149             else:
150                 if type_latch is None:
151                     type_latch = ptype
152                 if self.predump:
153                     print `packet`
154                 if not packet:
155                     raise TestLoadError("zero-length packet from %s"%\
156                                         logfp.name)                    
157                 self.sentences.append(packet)
158         # Look at the first packet to grok the GPS type
159         self.textual = (type_latch == sniffer.NMEA_PACKET)
160         if self.textual:
161             self.legend = "gpsfake: line %d: "
162         else:
163             self.legend = "gpsfake: packet %d"
164
165 class PacketError(exceptions.Exception):
166     def __init__(self, msg):
167         self.msg = msg
168
169 class FakeGPS:
170     def __init__(self, testload, progress=None):
171         self.testload = testload
172         self.progress = progress
173         self.go_predicate = lambda: True
174         self.readers = 0
175         self.index = 0
176         self.progress("gpsfake: %s provides %d sentences\n" % (self.testload.name, len(self.testload.sentences)))
177
178     def feed(self):
179         "Feed a line from the contents of the GPS log to the daemon."
180         line = self.testload.sentences[self.index % len(self.testload.sentences)]
181         if "%Delay:" in line:
182             # Delay specified number of seconds
183             delay = line.split()[1]
184             time.sleep(int(delay))
185         # self.write has to be set by the derived class
186         self.write(line)
187         if self.progress:
188             self.progress("gpsfake: %s feeds %d=%s\n" % (self.testload.name, len(line), `line`))
189         time.sleep(WRITE_PAD)
190         self.index += 1
191
192 class FakePTY(FakeGPS):
193     "A FakePTY is a pty with a test log ready to be cycled to it."
194     def __init__(self, testload,
195                  speed=4800, databits=8, parity='N', stopbits=1,
196                  progress=None):
197         FakeGPS.__init__(self, testload, progress)
198         # Allow Serial: header to be overridden by explicit spped.
199         if self.testload.serial:
200             (speed, databits, parity, stopbits) = self.testload.serial
201         self.speed = speed
202         baudrates = {
203             0: termios.B0,
204             50: termios.B50,
205             75: termios.B75,
206             110: termios.B110,
207             134: termios.B134,
208             150: termios.B150,
209             200: termios.B200,
210             300: termios.B300,
211             600: termios.B600,
212             1200: termios.B1200,
213             1800: termios.B1800,
214             2400: termios.B2400,
215             4800: termios.B4800,
216             9600: termios.B9600,
217             19200: termios.B19200,
218             38400: termios.B38400,
219             57600: termios.B57600,
220             115200: termios.B115200,
221             230400: termios.B230400,
222         }
223         speed = baudrates[speed]        # Throw an error if the speed isn't legal
224         (self.fd, self.slave_fd) = pty.openpty()
225         self.byname = os.ttyname(self.slave_fd)
226         (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(self.slave_fd)
227         cc[termios.VMIN] = 1
228         cflag &= ~(termios.PARENB | termios.PARODD | termios.CRTSCTS)
229         cflag |= termios.CREAD | termios.CLOCAL
230         iflag = oflag = lflag = 0
231         iflag &=~ (termios.PARMRK | termios.INPCK)
232         cflag &=~ (termios.CSIZE | termios.CSTOPB | termios.PARENB | termios.PARODD)
233         if databits == 7:
234             cflag |= termios.CS7
235         else:
236             cflag |= termios.CS8
237         if stopbits == 2:
238             cflag |= termios.CSTOPB
239         if parity == 'E':
240             iflag |= termios.INPCK
241             cflag |= termios.PARENB
242         elif parity == 'O':
243             iflag |= termios.INPCK
244             cflag |= termios.PARENB | termios.PARODD
245         ispeed = ospeed = speed
246         termios.tcsetattr(self.slave_fd, termios.TCSANOW,
247                           [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
248     def read(self):
249         "Discard control strings written by gpsd."
250         # A tcflush implementation works on Linux but fails on OpenBSD 4.
251         termios.tcflush(self.fd, termios.TCIFLUSH)
252         # Alas, the FIONREAD version also works on Linux and fails on OpenBSD.
253         #try:
254         #    buf = array.array('i', [0])
255         #    fcntl.ioctl(self.master_fd, termios.FIONREAD, buf, True)
256         #    n = struct.unpack('i', buf)[0]
257         #    os.read(self.master_fd, n)
258         #except IOError:
259         #    pass
260
261     def write(self, line):
262         os.write(self.fd, line)
263
264     def drain(self):
265         "Wait for the associated device to drain (e.g. before closing)."
266         termios.tcdrain(self.fd)
267
268 class FakeUDP(FakeGPS):
269     "A UDP broadcaster with a test log ready to be cycled to it."
270     def __init__(self, testload,
271                  ipaddr, port,
272                  progress=None):
273         FakeGPS.__init__(self, testload, progress)
274         self.ipaddr = ipaddr
275         self.port = port
276         self.byname = "udp://" + ipaddr + ":" + port
277         self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
278
279     def read(self):
280         "Discard control strings written by gpsd."
281         pass
282
283     def write(self, line):
284         self.sock.sendto(line, (self.ipaddr, int(self.port)))
285
286     def drain(self):
287         "Wait for the associated device to drain (e.g. before closing)."
288         pass    # shutdown() fails on UDP
289
290 class DaemonError(exceptions.Exception):
291     def __init__(self, msg):
292         self.msg = msg
293     def __str__(self):
294         return repr(self.msg)
295
296 class DaemonInstance:
297     "Control a gpsd instance."
298     def __init__(self, control_socket=None):
299         self.sockfile = None
300         self.pid = None
301         if control_socket:
302             self.control_socket = control_socket
303         else:
304             self.control_socket = "/tmp/gpsfake-%d.sock" % os.getpid()
305         self.pidfile  = "/tmp/gpsfake_pid-%s" % os.getpid()
306     def spawn(self, options, port, background=False, prefix=""):
307         "Spawn a daemon instance."
308         self.spawncmd = None
309
310         # Look for gpsd in GPSD_HOME env variable
311         if os.environ.get('GPSD_HOME'):
312             for path in os.environ['GPSD_HOME'].split(':'):
313                 _spawncmd = "%s/gpsd" % path
314                 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK):
315                     self.spawncmd = _spawncmd
316                     break
317
318         # if we could not find it yet try PATH env variable for it
319         if not self.spawncmd:
320             if not '/usr/sbin' in os.environ['PATH']:
321                 os.environ['PATH']=os.environ['PATH'] + ":/usr/sbin"
322             for path in os.environ['PATH'].split(':'):
323                 _spawncmd = "%s/gpsd" % path
324                 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK):
325                     self.spawncmd = _spawncmd
326                     break
327
328         if not self.spawncmd:
329             raise DaemonError("Cannot execute gpsd: executable not found. Set GPSD_HOME env variable")
330         # The -b option to suppress hanging on probe returns is needed to cope
331         # with OpenBSD (and possibly other non-Linux systems) that don't support
332         # anything we can use to implement the FakeGPS.read() method
333         self.spawncmd += " -b -N -S %s -F %s -P %s %s" % (port, self.control_socket, self.pidfile, options)
334         if prefix:
335             self.spawncmd = prefix + " " + self.spawncmd.strip()
336         if background:
337             self.spawncmd += " &"
338         status = os.system(self.spawncmd)
339         if os.WIFSIGNALED(status) or os.WEXITSTATUS(status):
340             raise DaemonError("daemon exited with status %d" % status)
341     def wait_pid(self):
342         "Wait for the daemon, get its PID and a control-socket connection."
343         while True:
344             try:
345                 fp = open(self.pidfile)
346             except IOError:
347                 time.sleep(0.1)
348                 continue
349             try:
350                 fp.seek(0)
351                 pidstr = fp.read()
352                 self.pid = int(pidstr)
353             except ValueError:
354                 time.sleep(0.5)
355                 continue        # Avoid race condition -- PID not yet written
356             fp.close()
357             break
358     def __get_control_socket(self):
359         # Now we know it's running, get a connection to the control socket.
360         if not os.path.exists(self.control_socket):
361             return None
362         try:
363             self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
364             self.sock.connect(self.control_socket)
365         except socket.error:
366             if self.sock:
367                 self.sock.close()
368             self.sock = None
369         return self.sock
370     def is_alive(self):
371         "Is the daemon still alive?"
372         try:
373             os.kill(self.pid, 0)
374             return True
375         except OSError:
376             return False
377     def add_device(self, path):
378         "Add a device to the daemon's internal search list."
379         if self.__get_control_socket():
380             self.sock.sendall("+%s\r\n\x00" % path)
381             self.sock.recv(12)
382             self.sock.close()
383     def remove_device(self, path):
384         "Remove a device from the daemon's internal search list."
385         if self.__get_control_socket():
386             self.sock.sendall("-%s\r\n\x00" % path)
387             self.sock.recv(12)
388             self.sock.close()
389     def kill(self):
390         "Kill the daemon instance."
391         if self.pid:
392             try:
393                 os.kill(self.pid, signal.SIGTERM)
394                 # Raises an OSError for ESRCH when we've killed it.
395                 while True:
396                     os.kill(self.pid, signal.SIGTERM)
397                     time.sleep(0.01)
398             except OSError:
399                 pass
400             self.pid = None
401
402 class TestSessionError(exceptions.Exception):
403     def __init__(self, msg):
404         self.msg = msg
405
406 class TestSession:
407     "Manage a session including a daemon with fake GPSes and clients."
408     def __init__(self, prefix=None, port=None, options=None, verbose=0, predump=False, udp=False):
409         "Initialize the test session by launching the daemon."
410         self.prefix = prefix
411         self.port = port
412         self.options = options
413         self.verbose = verbose
414         self.predump = predump
415         self.udp = udp
416         self.daemon = DaemonInstance()
417         self.fakegpslist = {}
418         self.client_id = 0
419         self.readers = 0
420         self.writers = 0
421         self.runqueue = []
422         self.index = 0
423         if port:
424             self.port = port
425         else:
426             self.port = gps.GPSD_PORT
427         self.progress = lambda x: None
428         self.reporter = lambda x: None
429         self.default_predicate = None
430         self.fd_set = []
431         self.threadlock = None
432     def spawn(self):
433         for sig in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM):
434             signal.signal(sig, lambda signal, frame: self.cleanup())
435         self.daemon.spawn(background=True, prefix=self.prefix, port=self.port, options=self.options)
436         self.daemon.wait_pid()
437     def set_predicate(self, pred):
438         "Set a default go predicate for the session."
439         self.default_predicate = pred
440     def gps_add(self, logfile, speed=19200, pred=None):
441         "Add a simulated GPS being fed by the specified logfile."
442         self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed))
443         if logfile not in self.fakegpslist:
444             testload = TestLoad(logfile, predump=self.predump)
445             if testload.sourcetype == "UDP" or self.udp:
446                 newgps = FakeUDP(testload, ipaddr="127.0.0.1", port="5000",
447                                    progress=self.progress)
448             else:
449                 newgps = FakePTY(testload, speed=speed, 
450                                    progress=self.progress)
451             if pred:
452                 newgps.go_predicate = pred
453             elif self.default_predicate:
454                 newgps.go_predicate = self.default_predicate
455             self.fakegpslist[newgps.byname] = newgps
456             self.append(newgps)
457             newgps.exhausted = 0
458         self.daemon.add_device(newgps.byname)
459         return newgps.byname
460     def gps_remove(self, name):
461         "Remove a simulated GPS from the daemon's search list."
462         self.progress("gpsfake: gps_remove(%s)\n" % name)
463         self.fakegpslist[name].drain()
464         self.remove(self.fakegpslist[name])
465         self.daemon.remove_device(name)
466         del self.fakegpslist[name]
467     def client_add(self, commands):
468         "Initiate a client session and force connection to a fake GPS."
469         self.progress("gpsfake: client_add()\n")
470         newclient = gps.gps(port=self.port, verbose=self.verbose)
471         self.append(newclient)
472         newclient.id = self.client_id + 1 
473         self.client_id += 1
474         self.progress("gpsfake: client %d has %s\n" % (self.client_id,newclient.device))
475         if commands:
476             self.initialize(newclient, commands) 
477         return self.client_id
478     def client_remove(self, cid):
479         "Terminate a client session."
480         self.progress("gpsfake: client_remove(%d)\n" % cid)
481         for obj in self.runqueue:
482             if isinstance(obj, gps.gps) and obj.id == cid:
483                 self.remove(obj)
484                 return True
485         else:
486             return False
487     def wait(self, seconds):
488         "Wait, doing nothing."
489         self.progress("gpsfake: wait(%d)\n" % seconds)
490         time.sleep(seconds)
491     def gather(self, seconds):
492         "Wait, doing nothing but watching for sentences."
493         self.progress("gpsfake: gather(%d)\n" % seconds)
494         #mark = time.time()
495         time.sleep(seconds)
496         #if self.timings.c_recv_time <= mark:
497         #    TestSessionError("no sentences received\n")
498     def cleanup(self):
499         "We're done, kill the daemon."
500         self.progress("gpsfake: cleanup()\n")
501         if self.daemon:
502             self.daemon.kill()
503             self.daemon = None
504     def run(self):
505         "Run the tests."
506         try:
507             self.progress("gpsfake: test loop begins\n")
508             while self.daemon:
509                 # We have to read anything that gpsd might have tried
510                 # to send to the GPS here -- under OpenBSD the
511                 # TIOCDRAIN will hang, otherwise.
512                 for device in self.runqueue:
513                     if isinstance(device, FakeGPS):
514                         device.read()
515                 had_output = False
516                 chosen = self.choose()
517                 if isinstance(chosen, FakeGPS):
518                     if chosen.exhausted and (time.time() - chosen.exhausted > CLOSE_DELAY):
519                         self.gps_remove(chosen.byname)
520                         self.progress("gpsfake: GPS %s removed\n" % chosen.byname)
521                     elif not chosen.go_predicate(chosen.index, chosen):
522                         if chosen.exhausted == 0:
523                             chosen.exhausted = time.time()
524                             self.progress("gpsfake: GPS %s ran out of input\n" % chosen.byname)
525                     else:
526                         chosen.feed()
527                 elif isinstance(chosen, gps.gps):
528                     if chosen.enqueued:
529                         chosen.send(chosen.enqueued)
530                         chosen.enqueued = ""
531                     while chosen.waiting():
532                         chosen.poll()
533                         if chosen.valid & gps.PACKET_SET:
534                             self.reporter(chosen.response)
535                         had_output = True
536                 else:
537                     raise TestSessionError("test object of unknown type")
538                 if not self.writers and not had_output:
539                     self.progress("gpsfake: no writers and no output\n")
540                     break
541             self.progress("gpsfake: test loop ends\n")
542         finally:
543             self.cleanup()
544
545     # All knowledge about locks and threading is below this line,
546     # except for the bare fact that self.threadlock is set to None
547     # in the class init method.
548
549     def append(self, obj):
550         "Add a producer or consumer to the object list."
551         if self.threadlock:
552             self.threadlock.acquire()
553         self.runqueue.append(obj)
554         if isinstance(obj, FakeGPS):
555             self.writers += 1
556         elif isinstance(obj, gps.gps):
557             self.readers += 1
558         if self.threadlock:
559             self.threadlock.release()
560     def remove(self, obj):
561         "Remove a producer or consumer from the object list."
562         if self.threadlock:
563             self.threadlock.acquire()
564         self.runqueue.remove(obj)
565         if isinstance(obj, FakeGPS):
566             self.writers -= 1
567         elif isinstance(obj, gps.gps):
568             self.readers -= 1
569         self.index = min(len(self.runqueue)-1, self.index)
570         if self.threadlock:
571             self.threadlock.release()
572     def choose(self):
573         "Atomically get the next object scheduled to do something."
574         if self.threadlock:
575             self.threadlock.acquire()
576         chosen = self.index
577         self.index += 1
578         self.index %= len(self.runqueue)
579         if self.threadlock:
580             self.threadlock.release()
581         return self.runqueue[chosen]
582     def initialize(self, client, commands):
583         "Arrange for client to ship specified commands when it goes active."
584         client.enqueued = ""
585         if not self.threadlock:
586             client.send(commands)
587         else:
588             client.enqueued = commands
589     def start(self):
590         self.threadlock = threading.Lock()
591         threading.Thread(target=self.run)
592
593 # End