Release tizen_2.0_beta
[framework/connectivity/connman.git] / test / test-session
1 #!/usr/bin/python
2
3 import sys
4 import gobject
5 import string
6
7 import dbus
8 import dbus.service
9 import dbus.mainloop.glib
10
11 import glib
12
13 import traceback
14
def extract_list(list):
    """Format an iterable as a bracketed, space-separated string.

    Every element is converted with str() and prefixed by one space, so
    [1, 2] becomes "[ 1 2 ]" and an empty iterable becomes "[ ]".

    NOTE: the parameter shadows the builtin ``list``; the name is kept so
    the function's signature stays unchanged for existing callers.
    """
    return "[" + "".join(" " + str(item) for item in list) + " ]"
21
def extract_values(values):
    """Format a settings dictionary as "{ key=value key=value }".

    "PrefixLength" is rendered as a plain integer, "Servers" and
    "Excludes" are rendered through extract_list(), and every other value
    is rendered with str(). An empty dictionary yields "{ }".
    """
    parts = []
    for key, value in values.items():
        if key == "PrefixLength":
            # Force an integer rendering regardless of the incoming type.
            text = str(int(value))
        elif key in ("Servers", "Excludes"):
            text = extract_list(value)
        else:
            text = str(value)
        parts.append(" " + key + "=" + text)
    return "{" + "".join(parts) + " }"
35
class Notification(dbus.service.Object):
    """D-Bus object implementing the net.connman.Notification interface.

    Release and Update calls received on this object are printed, and
    Release is forwarded to the owning application object.
    """

    def __init__(self, bus, app, notify_path):
        """Create the notification object for application *app*.

        *bus* and *notify_path* are accepted but not used here: the base
        class is initialised without a connection, so the creator has to
        export the object itself (e.g. with add_to_connection()).
        """
        dbus.service.Object.__init__(self)
        self.app = app

    @dbus.service.method("net.connman.Notification",
                         in_signature='', out_signature='')
    def Release(self):
        """Handle Release: tell the application to drop this session."""
        print("Release %s" % (self._object_path))
        # The session name is the last component of this object's path.
        session_name = self._object_path.split('/')[-1]
        self.app.release(session_name)

    @dbus.service.method("net.connman.Notification",
                         in_signature='a{sv}', out_signature='')
    def Update(self, settings):
        """Handle Update: print every changed setting.

        "IPv4"/"IPv6" values are formatted with extract_values(),
        "AllowedBearers" with extract_list(); anything else is printed
        as received.
        """
        print("Update called at %s" % (self._object_path))

        try:
            for key, value in settings.items():
                if key in ("IPv4", "IPv6"):
                    val = extract_values(value)
                elif key == "AllowedBearers":
                    val = extract_list(value)
                else:
                    val = value
                print("    %s = %s" % (key, val))
        except Exception:
            # Formatting problems are reported but must not fail the call.
            # Exception (rather than a bare except) lets SystemExit and
            # KeyboardInterrupt propagate.
            print("Exception:")
            traceback.print_exc()
