9 import dbus.mainloop.glib
15 def extract_list(list):
22 def extract_values(values):
24 for key in list(values.keys()):
25 val += " " + key + "="
26 if key in ["PrefixLength"]:
27 val += "%s" % (int(values[key]))
29 if key in ["Servers", "Excludes"]:
30 val += extract_list(values[key])
32 val += str(values[key])
36 class Notification(dbus.service.Object):
37 def __init__(self, bus, app, notify_path):
38 dbus.service.Object.__init__(self)
41 @dbus.service.method("net.connman.Notification",
42 in_signature='', out_signature='')
44 print("Release %s" % (self._object_path))
45 session_name = self._object_path.split('/')[-1]
46 self.app.release(session_name)
48 @dbus.service.method("net.connman.Notification",
49 in_signature='a{sv}', out_signature='')
50 def Update(self, settings):
51 print("Update called at %s" % (self._object_path))
54 for key in list(settings.keys()):
55 if key in ["IPv4", "IPv6"]:
56 val = extract_values(settings[key])
57 elif key in ["AllowedBearers"]:
58 val = extract_list(settings[key])
61 print(" %s = %s" % (key, val))
66 class SessionApplication(dbus.service.Object):
67 def __init__(self, bus, object_path, mainloop):
68 dbus.service.Object.__init__(self, bus, object_path)
71 self.mainloop = mainloop
75 bus = dbus.SystemBus()
76 bus.watch_name_owner('net.connman', self.connman_name_owner_changed)
77 except dbus.DBusException:
80 def connman_name_owner_changed(self, proxy):
83 print("connman appeared on D-Bus ", str(proxy))
85 bus = dbus.SystemBus()
86 self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
87 "net.connman.Manager")
89 print("connman disappeared on D-Bus")
91 for s in list(self.sessions.keys()):
92 self.sessions[s]['notify'].remove_from_connection()
93 self.sessions[s]['notify'] = None
97 except dbus.DBusException:
100 def release(self, session_name):
101 s = self.find_session(session_name)
105 s['session'].Destroy()
108 s['notify'].remove_from_connection()
110 del self.sessions[session_name]
112 def type_convert(self, key, value):
113 if key in [ "AllowedBearers" ]:
115 elif key in [ "RoamingPolicy", "ConnectionType" ]:
118 elif key in [ "Priority", "AvoidHandover",
119 "StayConnected", "EmergencyCall" ]:
120 flag = str(value[0]).strip().lower()
121 val = flag not in ['false', 'f', 'n', '0']
122 return dbus.Boolean(val)
123 elif key in [ "PeriodicConnect", "IdleTimeout" ]:
125 return dbus.UInt32(val)
129 def find_session(self, session_name):
130 if not session_name in list(self.sessions.keys()):
132 return self.sessions[session_name]
134 @dbus.service.method("com.example.TestSession",
135 in_signature='', out_signature='')
136 def CreateSession(self, session_name):
137 print("Create session")
139 s = self.find_session(session_name)
140 if s and s['session'] :
141 print("Session %s already created-> drop request" % (session_name))
145 bus = dbus.SystemBus()
149 s['notify_path'] = self._object_path + "/" + session_name
150 s['notify'] = Notification(bus, self, s['notify_path'])
151 s['notify'].add_to_connection(bus, s['notify_path'])
152 if not 'settings' in list(s.keys()):
154 s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path'])
155 print("notify path %s" % (s['notify_path']))
156 print("session path %s" % (s['session_path']))
157 s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']),
158 "net.connman.Session")
159 self.sessions[session_name] = s
161 except dbus.DBusException as e:
162 if e.get_dbus_name() in ['net.connman.Error.Failed']:
163 print(e.get_dbus_message())
165 traceback.print_exc()
167 @dbus.service.method("com.example.TestSession",
168 in_signature='', out_signature='')
169 def DestroySession(self, session_name):
170 print("Destroy session")
172 s = self.find_session(session_name)
173 if s == None or s['session'] == None:
174 print("The session is not running -> drop request")
178 self.release(session_name)
179 except dbus.DBusException:
180 traceback.print_exc()
182 @dbus.service.method("com.example.TestSession",
183 in_signature='', out_signature='')
184 def Connect(self, session_name):
185 print("Connect session")
187 s = self.find_session(session_name)
188 if s == None or s['session'] == None:
189 print("The session is not running -> drop request")
193 s['session'].Connect()
194 except dbus.DBusException as e:
195 if e.get_dbus_name() in ['net.connman.Error.Failed']:
196 print(e.get_dbus_message())
198 traceback.print_exc()
200 @dbus.service.method("com.example.TestSession",
201 in_signature='', out_signature='')
202 def Disconnect(self, session_name):
203 print("Disconnect session")
205 s = self.find_session(session_name)
206 if s == None or s['session'] == None:
207 print("The session is not running -> drop request")
211 s['session'].Disconnect()
212 except dbus.DBusException as e:
213 if e.get_dbus_name() in ['net.connman.Error.Failed']:
214 print(e.get_dbus_message())
216 traceback.print_exc()
218 @dbus.service.method("com.example.TestSession",
219 in_signature='', out_signature='')
220 def Change(self, session_name, key, value):
221 print("Update session settings")
223 s = self.find_session(session_name)
224 if s == None or s['session'] == None:
225 print("The session is not running -> drop request")
229 val = self.type_convert(key, value)
230 s['session'].Change(key, val)
231 except dbus.DBusException as e:
232 if e.get_dbus_name() in ['net.connman.Error.Failed']:
233 print(e.get_dbus_message())
235 traceback.print_exc()
237 @dbus.service.method("com.example.TestSession",
238 in_signature='', out_signature='')
239 def Configure(self, session_name, key, value):
240 print("Configure session settings")
241 s = self.find_session(session_name)
244 s['notify_path'] = None
246 if not 'settings' in list(s.keys()):
248 s['session_path'] = None
250 self.sessions[session_name] = s
251 if s and s['session']:
252 print("The session is running, use change -> drop request")
254 val = self.type_convert(key, value)
255 s['settings'][key] = val
258 if len(sys.argv) < 2:
259 print("Usage: %s <command>" % (sys.argv[0]))
263 print(" create <app_path> <session_name>")
264 print(" destroy <app_path> <session_name>")
265 print(" connect <app_path> <session_name>")
266 print(" disconnect <app_path> <session_name>")
267 print(" change <app_path> <session_name> <key> <value>")
268 print(" configure <app_path> <session_name> <key> <value>")
270 print(" run <app_path>")
273 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
275 if sys.argv[1] == "enable":
276 bus = dbus.SystemBus()
277 manager = dbus.Interface(bus.get_object("net.connman", "/"),
278 "net.connman.Manager")
279 manager.SetProperty("SessionMode", True)
282 elif sys.argv[1] == "disable":
283 bus = dbus.SystemBus()
284 manager = dbus.Interface(bus.get_object("net.connman", "/"),
285 "net.connman.Manager")
286 manager.SetProperty("SessionMode", False)
289 if (len(sys.argv) < 3):
290 print("Need test application path")
293 app_path = sys.argv[2]
294 bus = dbus.SessionBus()
296 app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/"))
298 if sys.argv[1] == "run":
299 name = dbus.service.BusName(app_name, bus)
300 mainloop = gobject.MainLoop()
302 app = SessionApplication(bus, app_path, mainloop)
307 app = dbus.Interface(bus.get_object(app_name, app_path),
308 "com.example.TestSession")
310 if sys.argv[1] == "create":
311 app.CreateSession(sys.argv[3])
313 elif sys.argv[1] == "destroy":
314 app.DestroySession(sys.argv[3])
316 elif sys.argv[1] == "connect":
317 app.Connect(sys.argv[3])
319 elif sys.argv[1] == "disconnect":
320 app.Disconnect(sys.argv[3])
322 elif sys.argv[1] == "change":
323 if len(sys.argv) < 5:
324 print("Arguments missing")
327 app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
329 elif sys.argv[1] == "configure":
330 if len(sys.argv) < 5:
331 print("Arguments missing")
334 app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
337 print("Unknown command '%s'" % sys.argv[1])
340 if __name__ == '__main__':