#!/usr/bin/python
"""Command-line agent for ConnMan and the ConnMan VPN daemon.

Exports an ``Agent`` object at ``/test/agent`` and registers it with
``net.connman.Manager``.  When ``net.connman.vpn`` can be reached, a
``VpnAgent`` object is exported at ``/test/vpn_agent`` and registered with
``net.connman.vpn.Manager`` as well.

Answers to input requests come from ``Key=value`` command-line arguments when
any were given; otherwise they are read interactively from standard input.
"""

import sys

import dbus
import dbus.mainloop.glib
import dbus.service
from gi.repository import GLib


class Canceled(dbus.DBusException):
    """D-Bus error raised when the user cancels a request."""

    _dbus_error_name = "net.connman.Error.Canceled"


class LaunchBrowser(dbus.DBusException):
    """D-Bus error raised when the user answers ``browser`` to a login."""

    _dbus_error_name = "net.connman.Agent.Error.LaunchBrowser"


class Retry(dbus.DBusException):
    """D-Bus error raised by ``Agent.ReportError`` when the user says yes."""

    _dbus_error_name = "net.connman.Agent.Error.Retry"


class VpnRetry(dbus.DBusException):
    """D-Bus error raised by ``VpnAgent.ReportError`` when the user says yes."""

    _dbus_error_name = "net.connman.vpn.Agent.Error.Retry"


class Agent(dbus.service.Object):
    """``net.connman.Agent`` object answering from presets or from stdin."""

    # Preset answers.  The script body fills these in from the command line;
    # while they are all unset the matching input_* method prompts on stdin.
    name = None
    ssid = None
    identity = None
    passphrase = None
    wpspin = None
    username = None
    password = None

    @dbus.service.method("net.connman.Agent",
                         in_signature='', out_signature='')
    def Release(self):
        """Print a notice and stop the module-level GLib main loop."""
        print("Release")
        mainloop.quit()

    def input_passphrase(self):
        """Return a dict with any of ``Identity``, ``Passphrase``, ``WPS``.

        Without presets the user is prompted for whitespace-separated
        ``Key=value`` tokens; a token starting with ``cancel`` is stored
        under ``Error``.  Parsing stops after the first ``WPS=`` token.
        """
        response = {}

        if not self.identity and not self.passphrase and not self.wpspin:
            print("Service credentials requested, type cancel to cancel")
            args = input('Answer: ')

            for arg in args.split():
                if arg.startswith("cancel"):
                    response["Error"] = arg
                if arg.startswith("Identity="):
                    response["Identity"] = arg.replace("Identity=", "", 1)
                if arg.startswith("Passphrase="):
                    response["Passphrase"] = arg.replace("Passphrase=", "", 1)
                if arg.startswith("WPS="):
                    response["WPS"] = arg.replace("WPS=", "", 1)
                    break
        else:
            if self.identity:
                response["Identity"] = self.identity
            if self.passphrase:
                response["Passphrase"] = self.passphrase
            if self.wpspin:
                response["WPS"] = self.wpspin

        return response

    def input_username(self):
        """Return a dict with any of ``Username`` and ``Password``.

        Without presets the user is prompted; a token starting with
        ``cancel`` or ``browser`` is stored under ``Error``.
        """
        response = {}

        if not self.username and not self.password:
            print("User login requested, type cancel to cancel")
            print("or browser to login through the browser by yourself.")
            args = input('Answer: ')

            for arg in args.split():
                if arg.startswith(("cancel", "browser")):
                    response["Error"] = arg
                if arg.startswith("Username="):
                    response["Username"] = arg.replace("Username=", "", 1)
                if arg.startswith("Password="):
                    response["Password"] = arg.replace("Password=", "", 1)
        else:
            if self.username:
                response["Username"] = self.username
            if self.password:
                response["Password"] = self.password

        return response

    def input_hidden(self):
        """Return a dict with ``Name`` or ``SSID`` for a hidden service.

        Without presets the user is prompted and only the first ``Name=`` or
        ``SSID=`` token is used.
        """
        response = {}

        if not self.name and not self.ssid:
            args = input('Answer ')

            for arg in args.split():
                if arg.startswith("Name="):
                    response["Name"] = arg.replace("Name=", "", 1)
                    break
                if arg.startswith("SSID="):
                    # Strip the whole "SSID=" prefix; stripping only "SSID"
                    # left a stray leading "=" in the value.
                    response["SSID"] = arg.replace("SSID=", "", 1)
                    break
        else:
            if self.name:
                response["Name"] = self.name
            if self.ssid:
                response["SSID"] = self.ssid

        return response

    @dbus.service.method("net.connman.Agent",
                         in_signature='oa{sv}', out_signature='a{sv}')
    def RequestInput(self, path, fields):
        """Collect answers for the requested ``fields`` and return them.

        Raises:
            Canceled: the collected ``Error`` value is exactly ``cancel``.
            LaunchBrowser: the collected ``Error`` value is exactly
                ``browser``.
        """
        print("RequestInput (%s,%s)" % (path, fields))

        response = {}

        if "Name" in fields:
            response.update(self.input_hidden())
        if "Passphrase" in fields:
            response.update(self.input_passphrase())
        if "Username" in fields:
            response.update(self.input_username())

        if "Error" in response:
            if response["Error"] == "cancel":
                raise Canceled("canceled")
            if response["Error"] == "browser":
                raise LaunchBrowser("launch browser")

        print("returning (%s)" % (response))

        return response

    @dbus.service.method("net.connman.Agent",
                         in_signature='os', out_signature='')
    def RequestBrowser(self, path, url):
        """Ask the user to log in at ``url``; any typed text cancels.

        Raises:
            Canceled: the user typed something before pressing enter.
        """
        print("RequestBrowser (%s,%s)" % (path, url))

        print("Please login through the given url in a browser")
        print("Then press enter to accept or some text to cancel")

        args = input('> ')

        if len(args) > 0:
            raise Canceled("canceled")

    @dbus.service.method("net.connman.Agent",
                         in_signature='os', out_signature='')
    def ReportError(self, path, error):
        """Show ``error`` and ask whether the service should be retried.

        Raises:
            Retry: the user answered exactly ``yes``.
        """
        print("ReportError %s, %s" % (path, error))
        retry = input("Retry service (yes/no): ")
        if retry == "yes":
            raise Retry("retry service")

    @dbus.service.method("net.connman.Agent",
                         in_signature='', out_signature='')
    def Cancel(self):
        """Print a notice; no other action is taken."""
        print("Cancel")


