1 # This file is Copyright (c) 2010 by the GPSD project
2 # BSD terms apply: see the file COPYING in the distribution root for details.
4 gpsfake.py -- classes for creating a controlled test environment around gpsd.
6 The gpsfake(1) regression tester shipped with gpsd is a trivial wrapper
7 around this code. For a more interesting usage example, see the
8 valgrind-audit script shipped with the gpsd code.
10 To use this code, start by instantiating the TestSession class. Use the
11 prefix argument if you want to run the daemon under some kind of run-time
12 monitor like valgrind or gdb. Here are some particularly useful possibilities:
14 valgrind --tool=memcheck --gen-suppressions=yes --leak-check=yes
15 Run under Valgrind, checking for malloc errors and memory leaks.
17 xterm -e gdb -tui --args
18 Run under gdb, controlled from a new xterm.
20 You can use the options argument to pass in daemon options; normally you will
21 use this to set the debug-logging level.
23 On initialization, the test object spawns an instance of gpsd with no
24 devices or clients attached, connected to a control socket.
26 TestSession has methods to attach and detach fake GPSes. The
27 TestSession class simulates GPS devices for you with objects composed
28 from a pty and a class instance that cycles sentences into the master side
29 from some specified logfile; gpsd reads the slave side. A fake GPS is
30 identified by the string naming its slave device.
32 TestSession also has methods to start and end client sessions. Daemon
33 responses to a client are fed to a hook function which, by default,
34 discards them. You can change the hook to sys.stdout.write() to dump
35 responses to standard output (this is what the gpsfake executable
36 does) or do something more exotic. A client session is identified by a
37 small integer that counts the number of client session starts.
39 There are a couple of convenience methods. TestSession.wait() does nothing,
40 allowing a specified number of seconds to elapse. TestSession.send()
41 ships commands to an open client session.
43 TestSession does not currently capture the daemon's log output. It is
44 run with -N, so the output will go to stderr (along with, for example,
45 Valgrind notifications).
47 Each FakeGPS instance tries to packetize the data from the logfile it
48 is initialized with. It uses the same packet-getter as the daemon.
50 The TestSession code maintains a run queue of FakeGPS and gps.gps (client-
51 session) objects. It repeatedly cycles through the run queue. For each
52 client session object in the queue, it tries to read data from gpsd. For
53 each fake GPS, it sends one line of stored data. When a fake-GPS's
54 go predicate becomes false, the fake GPS is removed from the run queue.
56 There are two ways to use this code. The more deterministic is
57 non-threaded mode: set up your client sessions and fake GPS devices,
58 then call the run() method. The run() method will terminate when
59 there are no more objects in the run queue. Note, you must have
60 created at least one fake client or fake GPS before calling run(),
61 otherwise it will terminate immediately.
63 To allow for adding and removing clients while the test is running,
64 run in threaded mode by calling the start() method. This simply calls
65 the run method in a subthread, with locking of critical regions.
67 import sys, os, time, signal, pty, termios # fcntl, array, struct
68 import exceptions, threading, socket
70 import packet as sniffer
72 # The two magic numbers below have to be derived from observation. If
73 # they're too high you'll slow the tests down a lot. If they're too low
74 # you'll get random spurious regression failures that usually look
75 # like lines missing from the end of the test output relative to the
76 # check file. These numbers might have to be adjusted upward on faster
77 # machines. The need for them may be symptomatic of race conditions
78 # in the pty layer or elsewhere.
80 # Define a per-line delay on writes so we won't spam the buffers in
81 # the pty layer or gpsd itself. Removing this entirely was tried but
82 # caused failures under NetBSD. Values smaller than the system timer
83 # tick don't make any difference here.
86 # We delay briefly after a GPS source is exhausted before removing it.
87 # This should give its subscribers time to get gpsd's response before
88 # we call the cleanup code. Note that using fractional seconds in
89 # CLOSE_DELAY may have no effect; Python time.time() returns a float
90 # value, but it is not guaranteed by Python that the C implementation
91 # underneath will return with precision finer than 1 second. (Linux
92 # and *BSD return full precision.)
95 class TestLoadError(exceptions.Exception):
96 def __init__(self, msg):
100 "Digest a logfile into a list of sentences we can cycle through."
101 def __init__(self, logfp, predump=False):
102 self.sentences = [] # This is the interesting part
103 if type(logfp) == type(""):
104 logfp = open(logfp, "r");
105 self.name = logfp.name
107 self.predump = predump
108 self.logfile = logfp.name
110 self.sourcetype = "pty"
113 getter = sniffer.new()
114 #gps.packet.register_report(reporter)
117 (len, ptype, packet) = getter.get(logfp.fileno())
120 elif ptype == sniffer.COMMENT_PACKET:
121 # Some comments are magic
122 if "Serial:" in packet:
123 # Change serial parameters
124 packet = packet[1:].strip()
126 (xx, baud, params) = packet.split()
128 if params[0] in ('7', '8'):
129 databits = int(params[0])
132 if params[1] in ('N', 'O', 'E'):
136 if params[2] in ('1', '2'):
137 stopbits = int(params[2])
140 except (ValueError, IndexError):
141 raise TestLoadError("bad serial-parameter spec in %s"%\
143 self.serial = (baud, databits, parity, stopbits)
144 elif "UDP" in packet:
145 self.sourcetype = "UDP"
147 # Pass through for later interpretation
148 self.sentences.append(packet)
150 if type_latch is None:
155 raise TestLoadError("zero-length packet from %s"%\
157 self.sentences.append(packet)
158 # Look at the first packet to grok the GPS type
159 self.textual = (type_latch == sniffer.NMEA_PACKET)
161 self.legend = "gpsfake: line %d: "
163 self.legend = "gpsfake: packet %d"
165 class PacketError(exceptions.Exception):
166 def __init__(self, msg):
170 def __init__(self, testload, progress=None):
171 self.testload = testload
172 self.progress = progress
173 self.go_predicate = lambda: True
176 self.progress("gpsfake: %s provides %d sentences\n" % (self.testload.name, len(self.testload.sentences)))
179 "Feed a line from the contents of the GPS log to the daemon."
180 line = self.testload.sentences[self.index % len(self.testload.sentences)]
181 if "%Delay:" in line:
182 # Delay specified number of seconds
183 delay = line.split()[1]
184 time.sleep(int(delay))
185 # self.write has to be set by the derived class
188 self.progress("gpsfake: %s feeds %d=%s\n" % (self.testload.name, len(line), `line`))
189 time.sleep(WRITE_PAD)
192 class FakePTY(FakeGPS):
193 "A FakePTY is a pty with a test log ready to be cycled to it."
194 def __init__(self, testload,
195 speed=4800, databits=8, parity='N', stopbits=1,
197 FakeGPS.__init__(self, testload, progress)
198 # Allow Serial: header to be overridden by explicit speed.
199 if self.testload.serial:
200 (speed, databits, parity, stopbits) = self.testload.serial
217 19200: termios.B19200,
218 38400: termios.B38400,
219 57600: termios.B57600,
220 115200: termios.B115200,
221 230400: termios.B230400,
223 speed = baudrates[speed] # Throw an error if the speed isn't legal
224 (self.fd, self.slave_fd) = pty.openpty()
225 self.byname = os.ttyname(self.slave_fd)
226 (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(self.slave_fd)
228 cflag &= ~(termios.PARENB | termios.PARODD | termios.CRTSCTS)
229 cflag |= termios.CREAD | termios.CLOCAL
230 iflag = oflag = lflag = 0
231 iflag &=~ (termios.PARMRK | termios.INPCK)
232 cflag &=~ (termios.CSIZE | termios.CSTOPB | termios.PARENB | termios.PARODD)
238 cflag |= termios.CSTOPB
240 iflag |= termios.INPCK
241 cflag |= termios.PARENB
243 iflag |= termios.INPCK
244 cflag |= termios.PARENB | termios.PARODD
245 ispeed = ospeed = speed
246 termios.tcsetattr(self.slave_fd, termios.TCSANOW,
247 [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
249 "Discard control strings written by gpsd."
250 # A tcflush implementation works on Linux but fails on OpenBSD 4.
251 termios.tcflush(self.fd, termios.TCIFLUSH)
252 # Alas, the FIONREAD version also works on Linux and fails on OpenBSD.
254 # buf = array.array('i', [0])
255 # fcntl.ioctl(self.master_fd, termios.FIONREAD, buf, True)
256 # n = struct.unpack('i', buf)[0]
257 # os.read(self.master_fd, n)
261 def write(self, line):
262 os.write(self.fd, line)
265 "Wait for the associated device to drain (e.g. before closing)."
266 termios.tcdrain(self.fd)
268 class FakeUDP(FakeGPS):
269 "A UDP broadcaster with a test log ready to be cycled to it."
270 def __init__(self, testload,
273 FakeGPS.__init__(self, testload, progress)
276 self.byname = "udp://" + ipaddr + ":" + port
277 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
280 "Discard control strings written by gpsd."
283 def write(self, line):
284 self.sock.sendto(line, (self.ipaddr, int(self.port)))
287 "Wait for the associated device to drain (e.g. before closing)."
288 pass # shutdown() fails on UDP
290 class DaemonError(exceptions.Exception):
291 def __init__(self, msg):
294 return repr(self.msg)
296 class DaemonInstance:
297 "Control a gpsd instance."
298 def __init__(self, control_socket=None):
302 self.control_socket = control_socket
304 self.control_socket = "/tmp/gpsfake-%d.sock" % os.getpid()
305 self.pidfile = "/tmp/gpsfake_pid-%s" % os.getpid()
306 def spawn(self, options, port, background=False, prefix=""):
307 "Spawn a daemon instance."
310 # Look for gpsd in GPSD_HOME env variable
311 if os.environ.get('GPSD_HOME'):
312 for path in os.environ['GPSD_HOME'].split(':'):
313 _spawncmd = "%s/gpsd" % path
314 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK):
315 self.spawncmd = _spawncmd
318 # if we could not find it yet try PATH env variable for it
319 if not self.spawncmd:
320 if not '/usr/sbin' in os.environ['PATH']:
321 os.environ['PATH']=os.environ['PATH'] + ":/usr/sbin"
322 for path in os.environ['PATH'].split(':'):
323 _spawncmd = "%s/gpsd" % path
324 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK):
325 self.spawncmd = _spawncmd
328 if not self.spawncmd:
329 raise DaemonError("Cannot execute gpsd: executable not found. Set GPSD_HOME env variable")
330 # The -b option to suppress hanging on probe returns is needed to cope
331 # with OpenBSD (and possibly other non-Linux systems) that don't support
332 # anything we can use to implement the FakeGPS.read() method
333 self.spawncmd += " -b -N -S %s -F %s -P %s %s" % (port, self.control_socket, self.pidfile, options)
335 self.spawncmd = prefix + " " + self.spawncmd.strip()
337 self.spawncmd += " &"
338 status = os.system(self.spawncmd)
339 if os.WIFSIGNALED(status) or os.WEXITSTATUS(status):
340 raise DaemonError("daemon exited with status %d" % status)
342 "Wait for the daemon, get its PID and a control-socket connection."
345 fp = open(self.pidfile)
352 self.pid = int(pidstr)
355 continue # Avoid race condition -- PID not yet written
358 def __get_control_socket(self):
359 # Now we know it's running, get a connection to the control socket.
360 if not os.path.exists(self.control_socket):
363 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
364 self.sock.connect(self.control_socket)
371 "Is the daemon still alive?"
377 def add_device(self, path):
378 "Add a device to the daemon's internal search list."
379 if self.__get_control_socket():
380 self.sock.sendall("+%s\r\n\x00" % path)
383 def remove_device(self, path):
384 "Remove a device from the daemon's internal search list."
385 if self.__get_control_socket():
386 self.sock.sendall("-%s\r\n\x00" % path)
390 "Kill the daemon instance."
393 os.kill(self.pid, signal.SIGTERM)
394 # Raises an OSError for ESRCH when we've killed it.
396 os.kill(self.pid, signal.SIGTERM)
402 class TestSessionError(exceptions.Exception):
403 def __init__(self, msg):
407 "Manage a session including a daemon with fake GPSes and clients."
408 def __init__(self, prefix=None, port=None, options=None, verbose=0, predump=False, udp=False):
409 "Initialize the test session by launching the daemon."
412 self.options = options
413 self.verbose = verbose
414 self.predump = predump
416 self.daemon = DaemonInstance()
417 self.fakegpslist = {}
426 self.port = gps.GPSD_PORT
427 self.progress = lambda x: None
428 self.reporter = lambda x: None
429 self.default_predicate = None
431 self.threadlock = None
433 for sig in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM):
434 signal.signal(sig, lambda signal, frame: self.cleanup())
435 self.daemon.spawn(background=True, prefix=self.prefix, port=self.port, options=self.options)
436 self.daemon.wait_pid()
437 def set_predicate(self, pred):
438 "Set a default go predicate for the session."
439 self.default_predicate = pred
440 def gps_add(self, logfile, speed=19200, pred=None):
441 "Add a simulated GPS being fed by the specified logfile."
442 self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed))
443 if logfile not in self.fakegpslist:
444 testload = TestLoad(logfile, predump=self.predump)
445 if testload.sourcetype == "UDP" or self.udp:
446 newgps = FakeUDP(testload, ipaddr="127.0.0.1", port="5000",
447 progress=self.progress)
449 newgps = FakePTY(testload, speed=speed,
450 progress=self.progress)
452 newgps.go_predicate = pred
453 elif self.default_predicate:
454 newgps.go_predicate = self.default_predicate
455 self.fakegpslist[newgps.byname] = newgps
458 self.daemon.add_device(newgps.byname)
460 def gps_remove(self, name):
461 "Remove a simulated GPS from the daemon's search list."
462 self.progress("gpsfake: gps_remove(%s)\n" % name)
463 self.fakegpslist[name].drain()
464 self.remove(self.fakegpslist[name])
465 self.daemon.remove_device(name)
466 del self.fakegpslist[name]
467 def client_add(self, commands):
468 "Initiate a client session and force connection to a fake GPS."
469 self.progress("gpsfake: client_add()\n")
470 newclient = gps.gps(port=self.port, verbose=self.verbose)
471 self.append(newclient)
472 newclient.id = self.client_id + 1
474 self.progress("gpsfake: client %d has %s\n" % (self.client_id,newclient.device))
476 self.initialize(newclient, commands)
477 return self.client_id
478 def client_remove(self, cid):
479 "Terminate a client session."
480 self.progress("gpsfake: client_remove(%d)\n" % cid)
481 for obj in self.runqueue:
482 if isinstance(obj, gps.gps) and obj.id == cid:
487 def wait(self, seconds):
488 "Wait, doing nothing."
489 self.progress("gpsfake: wait(%d)\n" % seconds)
491 def gather(self, seconds):
492 "Wait, doing nothing but watching for sentences."
493 self.progress("gpsfake: gather(%d)\n" % seconds)
496 #if self.timings.c_recv_time <= mark:
497 # TestSessionError("no sentences received\n")
499 "We're done, kill the daemon."
500 self.progress("gpsfake: cleanup()\n")
507 self.progress("gpsfake: test loop begins\n")
509 # We have to read anything that gpsd might have tried
510 # to send to the GPS here -- under OpenBSD the
511 # TIOCDRAIN will hang, otherwise.
512 for device in self.runqueue:
513 if isinstance(device, FakeGPS):
516 chosen = self.choose()
517 if isinstance(chosen, FakeGPS):
518 if chosen.exhausted and (time.time() - chosen.exhausted > CLOSE_DELAY):
519 self.gps_remove(chosen.byname)
520 self.progress("gpsfake: GPS %s removed\n" % chosen.byname)
521 elif not chosen.go_predicate(chosen.index, chosen):
522 if chosen.exhausted == 0:
523 chosen.exhausted = time.time()
524 self.progress("gpsfake: GPS %s ran out of input\n" % chosen.byname)
527 elif isinstance(chosen, gps.gps):
529 chosen.send(chosen.enqueued)
531 while chosen.waiting():
533 if chosen.valid & gps.PACKET_SET:
534 self.reporter(chosen.response)
537 raise TestSessionError("test object of unknown type")
538 if not self.writers and not had_output:
539 self.progress("gpsfake: no writers and no output\n")
541 self.progress("gpsfake: test loop ends\n")
545 # All knowledge about locks and threading is below this line,
546 # except for the bare fact that self.threadlock is set to None
547 # in the class init method.
549 def append(self, obj):
550 "Add a producer or consumer to the object list."
552 self.threadlock.acquire()
553 self.runqueue.append(obj)
554 if isinstance(obj, FakeGPS):
556 elif isinstance(obj, gps.gps):
559 self.threadlock.release()
560 def remove(self, obj):
561 "Remove a producer or consumer from the object list."
563 self.threadlock.acquire()
564 self.runqueue.remove(obj)
565 if isinstance(obj, FakeGPS):
567 elif isinstance(obj, gps.gps):
569 self.index = min(len(self.runqueue)-1, self.index)
571 self.threadlock.release()
573 "Atomically get the next object scheduled to do something."
575 self.threadlock.acquire()
578 self.index %= len(self.runqueue)
580 self.threadlock.release()
581 return self.runqueue[chosen]
582 def initialize(self, client, commands):
583 "Arrange for client to ship specified commands when it goes active."
585 if not self.threadlock:
586 client.send(commands)
588 client.enqueued = commands
590 self.threadlock = threading.Lock()
591 threading.Thread(target=self.run)