#!/usr/bin/env python
# -*- coding: utf8 -*-
"""PyGTK speedometer that shows the speed taken from gpsd TPV reports."""

import pygtk
pygtk.require('2.0')
import gtk
import cairo
import gobject
from math import pi
from math import cos
from math import sin
from socket import error as SocketError

__author__ = 'Robin Wittler '
__license__ = 'BSD'
__version__ = '0.0.7'

# BSD terms apply: see the file COPYING in the distribution root for details.

#TODO
# add getopts and handle it
# add configparser
# add a config menu entry
# write unit tests!
# testing!
# cleanup and sanitize code


class Speedometer(gtk.DrawingArea):
    """Drawing area that renders a speedometer dial with cairo.

    The dial has eleven labelled long ticks (indices 2 down to -8, see
    ``long_ticks``); tick index ``i`` is placed at the angle ``i * pi / 6``
    radians around the dial centre.  The needle and a text readout show
    ``last_speed`` multiplied by the conversion factor of ``speed_unit``
    (the factors are named MPS_TO_*, so ``last_speed`` is presumably in
    metres per second -- confirm against the data source).
    """

    # Conversion factors applied to ``last_speed``.
    MPS_TO_KPH = 3.6
    MPS_TO_MPH = 2.2369363
    MPS_TO_KNOTS = 1.9438445

    # Unit labels; these are also the keys of ``conversions``.
    MPH_UNIT_LABEL = 'mph'
    KPH_UNIT_LABEL = 'kmh'
    KNOTS_UNIT_LABEL = 'knots'

    def __init__(self, speed_unit=None):
        """Create the widget.

        speed_unit -- 'mph', 'kmh' or 'knots'; a false value selects 'mph'.

        Raises TypeError if speed_unit is not one of the known labels.
        """
        gtk.DrawingArea.__init__(self)
        self.connect('expose_event', self.expose_event)

        # Tick indices of the labelled ticks, highest value first.
        self.long_ticks = (2, 1, 0, -1, -2, -3, -4, -5, -6, -7, -8)
        # Fractional offsets of the small ticks between two long ticks
        # (0.5 is missing because the middle tick is drawn separately).
        self.short_ticks = (0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9)

        # One long tick corresponds to res_div * res_div_mul speed units.
        self.res_div = 10.0
        self.res_div_mul = 1
        self.last_speed = 0

        self.conversions = {
            self.MPH_UNIT_LABEL: self.MPS_TO_MPH,
            self.KPH_UNIT_LABEL: self.MPS_TO_KPH,
            self.KNOTS_UNIT_LABEL: self.MPS_TO_KNOTS
        }
        self.speed_unit = speed_unit or self.MPH_UNIT_LABEL
        if self.speed_unit not in self.conversions:
            raise TypeError(
                '%s is not a valid speed unit' % (repr(speed_unit))
            )

        # Label (before scaling by res_div_mul) for every long tick index.
        self.nums = {
            -8: 0,
            -7: 10,
            -6: 20,
            -5: 30,
            -4: 40,
            -3: 50,
            -2: 60,
            -1: 70,
            0: 80,
            1: 90,
            2: 100
        }

    def long_inset(self, x):
        """Return the length of a long tick for a dial of radius x."""
        return 0.1 * x

    def middle_inset(self, x):
        """Return the length of a middle (half-way) tick for radius x."""
        return self.long_inset(x) / 1.5

    def short_inset(self, x):
        """Return the length of a short tick for radius x."""
        return self.long_inset(x) / 3

    def expose_event(self, widget, event, data=None):
        """Redraw the dial, needle and readout inside the exposed area."""
        self.cr = self.window.cairo_create()
        # Restrict drawing to the region that actually needs repainting.
        self.cr.rectangle(
            event.area.x, event.area.y,
            event.area.width, event.area.height
        )
        self.cr.clip()
        x, y = self.get_x_y()
        width, height = self.window.get_size()
        radius = self.get_radius(width, height)
        self.cr.set_line_width(radius / 100)
        self.draw_arc_and_ticks(width, height, radius, x, y)
        self.draw_needle(self.last_speed, radius, x, y)
        self.draw_speed_text(self.last_speed, radius, x, y)

    def draw_arc_and_ticks(self, width, height, radius, x, y):
        """Paint the white background, the dial arc, all ticks and labels.

        width, height -- size of the area to clear
        radius        -- dial radius
        x, y          -- dial centre
        """
        cr = self.cr
        cr.set_source_rgb(1.0, 1.0, 1.0)
        cr.rectangle(0, 0, width, height)
        cr.fill()
        cr.set_source_rgb(0.0, 0.0, 0.0)

        #draw the speedometer arc
        cr.arc_negative(
            x, y, radius,
            self.degrees_to_radians(60),
            self.degrees_to_radians(120)
        )
        cr.stroke()

        long_inset = self.long_inset(radius)
        middle_inset = self.middle_inset(radius)
        short_inset = self.short_inset(radius)
        # Ticks extend half a line width past the arc so that they cover
        # the full thickness of the stroked arc.
        outer = radius + (cr.get_line_width() / 2)

        # The font settings are loop invariant, so select them once.
        cr.select_font_face('Georgia', cairo.FONT_SLANT_NORMAL)
        cr.set_font_size(radius / 10)

        #draw the ticks
        for i in self.long_ticks:
            self._add_tick(x, y, radius - long_inset, outer, i * pi / 6.0)
            self._draw_tick_label(i, radius, x, y, long_inset)

            # No middle tick beyond the highest long tick.
            if i != self.long_ticks[0]:
                self._add_tick(
                    x, y, radius - middle_inset, outer,
                    (i + 0.5) * pi / 6.0
                )

            for z in self.short_ticks:
                # Negative indices fill the gap above them, the others
                # the gap below, so nothing is drawn past either end.
                if i < 0:
                    angle = (i + z) * pi / 6.0
                else:
                    angle = (i - z) * pi / 6.0
                self._add_tick(x, y, radius - short_inset, outer, angle)
        cr.stroke()

    def _add_tick(self, x, y, inner, outer, angle):
        """Add a radial segment from radius inner to outer to the path."""
        self.cr.move_to(x + inner * cos(angle), y + inner * sin(angle))
        self.cr.line_to(x + outer * cos(angle), y + outer * sin(angle))

    def _draw_tick_label(self, i, radius, x, y, long_inset):
        """Draw the numeric label that belongs to long tick index i."""
        cr = self.cr
        cr.save()
        label = str(self.nums.get(i) * self.res_div_mul)
        t_width, t_height = cr.text_extents(label)[2:4]
        angle = i * pi / 6.0
        # The horizontal offset differs between the two groups of ticks,
        # and the label of index -3 is centred on the dial's x coordinate.
        if i in (-8, -7, -6, -5, -4):
            cr.move_to(
                x + (radius - long_inset - (t_width / 2)) * cos(angle),
                y + (radius - long_inset - (t_height * 2)) * sin(angle)
            )
        elif i in (-2, -1, 0, 2, 1):
            cr.move_to(
                x + (radius - long_inset - (t_width * 1.5)) * cos(angle),
                y + (radius - long_inset - (t_height * 2)) * sin(angle)
            )
        elif i in (-3,):
            cr.move_to(
                x - t_width / 2,
                y - radius + long_inset * 2 + t_height
            )
        cr.show_text(label)
        cr.restore()

    def draw_needle(self, speed, radius, x, y):
        """Draw the needle for speed (unconverted) from the dial centre.

        If the needle would pass the highest tick, ``res_div_mul`` is
        increased by one so that the dial covers a larger range.
        """
        self.cr.save()
        inset = self.long_inset(radius)
        converted = speed * self.conversions.get(self.speed_unit)
        scaled = converted / (self.res_div * self.res_div_mul)
        actual = self.long_ticks[-1] + scaled
        if actual > self.long_ticks[0]:
            #TODO test this in real conditions! ;)
            self.res_div_mul += 1
            # Rescale from the converted speed; dividing the already
            # scaled value again would push the needle back to nearly 0.
            scaled = converted / (self.res_div * self.res_div_mul)
            actual = self.long_ticks[-1] + scaled
        self.cr.move_to(x, y)
        self.cr.line_to(
            x + (radius - (2 * inset)) * cos(actual * pi / 6.0),
            y + (radius - (2 * inset)) * sin(actual * pi / 6.0)
        )
        self.cr.stroke()
        self.cr.restore()

    def draw_speed_text(self, speed, radius, x, y):
        """Draw the converted speed and its unit as text below the centre."""
        self.cr.save()
        speed = '%.2f %s' % (
            speed * self.conversions.get(self.speed_unit),
            self.speed_unit
        )
        self.cr.select_font_face(
            'Georgia',
            cairo.FONT_SLANT_NORMAL,
            #cairo.FONT_WEIGHT_BOLD
        )
        self.cr.set_font_size(radius / 10)
        t_width = self.cr.text_extents(speed)[2]
        # Centre the text horizontally on the dial centre.
        self.cr.move_to(
            (x - t_width / 2),
            (y + radius) - self.long_inset(radius)
        )
        self.cr.show_text(speed)
        self.cr.restore()

    def degrees_to_radians(self, degrees):
        """Convert an angle from degrees to radians."""
        return ((pi / 180) * degrees)

    def radians_to_degrees(self, radians):
        """Convert an angle from radians to degrees."""
        return ((radians * 180) / pi)

    def get_x_y(self):
        """Return the dial centre: the allocation centre, 20 units up."""
        rect = self.get_allocation()
        x = (rect.x + rect.width / 2.0)
        y = (rect.y + rect.height / 2.0) - 20
        return x, y

    def get_radius(self, width, height):
        """Return the dial radius: half the smaller dimension minus 20."""
        return min(width / 2.0, height / 2.0) - 20


