3 from gi.repository import GLib
9 import dbus.mainloop.glib
13 def extract_list(list):
20 def extract_values(values):
22 for key in list(values.keys()):
23 val += " " + key + "="
24 if key in ["PrefixLength"]:
25 val += "%s" % (int(values[key]))
27 if key in ["Servers", "Excludes"]:
28 val += extract_list(values[key])
30 val += str(values[key])
34 class Notification(dbus.service.Object):
35 def __init__(self, bus, app, notify_path):
36 dbus.service.Object.__init__(self)
39 @dbus.service.method("net.connman.Notification",
40 in_signature='', out_signature='')
42 print("Release %s" % (self._object_path))
43 session_name = self._object_path.split('/')[-1]
44 self.app.release(session_name)
46 @dbus.service.method("net.connman.Notification",
47 in_signature='a{sv}', out_signature='')
48 def Update(self, settings):
49 print("Update called at %s" % (self._object_path))
52 for key in list(settings.keys()):
53 if key in ["IPv4", "IPv6"]:
54 val = extract_values(settings[key])
55 elif key in ["AllowedBearers"]:
56 val = extract_list(settings[key])
59 print(" %s = %s" % (key, val))
64 class SessionApplication(dbus.service.Object):
65 def __init__(self, bus, object_path, mainloop):
66 dbus.service.Object.__init__(self, bus, object_path)
69 self.mainloop = mainloop
73 bus = dbus.SystemBus()
74 bus.watch_name_owner('net.connman', self.connman_name_owner_changed)
75 except dbus.DBusException:
78 def connman_name_owner_changed(self, proxy):
81 print("connman appeared on D-Bus ", str(proxy))
83 bus = dbus.SystemBus()
84 self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
85 "net.connman.Manager")
87 print("connman disappeared on D-Bus")
89 for s in list(self.sessions.keys()):
90 self.sessions[s]['notify'].remove_from_connection()
91 self.sessions[s]['notify'] = None
95 except dbus.DBusException:
98 def release(self, session_name):
99 s = self.find_session(session_name)
103 s['session'].Destroy()
106 s['notify'].remove_from_connection()
108 del self.sessions[session_name]
110 def type_convert(self, key, value):
111 if key in [ "AllowedBearers" ]:
113 elif key in [ "RoamingPolicy", "ConnectionType" ]:
116 elif key in [ "Priority", "AvoidHandover",
117 "StayConnected", "EmergencyCall" ]:
118 flag = str(value[0]).strip().lower()
119 val = flag not in ['false', 'f', 'n', '0']
120 return dbus.Boolean(val)
121 elif key in [ "PeriodicConnect", "IdleTimeout" ]:
123 return dbus.UInt32(val)
127 def find_session(self, session_name):
128 if not session_name in list(self.sessions.keys()):
130 return self.sessions[session_name]
132 @dbus.service.method("com.example.TestSession",
133 in_signature='', out_signature='')
134 def CreateSession(self, session_name):
135 print("Create session")
137 s = self.find_session(session_name)
138 if s and s['session'] :
139 print("Session %s already created-> drop request" % (session_name))
143 bus = dbus.SystemBus()
147 s['notify_path'] = self._object_path + "/" + session_name
148 s['notify'] = Notification(bus, self, s['notify_path'])
149 s['notify'].add_to_connection(bus, s['notify_path'])
150 if not 'settings' in list(s.keys()):
152 s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path'])
153 print("notify path %s" % (s['notify_path']))
154 print("session path %s" % (s['session_path']))
155 s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']),
156 "net.connman.Session")
157 self.sessions[session_name] = s
159 except dbus.DBusException as e:
160 if e.get_dbus_name() in ['net.connman.Error.Failed']:
161 print(e.get_dbus_message())
163 traceback.print_exc()
165 @dbus.service.method("com.example.TestSession",
166 in_signature='', out_signature='')
167 def DestroySession(self, session_name):
168 print("Destroy session")
170 s = self.find_session(session_name)
171 if s == None or s['session'] == None:
172 print("The session is not running -> drop request")
176 self.release(session_name)
177 except dbus.DBusException:
178 traceback.print_exc()
180 @dbus.service.method("com.example.TestSession",
181 in_signature='', out_signature='')
182 def Connect(self, session_name):
183 print("Connect session")
185 s = self.find_session(session_name)
186 if s == None or s['session'] == None:
187 print("The session is not running -> drop request")
191 s['session'].Connect()
192 except dbus.DBusException as e:
193 if e.get_dbus_name() in ['net.connman.Error.Failed']:
194 print(e.get_dbus_message())
196 traceback.print_exc()
198 @dbus.service.method("com.example.TestSession",
199 in_signature='', out_signature='')
200 def Disconnect(self, session_name):
201 print("Disconnect session")
203 s = self.find_session(session_name)
204 if s == None or s['session'] == None:
205 print("The session is not running -> drop request")
209 s['session'].Disconnect()
210 except dbus.DBusException as e:
211 if e.get_dbus_name() in ['net.connman.Error.Failed']:
212 print(e.get_dbus_message())
214 traceback.print_exc()
216 @dbus.service.method("com.example.TestSession",
217 in_signature='', out_signature='')
218 def Change(self, session_name, key, value):
219 print("Update session settings")
221 s = self.find_session(session_name)
222 if s == None or s['session'] == None:
223 print("The session is not running -> drop request")
227 val = self.type_convert(key, value)
228 s['session'].Change(key, val)
229 except dbus.DBusException as e:
230 if e.get_dbus_name() in ['net.connman.Error.Failed']:
231 print(e.get_dbus_message())
233 traceback.print_exc()
235 @dbus.service.method("com.example.TestSession",
236 in_signature='', out_signature='')
237 def Configure(self, session_name, key, value):
238 print("Configure session settings")
239 s = self.find_session(session_name)
242 s['notify_path'] = None
244 if not 'settings' in list(s.keys()):
246 s['session_path'] = None
248 self.sessions[session_name] = s
249 if s and s['session']:
250 print("The session is running, use change -> drop request")
252 val = self.type_convert(key, value)
253 s['settings'][key] = val
256 if len(sys.argv) < 2:
257 print("Usage: %s <command>" % (sys.argv[0]))
261 print(" create <app_path> <session_name>")
262 print(" destroy <app_path> <session_name>")
263 print(" connect <app_path> <session_name>")
264 print(" disconnect <app_path> <session_name>")
265 print(" change <app_path> <session_name> <key> <value>")
266 print(" configure <app_path> <session_name> <key> <value>")
268 print(" run <app_path>")
271 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
273 if sys.argv[1] == "enable":
274 bus = dbus.SystemBus()
275 manager = dbus.Interface(bus.get_object("net.connman", "/"),
276 "net.connman.Manager")
277 manager.SetProperty("SessionMode", True)
280 elif sys.argv[1] == "disable":
281 bus = dbus.SystemBus()
282 manager = dbus.Interface(bus.get_object("net.connman", "/"),
283 "net.connman.Manager")
284 manager.SetProperty("SessionMode", False)
287 if (len(sys.argv) < 3):
288 print("Need test application path")
291 app_path = sys.argv[2]
292 bus = dbus.SessionBus()
294 app_name = "com.example.SessionApplication.%s" % (str.strip(app_path, "/"))
296 if sys.argv[1] == "run":
297 name = dbus.service.BusName(app_name, bus)
298 mainloop = GLib.MainLoop()
300 app = SessionApplication(bus, app_path, mainloop)
305 app = dbus.Interface(bus.get_object(app_name, app_path),
306 "com.example.TestSession")
308 if sys.argv[1] == "create":
309 app.CreateSession(sys.argv[3])
311 elif sys.argv[1] == "destroy":
312 app.DestroySession(sys.argv[3])
314 elif sys.argv[1] == "connect":
315 app.Connect(sys.argv[3])
317 elif sys.argv[1] == "disconnect":
318 app.Disconnect(sys.argv[3])
320 elif sys.argv[1] == "change":
321 if len(sys.argv) < 5:
322 print("Arguments missing")
325 app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
327 elif sys.argv[1] == "configure":
328 if len(sys.argv) < 5:
329 print("Arguments missing")
332 app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
335 print("Unknown command '%s'" % sys.argv[1])
338 if __name__ == '__main__':