class VpnAgent(dbus.service.Object):
    """``net.connman.vpn.Agent`` object answering from presets or stdin."""

    # Preset answers, filled in from the command line by the script body.
    name = None
    host = None
    cookie = None
    username = None
    password = None

    @dbus.service.method("net.connman.vpn.Agent",
                         in_signature='', out_signature='')
    def Release(self):
        """Print a notice; the main loop keeps running."""
        print("Release VPN agent")

    def input_cookie(self):
        """Return a dict with ``OpenConnect.Cookie`` when one is available.

        Without a preset cookie the user is prompted; a token starting with
        ``cancel`` is stored under ``Error``.
        """
        response = {}

        if not self.cookie:
            print("VPN credentials requested, type cancel to cancel")
            args = input('Answer: ')

            for arg in args.split():
                if arg.startswith("cancel"):
                    response["Error"] = arg
                if arg.startswith("Cookie="):
                    response["OpenConnect.Cookie"] = arg.replace(
                        "Cookie=", "", 1)
        else:
            response["OpenConnect.Cookie"] = self.cookie

        return response

    def input_username(self):
        """Return a dict with any of ``Username`` and ``Password``.

        Without presets the user is prompted; a token starting with
        ``cancel`` is stored under ``Error``.
        """
        response = {}

        if not self.username and not self.password:
            print("User login requested, type cancel to cancel")
            args = input('Answer: ')

            for arg in args.split():
                if arg.startswith("cancel"):
                    response["Error"] = arg
                if arg.startswith("Username="):
                    response["Username"] = arg.replace("Username=", "", 1)
                if arg.startswith("Password="):
                    response["Password"] = arg.replace("Password=", "", 1)
        else:
            if self.username:
                response["Username"] = self.username
            if self.password:
                response["Password"] = self.password

        return response

    @dbus.service.method("net.connman.vpn.Agent",
                         in_signature='oa{sv}', out_signature='a{sv}')
    def RequestInput(self, path, fields):
        """Collect answers for the requested ``fields`` and return them.

        Raises:
            Canceled: the collected ``Error`` value is exactly ``cancel``.
        """
        print("RequestInput (%s,%s)" % (path, fields))

        response = {}

        if "OpenConnect.Cookie" in fields:
            response.update(self.input_cookie())
        if "Username" in fields or "Password" in fields:
            response.update(self.input_username())

        if "Error" in response:
            if response["Error"] == "cancel":
                raise Canceled("canceled")

        print("returning (%s)" % (response))

        return response

    @dbus.service.method("net.connman.vpn.Agent",
                         in_signature='os', out_signature='')
    def ReportError(self, path, error):
        """Show ``error`` and ask whether the service should be retried.

        Raises:
            VpnRetry: the user answered exactly ``yes``.
        """
        print("ReportError %s, %s" % (path, error))
        retry = input("Retry service (yes/no): ")
        if retry == "yes":
            raise VpnRetry("retry service")

    @dbus.service.method("net.connman.vpn.Agent",
                         in_signature='', out_signature='')
    def Cancel(self):
        """Print a notice; no other action is taken."""
        print("Cancel")


