#!/usr/bin/python
"""Command-line test helper for ConnMan sessions over D-Bus.

``run <app_path>`` starts a small service on the session bus that exports
the ``com.example.TestSession`` interface; the remaining sub-commands are
thin clients that call into that service (or, for ``enable``/``disable``,
talk to ``net.connman.Manager`` on the system bus directly).
"""

import sys
import gobject
import string
import dbus
import dbus.service
import dbus.mainloop.glib
import glib
import traceback


def extract_list(list):
    """Format an iterable as ``[ a b c ]`` (``[ ]`` when it is empty).

    The parameter keeps its original name ``list`` (shadowing the builtin)
    so that keyword callers are not broken.
    """
    return "[" + "".join(" " + str(item) for item in list) + " ]"


def extract_values(values):
    """Format a mapping as ``{ key=value ... }``.

    ``PrefixLength`` is rendered as an integer, ``Servers`` and ``Excludes``
    are rendered with :func:`extract_list`, everything else with ``str``.
    """
    parts = []
    for key, value in values.items():
        if key in ["PrefixLength"]:
            text = "%s" % int(value)
        elif key in ["Servers", "Excludes"]:
            text = extract_list(value)
        else:
            text = str(value)
        parts.append(" " + key + "=" + text)
    return "{" + "".join(parts) + " }"


class Notification(dbus.service.Object):
    """D-Bus object implementing ``net.connman.Notification``.

    Its object path is handed to ``Manager.CreateSession``, so presumably
    connman is the caller of ``Release`` and ``Update``.
    """

    def __init__(self, bus, app, notify_path):
        """Create the object without exporting it.

        ``bus`` and ``notify_path`` are accepted but not used here; the
        caller exports the object afterwards via ``add_to_connection``.
        """
        dbus.service.Object.__init__(self)
        self.app = app

    @dbus.service.method("net.connman.Notification",
                         in_signature='', out_signature='')
    def Release(self):
        """Tear down the owning application's session."""
        print("Release")
        self.app.release()

    @dbus.service.method("net.connman.Notification",
                         in_signature='a{sv}', out_signature='')
    def Update(self, settings):
        """Print every key/value pair of the received settings dict."""
        print("Update called")
        try:
            for key in settings.keys():
                if key in ["IPv4", "IPv6"]:
                    val = extract_values(settings[key])
                elif key in ["AllowedBearers"]:
                    val = extract_list(settings[key])
                else:
                    val = settings[key]
                print(" %s = %s" % (key, val))
        except Exception:
            # Best effort: a formatting problem must not fail the D-Bus call.
            print("Exception:")
            traceback.print_exc()