65
class SessionApplication(dbus.service.Object):
    """Test application exposing the com.example.TestSession interface.

    Sessions are tracked in ``self.sessions``, a dict mapping the session
    name to a dict with the keys 'notify_path', 'notify', 'settings',
    'session_path' and 'session'. ``self.manager`` is the
    net.connman.Manager proxy, or None while ConnMan is not on the bus.
    """

    # NOTE(review): every exported method below keeps in_signature='' from
    # the original code even though the methods take arguments; confirm
    # against the dbus-python documentation whether explicit signatures
    # are wanted before changing them.

    def __init__(self, bus, object_path, mainloop):
        """Export the object at *object_path* on *bus* and watch ConnMan."""
        dbus.service.Object.__init__(self, bus, object_path)

        self.manager = None
        self.mainloop = mainloop
        self.sessions = {}

        try:
            # ConnMan is always watched on the system bus, whatever bus
            # this object itself was exported on.
            system_bus = dbus.SystemBus()
            system_bus.watch_name_owner('net.connman',
                                        self.connman_name_owner_changed)
        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    def connman_name_owner_changed(self, proxy):
        """React to ConnMan appearing on or vanishing from the bus.

        *proxy* is the new owner of 'net.connman'; a false value means
        the name lost its owner. On appearance the manager proxy is
        (re)created; on disappearance all known sessions are forgotten
        and their notification objects are unexported.
        """
        try:
            if proxy:
                print("connman appeared on D-Bus  %s" % str(proxy))

                bus = dbus.SystemBus()
                self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                              "net.connman.Manager")
            else:
                print("connman disappeared on D-Bus")
                self.manager = None
                for s in self.sessions.values():
                    # Sessions that were only configured (never created)
                    # have no notification object to unexport.
                    if s['notify'] is not None:
                        s['notify'].remove_from_connection()
                        s['notify'] = None

                self.sessions = {}

        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    def release(self, session_name):
        """Destroy the named session and forget everything about it.

        Does nothing when the session is unknown. D-Bus errors raised by
        the session's Destroy() call propagate to the caller.
        """
        s = self.find_session(session_name)
        if not s:
            return
        if s['session']:
            s['session'].Destroy()
            s['session'] = None
        if s['notify']:
            s['notify'].remove_from_connection()
            s['notify'] = None
        del self.sessions[session_name]

    def type_convert(self, key, value):
        """Convert a command-line *value* to the type used for *key*.

        Boolean keys use the first element of *value* and are true unless
        it reads 'false', 'f', 'n' or '0' (case-insensitive). Timeout keys
        convert the first element to dbus.UInt32. All other keys,
        including "AllowedBearers" and "RoamingPolicy", are returned
        unchanged.
        """
        if key in ("Priority", "AvoidHandover",
                   "StayConnected", "EmergencyCall"):
            flag = str(value[0]).strip().lower()
            return dbus.Boolean(flag not in ['false', 'f', 'n', '0'])
        if key in ("PeriodicConnect", "IdleTimeout"):
            return dbus.UInt32(value[0])

        return value

    def find_session(self, session_name):
        """Return the bookkeeping dict for *session_name*, or None."""
        return self.sessions.get(session_name)

    def _running_session(self, session_name):
        """Return the dict of a running session, or None after reporting."""
        s = self.find_session(session_name)
        if s is None or s['session'] is None:
            print("The session is not running -> drop request")
            return None
        return s

    def _report_dbus_error(self, e):
        """Print a net.connman.Error.Failed message; exit on other errors.

        Must be called from inside the ``except`` block that caught *e* so
        that the traceback of the active exception can be printed.
        """
        if e.get_dbus_name() in ['net.connman.Error.Failed']:
            print(e.get_dbus_message())
            return
        traceback.print_exc()
        sys.exit(1)

    def _discard_notification(self, s):
        """Unexport and forget the notification object recorded in *s*."""
        notify = s.get('notify')
        if notify is not None:
            notify.remove_from_connection()
        s['notify'] = None
        s['notify_path'] = None

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def CreateSession(self, session_name):
        """Create a ConnMan session using any previously configured settings."""
        print("Create session")

        s = self.find_session(session_name)
        if s and s['session']:
            print("Session %s already created-> drop reqest" % (session_name))
            return

        if self.manager is None:
            # Without a manager proxy the request cannot be forwarded.
            print("connman is not available -> drop request")
            return

        if s is None:
            s = {}

        try:
            bus = dbus.SystemBus()

            s['notify_path'] = self._object_path + "/" + session_name
            s['notify'] = Notification(bus, self, s['notify_path'])
            s['notify'].add_to_connection(bus, s['notify_path'])
            if 'settings' not in s:
                s['settings'] = {}
            s['session_path'] = self.manager.CreateSession(s['settings'],
                                                           s['notify_path'])
            print("notify path %s" % (s['notify_path']))
            print("session path %s" % (s['session_path']))
            s['session'] = dbus.Interface(bus.get_object("net.connman",
                                                         s['session_path']),
                                          "net.connman.Session")
            self.sessions[session_name] = s

        except dbus.DBusException as e:
            # Do not leave a half-created session behind: the notification
            # object would otherwise stay exported at its path.
            self._discard_notification(s)
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def DestroySession(self, session_name):
        """Destroy a running session."""
        print("Destroy session")

        if self._running_session(session_name) is None:
            return

        try:
            self.release(session_name)
        except dbus.DBusException:
            traceback.print_exc()
            sys.exit(1)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Connect(self, session_name):
        """Call Connect() on a running session."""
        print("Connect session")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            s['session'].Connect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Disconnect(self, session_name):
        """Call Disconnect() on a running session."""
        print("Disconnect session")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            s['session'].Disconnect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Change(self, session_name, key, value):
        """Change one setting of a running session."""
        print("Update session settings")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            val = self.type_convert(key, value)
            s['session'].Change(key, val)
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Configure(self, session_name, key, value):
        """Store a setting to be used when the session is created later.

        An unknown session name gets a fresh, not-yet-running entry. The
        request is dropped when the session is already running.
        """
        print("Configure session settings")
        s = self.find_session(session_name)
        if s is None:
            s = {
                'notify_path': None,
                'notify': None,
                'settings': {},
                'session_path': None,
                'session': None,
            }
            self.sessions[session_name] = s
        if s['session']:
            print("The session is running, use change -> drop request")
            return
        s['settings'][key] = self.type_convert(key, value)