def vpnNameOwnerChanged(proxy):
    """Register the VPN agent again whenever ``net.connman.vpn`` has an owner.

    Installed through ``bus.watch_name_owner``; ``proxy`` is truthy when the
    name has an owner and falsy when it does not.  Uses the module-level
    ``bus`` created by the script body.
    """
    if proxy:
        print("vpnd is connected to system bus")
        try:
            path = "/test/vpn_agent"
            vpn_manager = dbus.Interface(
                bus.get_object('net.connman.vpn', "/"),
                'net.connman.vpn.Manager')
            vpn_manager.RegisterAgent(path)
        except dbus.DBusException:
            print("vpn agent is not registered")
    else:
        print("vpnd is disconnected from system bus")


def print_usage():
    """Print the accepted command-line arguments and exit with status 1."""
    print("Usage:")
    print("For hidden service:")
    print("%s Name= [SSID=]" % (sys.argv[0]))
    print("For EAP/WPA input:")
    print("%s Identity= Passphrase= WPS=" % (sys.argv[0]))
    print("For WISPr login, L2TP or PPTP input:")
    print("%s Username= Password=" % (sys.argv[0]))
    print("For OpenConnect input:")
    print("%s Cookie=" % (sys.argv[0]))
    print("Help: %s help" % (sys.argv[0]))
    sys.exit(1)


if __name__ == '__main__':
    if len(sys.argv) == 2 and sys.argv[1] == "help":
        print_usage()

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # `bus` and `mainloop` must stay module-level: vpnNameOwnerChanged and
    # Agent.Release look them up as globals.
    bus = dbus.SystemBus()
    manager = dbus.Interface(bus.get_object('net.connman', "/"),
                             'net.connman.Manager')

    path = "/test/agent"
    agent = Agent(bus, path)

    vpn_path = "/test/vpn_agent"
    try:
        vpn_manager = dbus.Interface(bus.get_object('net.connman.vpn', "/"),
                                     'net.connman.vpn.Manager')
        vpn_agent = VpnAgent(bus, vpn_path)
    except dbus.DBusException:
        # Keep both names defined so the argument parsing below can test
        # for a missing VPN agent instead of hitting a NameError.
        vpn_manager = None
        vpn_agent = None
        print("net.connman.vpn is not present")

    for arg in sys.argv[1:]:
        if arg.startswith("Name="):
            agent.name = arg.replace("Name=", "", 1)
        elif arg.startswith("SSID="):
            agent.ssid = arg.replace("SSID=", "", 1)
        elif arg.startswith("Identity="):
            agent.identity = arg.replace("Identity=", "", 1)
        elif arg.startswith("Passphrase="):
            agent.passphrase = arg.replace("Passphrase=", "", 1)
        elif arg.startswith("WPS="):
            agent.wpspin = arg.replace("WPS=", "", 1)
        elif arg.startswith("Username="):
            username = arg.replace("Username=", "", 1)
            agent.username = username
            if vpn_agent is not None:
                vpn_agent.username = username
        elif arg.startswith("Password="):
            password = arg.replace("Password=", "", 1)
            agent.password = password
            if vpn_agent is not None:
                vpn_agent.password = password
        elif arg.startswith("Cookie="):
            if vpn_agent is not None:
                vpn_agent.cookie = arg.replace("Cookie=", "", 1)
        else:
            print_usage()

    try:
        manager.RegisterAgent(path)
    except dbus.DBusException:
        print("Cannot register connman agent.")

    if vpn_manager is not None:
        try:
            vpn_manager.RegisterAgent(vpn_path)
            bus.watch_name_owner('net.connman.vpn', vpnNameOwnerChanged)
        except dbus.DBusException:
            print("Cannot register vpn agent")

    mainloop = GLib.MainLoop()
    mainloop.run()

    #manager.UnregisterAgent(path)