class SessionApplication(dbus.service.Object):
    """Service exporting ``com.example.TestSession`` to drive one session."""

    def __init__(self, bus, object_path, mainloop):
        """Export the service on ``bus`` and start watching for connman.

        Exits the process with status 1 if the system bus cannot be used.
        """
        dbus.service.Object.__init__(self, bus, object_path)

        self.manager = None
        self.notify_path = object_path + "/notify"
        self.notify = None
        self.session_path = None
        self.mainloop = mainloop
        self.session = None
        self.settings = {}

        try:
            bus = dbus.SystemBus()
            bus.watch_name_owner('net.connman',
                                 self.connman_name_owner_changed)
        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    def _drop_notify(self):
        """Unexport and forget the notification object, if there is one."""
        if self.notify is not None:
            self.notify.remove_from_connection()
            self.notify = None

    def _report_dbus_error(self, error):
        """Print a ``net.connman.Error.Failed`` message; else exit(1).

        Must be called from inside the ``except`` block that caught
        ``error`` so that ``traceback.print_exc`` sees the exception.
        """
        if error.get_dbus_name() in ['net.connman.Error.Failed']:
            print(error.get_dbus_message())
            return
        traceback.print_exc()
        sys.exit(1)

    def connman_name_owner_changed(self, proxy):
        """Track the owner of ``net.connman``.

        A non-empty ``proxy`` means the name has an owner and a Manager
        interface is set up; an empty one clears all session state.
        """
        try:
            if proxy:
                # Two spaces before the name match the original output.
                print("connman appeared on D-Bus  %s" % str(proxy))

                bus = dbus.SystemBus()
                self.manager = dbus.Interface(
                    bus.get_object("net.connman", "/"),
                    "net.connman.Manager")
            else:
                print("connman disappeared on D-Bus")
                self.manager = None
                self.session = None
                self._drop_notify()
        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    def release(self):
        """Destroy the current session (if any) and drop the notifier."""
        if self.session is not None:
            self.manager.DestroySession(self.session_path)
            self.session = None
        self._drop_notify()

    def type_convert(self, key, value):
        """Convert a command-line value into the type used for ``key``.

        ``value`` is indexed with ``[0]`` for the boolean and integer keys,
        so it is expected to be a non-empty sequence for those; for all
        other keys it is returned unchanged.
        """
        if key in ["AllowedBearers", "RoamingPolicy"]:
            return value
        elif key in ["Priority", "AvoidHandover",
                     "StayConnected", "EmergencyCall"]:
            # Anything that is not an explicit "false" spelling is True.
            flag = str(value[0]).strip().lower()
            return dbus.Boolean(flag not in ['false', 'f', 'n', '0'])
        elif key in ["PeriodicConnect", "IdleTimeout"]:
            return dbus.UInt32(value[0])

        return value

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def CreateSession(self):
        """Export a notifier and ask the connman manager for a session."""
        print("Create session")

        if self.session is not None:
            print("already running a session -> drop reqest")
            return

        if self.manager is None:
            # No Manager proxy yet; bail out before exporting the notifier.
            print("connman not available -> drop request")
            return

        try:
            bus = dbus.SystemBus()
            self.notify = Notification(bus, self, self.notify_path)
            self.notify.add_to_connection(bus, self.notify_path)

            self.session_path = self.manager.CreateSession(
                self.settings, self.notify_path)

            print("notify path %s" % (self.notify_path))
            print("session path %s" % (self.session_path))

            self.session = dbus.Interface(
                bus.get_object("net.connman", self.session_path),
                "net.connman.Session")
        except dbus.DBusException as e:
            # Unexport the notifier so a later attempt can reuse its path.
            self._drop_notify()
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def DestroySession(self):
        """Destroy the running session; exits(1) on any D-Bus error."""
        print("Destroy session")

        if self.session is None:
            print("no session running -> drop request")
            return

        try:
            self.release()
        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Connect(self):
        """Call ``Connect`` on the running session."""
        print("Connect session")

        if self.session is None:
            print("no session running -> drop request")
            return

        try:
            self.session.Connect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Disconnect(self):
        """Call ``Disconnect`` on the running session."""
        print("Disconnect session")

        if self.session is None:
            print("no session running -> drop request")
            return

        try:
            self.session.Disconnect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='sv', out_signature='')
    def Change(self, key, value):
        """Convert ``value`` for ``key`` and apply it to the session."""
        print("Update session settings")

        if self.session is None:
            print("no session running -> drop request")
            return

        try:
            val = self.type_convert(key, value)
            self.session.Change(key, val)
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='sv', out_signature='')
    def Configure(self, key, value):
        """Store a converted setting for use by the next CreateSession."""
        print("Configure session settings")
        val = self.type_convert(key, value)
        self.settings[key] = val


def main():
    """Dispatch on ``sys.argv[1]``; see the usage text for the commands."""
    if len(sys.argv) < 2:
        print("Usage: %s <command>" % (sys.argv[0]))
        print("")
        print(" enable")
        print(" disable")
        print(" create <app_path>")
        print(" destroy <app_path>")
        print(" connect <app_path>")
        print(" disconnect <app_path>")
        print(" change <app_path> <key> <value>")
        print(" configure <app_path> <key> <value>")
        print("")
        print(" run <app_path>")
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    command = sys.argv[1]

    if command in ("enable", "disable"):
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        manager.SetProperty("SessionMode", command == "enable")
        return

    if len(sys.argv) < 3:
        print("Need test application path")
        sys.exit(1)

    app_path = sys.argv[2]
    bus = dbus.SessionBus()
    app_name = "com.example.SessionApplication.%s" % (app_path.strip("/"))

    if command == "run":
        # Bound to a local so the bus-name object lives as long as the loop.
        name = dbus.service.BusName(app_name, bus)
        mainloop = gobject.MainLoop()
        app = SessionApplication(bus, app_path, mainloop)

        mainloop.run()
        return

    app = dbus.Interface(bus.get_object(app_name, app_path),
                         "com.example.TestSession")

    if command == "create":
        app.CreateSession()
    elif command == "destroy":
        app.DestroySession()
    elif command == "connect":
        app.Connect()
    elif command == "disconnect":
        app.Disconnect()
    elif command in ("change", "configure"):
        if len(sys.argv) < 5:
            print("Arguments missing")
            sys.exit(1)

        method = app.Change if command == "change" else app.Configure
        method(sys.argv[3], sys.argv[4:])
    else:
        print("Unknown command '%s'" % sys.argv[1])
        sys.exit(1)


if __name__ == '__main__':
    main()