260
def main():
    """Command-line entry point of the session test tool.

    "enable"/"disable" toggle ConnMan's SessionMode property, "run"
    starts a test application at the given path, and the remaining
    commands are forwarded to an already running test application.
    Exits with status 1 on usage errors.
    """
    if len(sys.argv) < 2:
        print("Usage: %s <command>" % (sys.argv[0]))
        for line in ("",
                     "  enable",
                     "  disable",
                     "  create <app_path> <session_name>",
                     "  destroy <app_path> <session_name>",
                     "  connect <app_path> <session_name>",
                     "  disconnect <app_path> <session_name>",
                     "  change <app_path> <session_name> <key> <value>",
                     "  configure <app_path> <session_name> <key> <value>",
                     "",
                     "  run <app_path>"):
            print(line)
        sys.exit(1)

    command = sys.argv[1]

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    if command in ("enable", "disable"):
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        manager.SetProperty("SessionMode", command == "enable")
        return

    if len(sys.argv) < 3:
        print("Need test application path")
        sys.exit(1)

    app_path = sys.argv[2]
    bus = dbus.SessionBus()

    # str.strip replaces the deprecated string.strip() module function.
    app_name = "com.example.SessionApplication.%s" % (app_path.strip("/"))

    if command == "run":
        # NOTE(review): 'name' and 'app' are not used again but presumably
        # have to stay referenced while the main loop runs - confirm.
        name = dbus.service.BusName(app_name, bus)
        mainloop = gobject.MainLoop()

        app = SessionApplication(bus, app_path, mainloop)

        mainloop.run()
        return

    session_commands = ("create", "destroy", "connect", "disconnect")
    setting_commands = ("change", "configure")

    # Validate the command line before contacting the test application so
    # that usage errors are reported even when it is not running.
    if command not in session_commands + setting_commands:
        print("Unknown command '%s'" % command)
        sys.exit(1)

    # Session commands need <session_name>; setting commands also <key>.
    required = 5 if command in setting_commands else 4
    if len(sys.argv) < required:
        print("Arguments missing")
        sys.exit(1)

    app = dbus.Interface(bus.get_object(app_name, app_path),
                         "com.example.TestSession")
    session_name = sys.argv[3]

    if command == "create":
        app.CreateSession(session_name)
    elif command == "destroy":
        app.DestroySession(session_name)
    elif command == "connect":
        app.Connect(session_name)
    elif command == "disconnect":
        app.Disconnect(session_name)
    elif command == "change":
        app.Change(session_name, sys.argv[4], sys.argv[5:])
    else:
        app.Configure(session_name, sys.argv[4], sys.argv[5:])


if __name__ == '__main__':
    main()