class Main(object):
    """Application window: a menu bar plus a Speedometer fed by gpsd."""

    # Unit labels indexed by the value of the radio actions below.
    UNIT_LABELS = ('mph', 'kmh', 'knots')

    # Menu layout; the names must match the actions registered in
    # __init__ and the widget paths stored in active_unit_map.
    UI_DEFINITION = '''
<ui>
  <menubar name="MenuBar">
    <menu action="File">
      <menuitem action="Quit"/>
    </menu>
    <menu action="Units">
      <menuitem action="Imperial"/>
      <menuitem action="Metric"/>
      <menuitem action="Nautical"/>
    </menu>
  </menubar>
</ui>
'''

    def __init__(self, host='localhost', port='2947', device=None, debug=0,
                 speed_unit=None):
        """Build the window, the menus and the speedometer widget.

        host, port -- passed to gps.gps() in run()
        device     -- stored on the instance
        debug      -- passed to gps.gps() as its verbose argument
        speed_unit -- 'mph', 'kmh' or 'knots'; a false value selects the
                      Speedometer default

        Raises TypeError (from Speedometer) for an unknown speed_unit.
        """
        self.host = host
        self.port = port
        self.device = device
        self.debug = debug
        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.set_title('xgpsspeed')
        self.widget = Speedometer(speed_unit=speed_unit)
        # Use the unit the widget settled on, so that the default (None)
        # still maps to an existing menu entry further down.
        self.speed_unit = self.widget.speed_unit
        self.window.connect('delete_event', self.delete_event)
        self.window.connect('destroy', self.destroy)
        self.widget.show()
        vbox = gtk.VBox(False, 0)
        self.window.add(vbox)

        self.window.present()
        self.uimanager = gtk.UIManager()
        self.accelgroup = self.uimanager.get_accel_group()
        self.window.add_accel_group(self.accelgroup)
        self.actiongroup = gtk.ActionGroup('gpsspeed-ng')
        self.actiongroup.add_actions(
            [('Quit', gtk.STOCK_QUIT, '_Quit', None,
              'Quit the Program', lambda x: gtk.main_quit()),
             ('File', None, '_File'),
             ('Units', None, '_Units')]
        )
        self.actiongroup.add_radio_actions(
            [('Imperial', None, '_Imperial', 'i', 'Imperial Units', 0),
             ('Metric', None, '_Metric', 'm', 'Metrical Units', 1),
             ('Nautical', None, '_Nautical', 'n', 'Nautical Units', 2)],
            0,
            self._on_unit_changed
        )

        self.uimanager.insert_action_group(self.actiongroup, 0)
        self.uimanager.add_ui_from_string(self.UI_DEFINITION)

        self.active_unit_map = {
            'mph': '/MenuBar/Units/Imperial',
            'kmh': '/MenuBar/Units/Metric',
            'knots': '/MenuBar/Units/Nautical'
        }
        menubar = self.uimanager.get_widget('/MenuBar')
        self.uimanager.get_widget(
            self.active_unit_map.get(self.speed_unit)
        ).set_active(True)
        vbox.pack_start(menubar, False, False, 0)
        vbox.add(self.widget)
        self.window.show_all()

    def _on_unit_changed(self, action, current):
        """Apply the unit picked in the Units menu and request a redraw."""
        self.widget.speed_unit = self.UNIT_LABELS[action.get_current_value()]
        self.widget.queue_draw()

    def watch(self, daemon, device):
        """Register main-loop watches on the socket of daemon."""
        self.daemon = daemon
        self.device = device
        gobject.io_add_watch(daemon.sock, gobject.IO_IN, self.handle_response)
        gobject.io_add_watch(daemon.sock, gobject.IO_ERR, self.handle_hangup)
        gobject.io_add_watch(daemon.sock, gobject.IO_HUP, self.handle_hangup)
        return True

    def handle_response(self, source, condition):
        """Poll the daemon; forward TPV reports to update_speed().

        Returns True to keep the watch installed, False after a failed
        poll has been reported through handle_hangup().
        """
        if self.daemon.poll() == -1:
            self.handle_hangup(source, condition)
            # Nothing new was read, so do not look at stale data.
            return False
        if self.daemon.data['class'] == 'TPV':
            self.update_speed(self.daemon.data)
        return True

    def handle_hangup(self, source, condition):
        """Show an error dialog and leave the GTK main loop."""
        w = gtk.MessageDialog(
            type=gtk.MESSAGE_ERROR,
            flags=gtk.DIALOG_DESTROY_WITH_PARENT,
            buttons=gtk.BUTTONS_OK
        )
        w.connect("destroy", lambda w: gtk.main_quit())
        w.set_title('gpsd error')
        w.set_markup("gpsd has stopped sending data.")
        w.run()
        gtk.main_quit()
        # The application is quitting; drop this watch instead of
        # letting it fire again.
        return False

    def update_speed(self, data):
        """Store data.speed (if present) in the widget and redraw it."""
        if hasattr(data, 'speed'):
            self.widget.last_speed = data.speed
            self.widget.queue_draw()

    def delete_event(self, widget, event, data=None):
        """Handle 'delete_event'; returns False without further action."""
        #TODO handle all cleanup operations here
        return False

    def destroy(self, widget, data=None):
        """Leave the GTK main loop when the window is destroyed."""
        gtk.main_quit()

    def run(self):
        """Connect to gpsd and run the GTK main loop.

        A socket error is reported in a dialog; KeyboardInterrupt emits
        'delete_event' on the window.
        """
        import gps
        try:
            daemon = gps.gps(
                host=self.host,
                port=self.port,
                mode=gps.WATCH_ENABLE | gps.WATCH_JSON | gps.WATCH_SCALED,
                verbose=self.debug
            )
            self.watch(daemon, self.device)
            gtk.main()
        except SocketError:
            w = gtk.MessageDialog(
                type=gtk.MESSAGE_ERROR,
                flags=gtk.DIALOG_DESTROY_WITH_PARENT,
                buttons=gtk.BUTTONS_OK
            )
            w.set_title('socket error')
            w.set_markup(
                "could not connect to gpsd socket. make sure gpsd is running."
            )
            w.run()
            w.destroy()
        except KeyboardInterrupt:
            self.window.emit('delete_event', gtk.gdk.Event(gtk.gdk.NOTHING))


