#!/usr/bin/python import sys import gobject import string import dbus import dbus.service import dbus.mainloop.glib import glib import traceback def extract_list(list): val = "[" for i in list: val += " " + str(i) val += " ]" return val def extract_values(values): val = "{" for key in list(values.keys()): val += " " + key + "=" if key in ["PrefixLength"]: val += "%s" % (int(values[key])) else: if key in ["Servers", "Excludes"]: val += extract_list(values[key]) else: val += str(values[key]) val += " }" return val class Notification(dbus.service.Object): def __init__(self, bus, app, notify_path): dbus.service.Object.__init__(self) self.app = app @dbus.service.method("net.connman.Notification", in_signature='', out_signature='') def Release(self): print("Release %s" % (self._object_path)) session_name = self._object_path.split('/')[-1] self.app.release(session_name) @dbus.service.method("net.connman.Notification", in_signature='a{sv}', out_signature='') def Update(self, settings): print("Update called at %s" % (self._object_path)) try: for key in list(settings.keys()): if key in ["IPv4", "IPv6"]: val = extract_values(settings[key]) elif key in ["AllowedBearers"]: val = extract_list(settings[key]) else: val = settings[key] print(" %s = %s" % (key, val)) except: print("Exception:") traceback.print_exc() class SessionApplication(dbus.service.Object): def __init__(self, bus, object_path, mainloop): dbus.service.Object.__init__(self, bus, object_path) self.manager = None self.mainloop = mainloop self.sessions = {} try: bus = dbus.SystemBus() bus.watch_name_owner('net.connman', self.connman_name_owner_changed) except dbus.DBusException: traceback.print_exc() def connman_name_owner_changed(self, proxy): try: if proxy: print("connman appeared on D-Bus ", str(proxy)) bus = dbus.SystemBus() self.manager = dbus.Interface(bus.get_object("net.connman", "/"), "net.connman.Manager") else: print("connman disappeared on D-Bus") self.manager = None for s in list(self.sessions.keys()): self.sessions[s]['notify'].remove_from_connection() self.sessions[s]['notify'] = None self.sessions = {} except dbus.DBusException: traceback.print_exc() def release(self, session_name): s = self.find_session(session_name) if not s: return if s['session']: s['session'].Destroy() s['session'] = None if s['notify']: s['notify'].remove_from_connection() s['notify'] = None del self.sessions[session_name] def type_convert(self, key, value): if key in [ "AllowedBearers" ]: return value elif key in [ "RoamingPolicy", "ConnectionType" ]: if len(value) > 0: return value[0] elif key in [ "Priority", "AvoidHandover", "StayConnected", "EmergencyCall" ]: flag = str(value[0]).strip().lower() val = flag not in ['false', 'f', 'n', '0'] return dbus.Boolean(val) elif key in [ "PeriodicConnect", "IdleTimeout" ]: val = value[0] return dbus.UInt32(val) return value def find_session(self, session_name): if not session_name in list(self.sessions.keys()): return None return self.sessions[session_name] @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def CreateSession(self, session_name): print("Create session") s = self.find_session(session_name) if s and s['session'] : print("Session %s already created-> drop request" % (session_name)) return try: bus = dbus.SystemBus() if s == None: s = {} s['notify_path'] = self._object_path + "/" + session_name s['notify'] = Notification(bus, self, s['notify_path']) s['notify'].add_to_connection(bus, s['notify_path']) if not 'settings' in list(s.keys()): s['settings'] = {}; s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path']) print("notify path %s" % (s['notify_path'])) print("session path %s" % (s['session_path'])) s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']), "net.connman.Session") self.sessions[session_name] = s except dbus.DBusException as e: if e.get_dbus_name() in ['net.connman.Error.Failed']: print(e.get_dbus_message()) return traceback.print_exc() @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def DestroySession(self, session_name): print("Destroy session") s = self.find_session(session_name) if s == None or s['session'] == None: print("The session is not running -> drop request") return try: self.release(session_name) except dbus.DBusException: traceback.print_exc() @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def Connect(self, session_name): print("Connect session") s = self.find_session(session_name) if s == None or s['session'] == None: print("The session is not running -> drop request") return try: s['session'].Connect() except dbus.DBusException as e: if e.get_dbus_name() in ['net.connman.Error.Failed']: print(e.get_dbus_message()) return traceback.print_exc() @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def Disconnect(self, session_name): print("Disconnect session") s = self.find_session(session_name) if s == None or s['session'] == None: print("The session is not running -> drop request") return try: s['session'].Disconnect() except dbus.DBusException as e: if e.get_dbus_name() in ['net.connman.Error.Failed']: print(e.get_dbus_message()) return traceback.print_exc() @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def Change(self, session_name, key, value): print("Update session settings") s = self.find_session(session_name) if s == None or s['session'] == None: print("The session is not running -> drop request") return try: val = self.type_convert(key, value) s['session'].Change(key, val) except dbus.DBusException as e: if e.get_dbus_name() in ['net.connman.Error.Failed']: print(e.get_dbus_message()) return traceback.print_exc() @dbus.service.method("com.example.TestSession", in_signature='', out_signature='') def Configure(self, session_name, key, value): print("Configure session settings") s = self.find_session(session_name) if s == None: s = {} s['notify_path'] = None s['notify'] = None if not 'settings' in list(s.keys()): s['settings'] = {}; s['session_path'] = None s['session'] = None self.sessions[session_name] = s if s and s['session']: print("The session is running, use change -> drop request") return val = self.type_convert(key, value) s['settings'][key] = val def main(): if len(sys.argv) < 2: print("Usage: %s " % (sys.argv[0])) print("") print(" enable") print(" disable") print(" create ") print(" destroy ") print(" connect ") print(" disconnect ") print(" change ") print(" configure ") print("") print(" run ") sys.exit(1) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) if sys.argv[1] == "enable": bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("net.connman", "/"), "net.connman.Manager") manager.SetProperty("SessionMode", True) return elif sys.argv[1] == "disable": bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("net.connman", "/"), "net.connman.Manager") manager.SetProperty("SessionMode", False) return if (len(sys.argv) < 3): print("Need test application path") sys.exit(1) app_path = sys.argv[2] bus = dbus.SessionBus() app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/")) if sys.argv[1] == "run": name = dbus.service.BusName(app_name, bus) mainloop = gobject.MainLoop() app = SessionApplication(bus, app_path, mainloop) mainloop.run() return app = dbus.Interface(bus.get_object(app_name, app_path), "com.example.TestSession") if sys.argv[1] == "create": app.CreateSession(sys.argv[3]) elif sys.argv[1] == "destroy": app.DestroySession(sys.argv[3]) elif sys.argv[1] == "connect": app.Connect(sys.argv[3]) elif sys.argv[1] == "disconnect": app.Disconnect(sys.argv[3]) elif sys.argv[1] == "change": if len(sys.argv) < 5: print("Arguments missing") sys.exit(1) app.Change(sys.argv[3], sys.argv[4], sys.argv[5:]) elif sys.argv[1] == "configure": if len(sys.argv) < 5: print("Arguments missing") sys.exit(1) app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:]) else: print("Unknown command '%s'" % sys.argv[1]) sys.exit(1) if __name__ == '__main__': main()