test: Add support of hidden service input into simple-agent
[framework/connectivity/connman.git] / test / test-session
1 #!/usr/bin/python
2
3 import sys
4 import gobject
5 import string
6
7 import dbus
8 import dbus.service
9 import dbus.mainloop.glib
10
11 import glib
12
13 import traceback
14
15 def extract_list(list):
16         val = "["
17         for i in list:
18                 val += " " + str(i)
19         val += " ]"
20         return val
21
22 def extract_values(values):
23         val = "{"
24         for key in values.keys():
25                 val += " " + key + "="
26                 if key in ["PrefixLength"]:
27                         val += "%s" % (int(values[key]))
28                 else:
29                         if key in ["Servers", "Excludes"]:
30                                 val += extract_list(values[key])
31                         else:
32                                 val += str(values[key])
33         val += " }"
34         return val
35
36 class Notification(dbus.service.Object):
37         def __init__(self, bus, app, notify_path):
38                 dbus.service.Object.__init__(self)
39                 self.app = app
40
41         @dbus.service.method("net.connman.Notification",
42                                 in_signature='', out_signature='')
43         def Release(self):
44                 print "Release %s" % (self._object_path)
45                 session_name = self._object_path.split('/')[-1]
46                 self.app.release(session_name)
47
48         @dbus.service.method("net.connman.Notification",
49                                 in_signature='a{sv}', out_signature='')
50         def Update(self, settings):
51                 print "Update called at %s" % (self._object_path)
52
53                 try:
54                         for key in settings.keys():
55                                 if key in ["IPv4", "IPv6"]:
56                                         val = extract_values(settings[key])
57                                 elif key in  ["AllowedBearers"]:
58                                         val = extract_list(settings[key])
59                                 else:
60                                         val = settings[key]
61                                 print "    %s = %s" % (key, val)
62                 except:
63                         print "Exception:"
64                         traceback.print_exc()
65
66 class SessionApplication(dbus.service.Object):
67         def __init__(self, bus, object_path, mainloop):
68                 dbus.service.Object.__init__(self, bus, object_path)
69
70                 self.manager = None
71                 self.mainloop = mainloop
72                 self.sessions = {}
73
74                 try:
75                         bus = dbus.SystemBus()
76                         bus.watch_name_owner('net.connman', self.connman_name_owner_changed)
77                 except dbus.DBusException:
78                         traceback.print_exc()
79                         exit(1)
80
81         def connman_name_owner_changed(self, proxy):
82                 try:
83                         if proxy:
84                                 print "connman appeared on D-Bus ", str(proxy)
85
86                                 bus = dbus.SystemBus()
87                                 self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
88                                                               "net.connman.Manager")
89                         else:
90                                 print "connman disappeared on D-Bus"
91                                 self.manager = None
92                                 for s in self.sessions.keys():
93                                         self.sessions[s]['notify'].remove_from_connection()
94                                         self.sessions[s]['notify'] = None
95
96                                 self.sessions = {}
97
98                 except dbus.DBusException:
99                         traceback.print_exc()
100                         exit(1)
101
102         def release(self, session_name):
103                 s = self.find_session(session_name)
104                 if not s:
105                         return
106                 if s['session']:
107                         s['session'].Destroy()
108                         s['session'] = None
109                 if s['notify']:
110                         s['notify'].remove_from_connection()
111                         s['notify'] = None
112                 del self.sessions[session_name]
113
114         def type_convert(self, key, value):
115                 if key in [ "AllowedBearers" ]:
116                         return value
117                 elif key in [ "RoamingPolicy", "ConnectionType" ]:
118                         if len(value) > 0:
119                                 return value[0]
120                 elif key in [ "Priority", "AvoidHandover",
121                               "StayConnected", "EmergencyCall" ]:
122                         flag = str(value[0]).strip().lower()
123                         val = flag not in ['false', 'f', 'n', '0']
124                         return dbus.Boolean(val)
125                 elif key in [ "PeriodicConnect", "IdleTimeout" ]:
126                         val = value[0]
127                         return dbus.UInt32(val)
128
129                 return value
130
131         def find_session(self, session_name):
132                 if not session_name in self.sessions.keys():
133                         return None
134                 return self.sessions[session_name]
135
136         @dbus.service.method("com.example.TestSession",
137                                 in_signature='', out_signature='')
138         def CreateSession(self, session_name):
139                 print "Create session"
140
141                 s = self.find_session(session_name)
142                 if s and s['session'] :
143                         print "Session %s already created-> drop reqest" % (session_name)
144                         return
145
146                 try:
147                         bus = dbus.SystemBus()
148
149                         if s == None:
150                                 s = {}
151                         s['notify_path'] = self._object_path + "/" + session_name
152                         s['notify'] = Notification(bus, self, s['notify_path'])
153                         s['notify'].add_to_connection(bus, s['notify_path'])
154                         if not 'settings' in s.keys():
155                                 s['settings'] = {};
156                         s['session_path'] = self.manager.CreateSession(s['settings'], s['notify_path'])
157                         print "notify path %s" % (s['notify_path'])
158                         print "session path %s" % (s['session_path'])
159                         s['session'] = dbus.Interface(bus.get_object("net.connman", s['session_path']),
160                                                       "net.connman.Session")
161                         self.sessions[session_name] = s
162
163                 except dbus.DBusException, e:
164                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
165                                 print e.get_dbus_message()
166                                 return
167                         traceback.print_exc()
168                         exit(1)
169
170         @dbus.service.method("com.example.TestSession",
171                                 in_signature='', out_signature='')
172         def DestroySession(self, session_name):
173                 print "Destroy session"
174
175                 s = self.find_session(session_name)
176                 if s == None or s['session'] == None:
177                         print "The session is not running -> drop request"
178                         return
179
180                 try:
181                         self.release(session_name)
182                 except dbus.DBusException:
183                         traceback.print_exc()
184                         exit(1)
185
186         @dbus.service.method("com.example.TestSession",
187                                 in_signature='', out_signature='')
188         def Connect(self, session_name):
189                 print "Connect session"
190
191                 s = self.find_session(session_name)
192                 if s == None or s['session'] == None:
193                         print "The session is not running -> drop request"
194                         return
195
196                 try:
197                         s['session'].Connect()
198                 except dbus.DBusException, e:
199                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
200                                 print e.get_dbus_message()
201                                 return
202                         traceback.print_exc()
203                         exit(1)
204
205         @dbus.service.method("com.example.TestSession",
206                                 in_signature='', out_signature='')
207         def Disconnect(self, session_name):
208                 print "Disconnect session"
209
210                 s = self.find_session(session_name)
211                 if s == None or s['session'] == None:
212                         print "The session is not running -> drop request"
213                         return
214
215                 try:
216                         s['session'].Disconnect()
217                 except dbus.DBusException, e:
218                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
219                                 print e.get_dbus_message()
220                                 return
221                         traceback.print_exc()
222                         exit(1)
223
224         @dbus.service.method("com.example.TestSession",
225                                 in_signature='', out_signature='')
226         def Change(self, session_name, key, value):
227                 print "Update session settings"
228
229                 s = self.find_session(session_name)
230                 if s == None or s['session'] == None:
231                         print "The session is not running -> drop request"
232                         return
233
234                 try:
235                         val = self.type_convert(key, value)
236                         s['session'].Change(key, val)
237                 except dbus.DBusException, e:
238                         if e.get_dbus_name() in ['net.connman.Error.Failed']:
239                                 print e.get_dbus_message()
240                                 return
241                         traceback.print_exc()
242                         exit(1)
243
244         @dbus.service.method("com.example.TestSession",
245                                 in_signature='', out_signature='')
246         def Configure(self, session_name, key, value):
247                 print "Configure session settings"
248                 s = self.find_session(session_name)
249                 if s == None:
250                         s = {}
251                         s['notify_path'] = None
252                         s['notify'] = None
253                         if not 'settings' in s.keys():
254                                 s['settings'] = {};
255                         s['session_path'] = None
256                         s['session'] = None
257                         self.sessions[session_name] = s
258                 if s and s['session']:
259                         print "The session is running, use change -> drop request"
260                         return
261                 val = self.type_convert(key, value)
262                 s['settings'][key] = val
263
264 def main():
265         if len(sys.argv) < 2:
266                 print "Usage: %s <command>" % (sys.argv[0])
267                 print ""
268                 print "  enable"
269                 print "  disable"
270                 print "  create <app_path> <session_name>"
271                 print "  destroy <app_path> <session_name>"
272                 print "  connect <app_path> <session_name>"
273                 print "  disconnect <app_path> <session_name>"
274                 print "  change <app_path> <session_name> <key> <value>"
275                 print "  configure <app_path> <session_name> <key> <value>"
276                 print ""
277                 print "  run <app_path>"
278                 sys.exit(1)
279
280         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
281
282         if sys.argv[1] == "enable":
283                 bus = dbus.SystemBus()
284                 manager = dbus.Interface(bus.get_object("net.connman", "/"),
285                                          "net.connman.Manager")
286                 manager.SetProperty("SessionMode", True)
287                 return
288
289         elif sys.argv[1] == "disable":
290                 bus = dbus.SystemBus()
291                 manager = dbus.Interface(bus.get_object("net.connman", "/"),
292                                          "net.connman.Manager")
293                 manager.SetProperty("SessionMode", False)
294                 return
295
296         if (len(sys.argv) < 3):
297                 print "Need test application path"
298                 sys.exit(1)
299
300         app_path = sys.argv[2]
301         bus = dbus.SessionBus()
302
303         app_name = "com.example.SessionApplication.%s" % (string.strip(app_path, "/"))
304
305         if sys.argv[1] == "run":
306                 name = dbus.service.BusName(app_name, bus)
307                 mainloop = gobject.MainLoop()
308
309                 app = SessionApplication(bus, app_path, mainloop)
310
311                 mainloop.run()
312                 return
313
314         app = dbus.Interface(bus.get_object(app_name, app_path),
315                              "com.example.TestSession")
316
317         if sys.argv[1] == "create":
318                 app.CreateSession(sys.argv[3])
319
320         elif sys.argv[1] == "destroy":
321                 app.DestroySession(sys.argv[3])
322
323         elif sys.argv[1] == "connect":
324                 app.Connect(sys.argv[3])
325
326         elif sys.argv[1] == "disconnect":
327                 app.Disconnect(sys.argv[3])
328
329         elif sys.argv[1] == "change":
330                 if len(sys.argv) < 5:
331                         print "Arguments missing"
332                         sys.exit(1)
333
334                 app.Change(sys.argv[3], sys.argv[4], sys.argv[5:])
335
336         elif sys.argv[1] == "configure":
337                 if len(sys.argv) < 5:
338                         print "Arguments missing"
339                         sys.exit(1)
340
341                 app.Configure(sys.argv[3], sys.argv[4], sys.argv[5:])
342
343         else:
344                 print "Unknown command '%s'" % sys.argv[1]
345                 sys.exit(1)
346
347 if __name__ == '__main__':
348         main()