if __name__ == '__main__':
    from sys import argv, exit
    from os.path import basename
    from optparse import OptionParser

    prog = basename(argv[0])
    usage = ('%s [-V|--version] [-h|--help] [--debug] [--host] ' +
             '[--port] [--device] [--speedunits {[mph] [kmh] [knots]}] ' +
             '[host [:port [:device]]]') % (prog)
    epilog = 'BSD terms apply: see the file COPYING in the distribution root for details.'
    version = '%s %s' % (prog, __version__)

    parser = OptionParser(usage=usage, epilog=epilog)
    parser.add_option(
        '--host',
        dest='host',
        default='localhost',
        help='The host to connect. [Default localhost]'
    )
    parser.add_option(
        '--port',
        dest='port',
        default='2947',
        help='The port to connect. [Default 2947]'
    )
    parser.add_option(
        '--device',
        dest='device',
        default=None,
        help='The device to connect. [Default None]'
    )
    parser.add_option(
        '--speedunits',
        dest='speedunits',
        default='mph',
        help='The unit of speed. Possible units are: mph, kmh, knots. [Default mph]'
    )
    parser.add_option(
        '--debug',
        dest='debug',
        default=0,
        action='store',
        type='int',
        help='Set level of debug. Must be integer. [Default 0]'
    )
    parser.add_option(
        '-V',
        '--version',
        action='store_true',
        default=False,
        dest='version',
        help='show program\'s version number and exit'
    )

    (options, args) = parser.parse_args()
    if options.version:
        # Parenthesised form prints the same text on Python 2 and 3.
        print(version)
        exit(0)

    if args:
        # A positional argument of the form host[:port[:device]]
        # overrides the corresponding options.
        arg = args[0].split(':')
        len_arg = len(arg)
        if len_arg == 1:
            (options.host,) = arg
        elif len_arg == 2:
            (options.host, options.port) = arg
        elif len_arg == 3:
            (options.host, options.port, options.device) = arg
        else:
            parser.print_help()
            exit(0)

    Main(
        host=options.host,
        port=options.port,
        device=options.device,
        speed_unit=options.speedunits,
        debug=options.debug
    ).run()