dnsproxy: Fix constiness of variables
[framework/connectivity/connman.git] / test / test-session
1 #!/usr/bin/python
2
3 import sys
4 import gobject
5 import string
6
7 import dbus
8 import dbus.service
9 import dbus.mainloop.glib
10
11 import glib
12
13 import traceback
14
15 def extract_list(list):
16         val = "["
17         for i in list:
18                 val += " " + str(i)
19         val += " ]"
20         return val
21
22 def extract_values(values):
23         val = "{"
24         for key in values.keys():
25                 val += " " + key + "="
26                 if key in ["PrefixLength"]:
27                         val += "%s" % (int(values[key]))
28                 else:
29                         if key in ["Servers", "Excludes"]:
30                                 val += extract_list(values[key])
31                         else:
32                                 val += str(values[key])
33         val += " }"
34         return val
35
36 class Notification(dbus.service.Object):
37         def __init__(self, bus, app, notify_path):
38                 dbus.service.Object.__init__(self)
39                 self.app = app
40
41         @dbus.service.method("net.connman.Notification",
42                                 in_signature='', out_signature='')
43         def Release(self):
44                 print "Release %s" % (self._object_path)
45                 session_name = self._object_path.split('/')[-1]
46                 self.app.release(session_name)
47
48         @dbus.service.method("net.connman.Notification",
49                                 in_signature='a{sv}', out_signature='')
50         def Update(self, settings):
51                 print "Update called at %s" % (self._object_path)
52
53                 try:
54                         for key in settings.keys():
55                                 if key in ["IPv4", "IPv6"]:
56                                         val = extract_values(settings[key])
57                                 elif key in  ["AllowedBearers"]:
58                                         val = extract_list(settings[key])
59                                 else:
60                                         val = settings[key]
61                                 print "    %s = %s" % (key, val)
62                 except:
63                         print "Exception:"
64                         traceback.print_exc()
65
66 class SessionApplication(dbus.service.Object):
67         def __init__(self, bus, object_path, mainloop):
68                 dbus.service.Object.__init__(self, bus, object_path)
69
70                 self.manager = None
71                 self.mainloop = mainloop
72                 self.sessions = {}
73
74                 try:
75                         bus = dbus.SystemBus()
76                         bus.watch_name_owner('net.connman', self.connman_name_owner_changed)
77                 except dbus.DBusException:
78                         traceback.print_exc()
79
80         def connman_name_owner_changed(self, proxy):
81                 try:
82                         if proxy:
83                                 print "connman appeared on D-Bus ", str(proxy)
84
85                                 bus = dbus.SystemBus()
86                                 self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
87                                                               "net.connman.Manager")
88                         else:
89                                 print "connman disappeared on D-Bus"
90                                 self.manager = None
91                                 for s in self.sessions.keys():
92                                         self.sessions[s]['notify'].remove_from_connection()
93                                         self.sessions[s]['notify'] = None
94
95                                 self.sessions = {}
96
97                 except dbus.DBusException:
98                         traceback.print_exc()
99
100         def release(self, session_name):
101                 s = self.find_session(session_name)
102                 if not s:
103                         return
104                 if s['session']:
105                         s['session'].Destroy()
106                         s['session'] = None
107                 if s['notify']:
108                         s['notify'].remove_from_connection()
109                         s['notify'] = None
110                 del self.sessions[session_name]
111
112         def type_convert(self, key, value):
113                 if key in [ "AllowedBearers" ]:
114                         return value
115                 elif key in [ "RoamingPolicy", "ConnectionType" ]:
116                         if len(value) > 0:
117                                 return value[0]
118                 elif key in [ "Priority", "AvoidHandover",
119                               "StayConnected", "EmergencyCall" ]:
120                         flag = str(value[0]).strip().lower()
121                         val = flag not in ['false', 'f', 'n', '0']
122                         return dbus.Boolean(val)
123                 elif key in [ "PeriodicConnect", "IdleTimeout" ]:
124                         val = value[0]
125                         return dbus.UInt32(val)
126
127                 return value
128
129         def find_session(self, session_name):
130                 if not session_name in self.sessions.keys():
131                         return None
132                 return self.sessions[session_name]
133
134         @dbus.service.method("com.example.TestSession",
135                                 in_signature='', out_signature='')
136         def CreateSession(self, session_name):
137                 print "Create session"
138
139                 s = self.find_session(session_name)
140                 if s and s['session'] :
141                         print "Session %s already created-> drop reqest" % (session_name)
142                         return
143
144                 try:
145                         bus = dbus.SystemBus()
146
147                         if s == None:
148                                 s = {}
149                         s['notify_path'] = self._object_path + "/" + session_name
150                         s['notify'] = Notification(bus, self, s['notify_path'])
151                         s['notify'].add_to_connection(bus, s['notify_path'])
152                         if not 'settings' in s.keys():
153                                 s['settings'] = {};
154                         s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path'])
155                         print "notify path %s" % (s['notify_path'])
156                         print "session path %s" % (s['session_path'])
157                         s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']),
158                                                       "net.connman.Session")
159                         self.sessions[session_name] = s
160
161                 except dbus.DBusException, e:
162                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
163                                 print e.get_dbus_message()
164                                 return
165                         traceback.print_exc()
166
167         @dbus.service.method("com.example.TestSession",
168                                 in_signature='', out_signature='')
169         def DestroySession(self, session_name):
170                 print "Destroy session"
171
172                 s = self.find_session(session_name)
173                 if s == None or s['session'] == None:
174                         print "The session is not running -> drop request"
175                         return
176
177                 try:
178                         self.release(session_name)
179                 except dbus.DBusException:
180                         traceback.print_exc()
181
182         @dbus.service.method("com.example.TestSession",
183                                 in_signature='', out_signature='')
184         def Connect(self, session_name):
185                 print "Connect session"
186
187                 s = self.find_session(session_name)
188                 if s == None or s['session'] == None:
189                         print "The session is not running -> drop request"
190                         return
191
192                 try:
193                         s['session'].Connect()
194                 except dbus.DBusException, e:
195                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
196                                 print e.get_dbus_message()
197                                 return
198                         traceback.print_exc()
199
200         @dbus.service.method("com.example.TestSession",
201                                 in_signature='', out_signature='')
202         def Disconnect(self, session_name):
203                 print "Disconnect session"
204
205                 s = self.find_session(session_name)
206                 if s == None or s['session'] == None:
207                         print "The session is not running -> drop request"
208                         return
209
210                 try:
211                         s['session'].Disconnect()
212                 except dbus.DBusException, e:
213                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
214                                 print e.get_dbus_message()
215                                 return
216                         traceback.print_exc()
217
218         @dbus.service.method("com.example.TestSession",
219                                 in_signature='', out_signature='')
220         def Change(self, session_name, key, value):
221                 print "Update session settings"
222
223                 s = self.find_session(session_name)
224                 if s == None or s['session'] == None:
225                         print "The session is not running -> drop request"
226                         return
227
228                 try:
229                         val = self.type_convert(key, value)
230                         s['session'].Change(key, val)
231                 except dbus.DBusException, e:
232                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
233                                 print e.get_dbus_message()
234                                 return
235                         traceback.print_exc()
236
237         @dbus.service.method("com.example.TestSession",
238                                 in_signature='', out_signature='')
239         def Configure(self, session_name, key, value):
240                 print "Configure session settings"
241                 s = self.find_session(session_name)
242                 if s == None:
243                         s = {}
244                         s['notify_path'] = None
245                         s['notify'] = None
246                         if not 'settings' in s.keys():
247                                 s['settings'] = {};
248                         s['session_path'] = None
249                         s['session'] = None
250                         self.sessions[session_name] = s
251                 if s and s['session']:
252                         print "The session is running, use change -> drop request"
253                         return
254                 val = self.type_convert(key, value)
255                 s['settings'][key] = val
256
257 def main():
258         if len(sys.argv) < 2:
259                 print "Usage: %s <command>" % (sys.argv[0])
260                 print ""
261                 print "  enable"
262                 print "  disable"
263                 print "  create <app_path> <session_name>"
264                 print "  destroy <app_path> <session_name>"
265                 print "  connect <app_path> <session_name>"
266                 print "  disconnect <app_path> <session_name>"
267                 print "  change <app_path> <session_name> <key> <value>"
268                 print "  configure <app_path> <session_name> <key> <value>"
269                 print ""
270                 print "  run <app_path>"
271                 sys.exit(1)
272
273         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
274
275         if sys.argv[1] == "enable":
276                 bus = dbus.SystemBus()
277                 manager = dbus.Interface(bus.get_object("net.connman", "/"),
278                                          "net.connman.Manager")
279                 manager.SetProperty("SessionMode", True)
280                 return
281
282         elif sys.argv[1] == "disable":
283                 bus = dbus.SystemBus()
284                 manager = dbus.Interface(bus.get_object("net.connman", "/"),
285                                          "net.connman.Manager")
286                 manager.SetProperty("SessionMode", False)
287                 return
288
289         if (len(sys.argv) < 3):
290                 print "Need test application path"
291                 sys.exit(1)
292
293         app_path = sys.argv[2]
294         bus = dbus.SessionBus()
295
296         app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/"))
297
298         if sys.argv[1] == "run":
299                 name = dbus.service.BusName(app_name, bus)
300                 mainloop = gobject.MainLoop()
301
302                 app = SessionApplication(bus, app_path, mainloop)
303
304                 mainloop.run()
305                 return
306
307         app = dbus.Interface(bus.get_object(app_name, app_path),
308                              "com.example.TestSession")
309
310         if sys.argv[1] == "create":
311                 app.CreateSession(sys.argv[3])
312
313         elif sys.argv[1] == "destroy":
314                 app.DestroySession(sys.argv[3])
315
316         elif sys.argv[1] == "connect":
317                 app.Connect(sys.argv[3])
318
319         elif sys.argv[1] == "disconnect":
320                 app.Disconnect(sys.argv[3])
321
322         elif sys.argv[1] == "change":
323                 if len(sys.argv) < 5:
324                         print "Arguments missing"
325                         sys.exit(1)
326
327                 app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
328
329         elif sys.argv[1] == "configure":
330                 if len(sys.argv) < 5:
331                         print "Arguments missing"
332                         sys.exit(1)
333
334                 app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
335
336         else:
337                 print "Unknown command '%s'" % sys.argv[1]
338                 sys.exit(1)
339
340 if __name__ == '__main__':
341         main()