8cc76e818379ea51e6320b6ff25077261271437e
[platform/upstream/connman.git] / test / p2p-on-supplicant
1 #!/usr/bin/python
2
3 from os import O_NONBLOCK
4 from sys import stdin, stdout, exit, version_info, argv
5 from fcntl import fcntl, F_GETFL, F_SETFL
6 import glib
7 import dbus
8 import dbus.mainloop.glib
9 import gobject
10 import argparse
11
# D-Bus bus name, root interface name and root object path used to reach
# wpa_supplicant (see the bus.get_object()/dbus.Interface() calls below).
WPA_NAME='fi.w1.wpa_supplicant1'
WPA_INTF='fi.w1.wpa_supplicant1'
WPA_PATH='/fi/w1/wpa_supplicant1'
# Interface names derived from the root interface name.
WPA_IF_INTF = WPA_INTF + '.Interface'
WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice'
WPA_GROUP_INTF = WPA_INTF + '.Group'
WPA_PEER_INTF = WPA_INTF + '.Peer'
# Generic properties interface, used for the GetAll()/Set() calls.
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

# Bit tested against a peer's 'groupcapability' property in p2p_connect()
# to decide between joining a group and associating with a device.
P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0
22
class ArgFields:
    """Key names used in the per-command description dictionaries.

    Each attribute holds its own name as a string, so that
    ``ArgFields.help`` can be used as a dictionary key instead of a bare
    'help' literal.
    """
    # Explicit assignments replace the former exec()-generated attributes:
    # same values, no dynamic code execution.
    help = 'help'
    metavar = 'metavar'
26
27 class InputLine:
28     def __init__(self, handler):
29         self.line = ''
30         self.handler = handler
31
32         flags = fcntl(stdin.fileno(), F_GETFL)
33         flags |= O_NONBLOCK
34         fcntl(stdin.fileno(), F_SETFL, flags)
35         glib.io_add_watch(stdin, glib.IO_IN, self.input_cb)
36
37         self.prompt()
38
39     def prompt(self):
40         self.line = ''
41         print '> ',
42         stdout.flush()
43
44     def input_cb(self, fd, event):
45         if event != glib.IO_IN:
46             return
47
48         self.line += fd.read();
49         for line in self.line.split('\n'):
50             line = line.strip()
51             if len(line) == 0:
52                 break
53
54             self.handler(line.strip())
55
56         self.prompt()
57
58         return True
59
60 def error_print(ex):
61     print 'Command Error: %s' % ex
62
def checkarg(nb_args = 0, min_args = False):
    """Build a decorator checking the number of arguments of a command.

    The decorated function is expected to receive the command's argument
    list as its second positional parameter (right after ``self``).

    nb_args  -- number of arguments the command takes
    min_args -- when True, nb_args is a minimum instead of an exact count

    The wrapper raises Exception when the count does not match; a missing
    or None argument list counts as zero arguments.
    """
    # Local import: keeps this block self-contained within the file.
    from functools import wraps

    def under(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            # Commands declare 'args = None' defaults, so tolerate both an
            # omitted and a None argument list.
            cmd_args = args[1] if len(args) > 1 else None
            count = 0 if cmd_args is None else len(cmd_args)

            if min_args:
                mismatch = count < nb_args
                expected = 'at least %s' % nb_args
            else:
                mismatch = count != nb_args
                expected = '%s' % nb_args

            if mismatch:
                raise Exception('Command %s takes %s arguments' %
                                    (function.__name__, expected))
            return function(*args, **kwargs)
        return wrapper
    return under
79
80 def print_dict(d):
81     for k in d:
82         try:
83             if type(d[k]) is dbus.Byte:
84                 print 'Key %s --> 0x%x' % (k, d[k])
85             else:
86                 print 'Key %s --> %s' % (k, d[k])
87         except:
88             print "Error: Key %s content cannot be printed" % k
89             pass
90
91 def print_tuple(t):
92     for e in t:
93         if type(e) is dbus.Dictionary:
94             print_dict(e)
95         else:
96             print 'Element: %s' % e
97
98 class Wpa_s:
99     def __init__(self, bus, iface_name, command):
100         self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
101         bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
102                                 member_keyword='signal')
103         bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
104                                 signal_name='InterfaceAdded')
105         bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
106                                 signal_name='InterfaceRemoved')
107         self.__reset()
108
109         self.bus = bus
110
111         self.debug = False
112
113         self.line_in = InputLine(self.__command)
114
115         if iface_name:
116             try:
117                 self.create_if([iface_name])
118             except:
119                 print "Error creating interface: %s" % iface_name
120
121         if len(command.strip(' ')):
122             self.__command(command)
123
124     def help(self, args):
125         list = self.command_list.keys()
126         list.sort()
127         for key in list:
128             help = ''
129             if (self.command_list[key].has_key(ArgFields.help)):
130                 help = self.command_list[key][ArgFields.help]
131
132             print "%s\t%s" % (key.rjust(25), help.ljust(50))
133
134     def __command(self, cmd_line):
135         cmd = cmd_line.split(' ')
136
137         try:
138             func = getattr(self, cmd[0])
139         except Exception, e:
140             print 'Error: command unknown - %s' % e
141             return
142
143         try:
144             func(cmd[1:])
145         except Exception, e:
146             error_print(e)
147
148     def __wpa_property_changed(*args, **kwargs):
149         print 'WPA - Signal:  %s' % kwargs.get('signal')
150
151     def __if_property_changed(*args, **kwargs):
152         signal = kwargs.get('signal')
153         print 'IF - Signal:  %s' % signal
154
155         if signal == 'BSSAdded':
156             return
157
158         if args[0].debug:
159             print_tuple(args[1:])
160
161     def __p2p_property_changed(*args, **kwargs):
162         print 'IF P2P - Signal:  %s' % kwargs.get('signal')
163         if args[0].debug:
164             print_tuple(args[1:])
165
166     def __peer_if_p2p_property_changed(*args, **kwargs):
167         print 'Peer - ',
168         args[0].__p2p_property_changed(*args, **kwargs)
169
170     def __DeviceFound(self, object_path):
171         self.peers[object_path] = None
172
173         peer = self.bus.get_object(WPA_INTF, object_path)
174         peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
175
176         self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed,
177                                      dbus_interface=WPA_PEER_INTF,
178                                      path=object_path, member_keyword='signal')
179
180         self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)
181
182         if self.debug:
183             print_dict(self.peers[object_path])
184
185     def __DeviceLost(self, object_path):
186         if object_path in self.peers:
187             del self.peers[object_path]
188
189     def __PeerJoined(self, object_path):
190         print 'Peer %s joined' % object_path
191
192     def __PeerDisconnected(self, object_path):
193         print 'Peer %s disconnected' % object_path
194
195     def __group_if_property_changed(*args, **kwargs):
196         print 'Group - ',
197         args[0].__if_property_changed(*args, **kwargs)
198
199     def __group_if_p2p_property_changed(*args, **kwargs):
200         print 'Group - ',
201         args[0].__p2p_property_changed(*args, **kwargs)
202
203     def __GroupFinished(self, properties):
204         print 'Group running on %s is being removed' % ifname
205         self.group_obj = self.group_if = self.group_iface_path = None
206
207         if self.debug:
208             print_dict(properties)
209
210     def __InvitationResult(self, response):
211         print 'Invitation result status: %d ' % response['status']
212
213         if response.has_key('bssid'):
214             print 'bssid: %s' % response['bssid']
215
216         if self.debug:
217             print_dict(response)
218
219     def __GroupStarted(self, properties):
220         self.group_obj = properties['group_object']
221         self.bus.add_signal_receiver(self.__PeerJoined,
222                                 dbus_interface=WPA_GROUP_INTF,
223                                 path=self.group_obj,
224                                 signal_name='PeerJoined')
225         self.bus.add_signal_receiver(self.__PeerDisconnected,
226                                 dbus_interface=WPA_GROUP_INTF,
227                                 path=self.group_obj,
228                                 signal_name='PeerDisconnected')
229
230         self.group_iface_path = properties['interface_object']
231         self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF,
232                                        self.group_iface_path),
233                                        WPA_P2P_INTF)
234         self.bus.add_signal_receiver(self.__group_if_property_changed,
235                                 dbus_interface=WPA_IF_INTF,
236                                 path=self.group_iface_path,
237                                 member_keyword='signal')
238         self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
239                                 dbus_interface=WPA_P2P_INTF,
240                                 path=self.group_iface_path,
241                                 member_keyword='signal')
242         self.bus.add_signal_receiver(self.__GroupFinished,
243                                 dbus_interface=WPA_P2P_INTF,
244                                 path=self.group_iface_path,
245                                 signal_name='GroupFinished')
246         self.bus.add_signal_receiver(self.__InvitationResult,
247                                 dbus_interface=WPA_P2P_INTF,
248                                 path=self.iface_path,
249                                 signal_name='InvitationResult')
250
251         if self.debug:
252             group = dbus.Interface(self.bus.get_object(WPA_INTF,
253                                                        self.group_obj),
254                                                        DBUS_PROPERTIES_INTF)
255             print_dict(group.GetAll(WPA_GROUP_INTF))
256
257     def __ServiceDiscoveryResponse(self, response):
258         peer = response['peer_object']
259         if peer in self.peers:
260             print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName'])
261             print response['tlvs']
262
263     def __InterfaceAdded(self, path, properties):
264         print 'Interface %s Added (%s)' % (properties['Ifname'], path)
265         if self.debug:
266             print_dict(properties)
267             p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
268                                  path), DBUS_PROPERTIES_INTF)
269             print_dict(p2p.GetAll(WPA_P2P_INTF))
270
271     def __InterfaceRemoved(self, path):
272         print 'Interface Removed (%s)' % (path)
273
274     def __listen_if_signals(self):
275         self.bus.add_signal_receiver(self.__if_property_changed,
276                                 dbus_interface=WPA_IF_INTF,
277                                 path=self.iface_path,
278                                 member_keyword='signal')
279         self.bus.add_signal_receiver(self.__p2p_property_changed,
280                                 dbus_interface=WPA_P2P_INTF,
281                                 path=self.iface_path,
282                                 member_keyword='signal')
283         self.bus.add_signal_receiver(self.__GroupStarted,
284                                 dbus_interface=WPA_P2P_INTF,
285                                 path=self.iface_path,
286                                 signal_name='GroupStarted')
287         self.bus.add_signal_receiver(self.__DeviceFound,
288                                 dbus_interface=WPA_P2P_INTF,
289                                 path=self.iface_path,
290                                 signal_name='DeviceFound')
291         self.bus.add_signal_receiver(self.__DeviceLost,
292                                 dbus_interface=WPA_P2P_INTF,
293                                 path=self.iface_path,
294                                 signal_name='DeviceLost')
295         self.bus.add_signal_receiver(self.__ServiceDiscoveryResponse,
296                                 dbus_interface=WPA_P2P_INTF,
297                                 path=self.iface_path,
298                                 signal_name='ServiceDiscoveryResponse')
299
300     def __reset(self):
301         self.iface_path = self.iface_name = self.iface = None
302         self.p2p = self.group_if = self.group_obj = None
303         self.peers = {}
304
305     def __set_if(self, iface_name):
306         self.iface = dbus.Interface(self.bus.get_object(WPA_INTF,
307                                     self.iface_path), WPA_IF_INTF)
308         self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
309                                     self.iface_path), WPA_P2P_INTF)
310
311         p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
312         p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
313                    dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
314                                    signature='sv'))
315         print 'Interface %s: %s' % (iface_name, self.iface_path)
316         self.iface_name = iface_name
317         self.__listen_if_signals()
318
319     @checkarg()
320     def enable_debug(self, args):
321         self.debug = True
322
323     @checkarg()
324     def disable_debug(self, args):
325         self.debug = False
326
327     @checkarg(nb_args=1)
328     def create_if(self, args):
329         self.__reset()
330         self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} ))
331         self.__set_if(args[0])
332
333     @checkarg(nb_args=1)
334     def get_if(self, args):
335         self.__reset()
336         self.iface_path = self.wpa.GetInterface(args[0])
337         self.__set_if(args[0])
338
339     @checkarg()
340     def del_if(self, args = None):
341         if not self.iface_path:
342             return
343
344         self.wpa.RemoveInterface(self.iface_path)
345         print 'Interface %s removed' % self.iface_name
346         self.__reset()
347
348     @checkarg()
349     def scan(self, args = None):
350         if not self.iface:
351             return
352
353         self.iface.Scan(({ 'Type': 'passive' }))
354         print 'Scan started'
355
356     @checkarg()
357     def quit(self, args = None):
358         self.del_if(args)
359         exit(0)
360
361     @checkarg(nb_args=1)
362     def set_command_list(self, command_list):
363         self.command_list = command_list[0]
364
365     @checkarg()
366     def p2p_find(self, args = None):
367         if not self.p2p:
368             return
369
370         self.p2p.Find(({}))
371
372     @checkarg()
373     def p2p_stop_find(self, args = None):
374         if not self.p2p:
375             return
376
377         self.p2p.StopFind()
378
379     @checkarg()
380     def p2p_peers(self, args = None):
381         if not self.iface:
382             return
383
384         for p in self.peers:
385             print 'Peer Name=%s' % (self.peers[p]['DeviceName'])
386
387     def __find_peer(self, peer_name, ret_object_path = False):
388         if len(self.peers) == 0:
389             return None
390
391         peer = None
392         for p in self.peers:
393             if self.peers[p]['DeviceName'] == peer_name:
394                 peer = self.peers[p]
395                 break
396
397         if not peer:
398             print 'No peer found under the name: %s' % peer_name
399             p = None
400
401         if ret_object_path:
402             return p
403         else:
404             return peer
405
406     @checkarg(nb_args = 1)
407     def p2p_peer(self, args):
408         peer_path = self.__find_peer(args[0], True)
409         if peer_path:
410             peer = self.bus.get_object(WPA_INTF, peer_path)
411             peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
412             self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF)
413
414             print_dict(self.peers[peer_path])
415
416     @checkarg(nb_args = 1)
417     def p2p_connect(self, args):
418         if not self.p2p:
419             return
420
421         peer = self.__find_peer(args[0])
422         if not peer:
423             return
424
425         peer_path = self.__find_peer(args[0], True)
426
427         if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER ==
428                                             P2P_GROUP_CAPAB_GROUP_OWNER):
429             print 'Joining an existing P2P group'
430             pin = self.p2p.Connect(({ 'peer' : peer_path,
431                                       'wps_method' : 'pbc',
432                                       'join' : True,
433                                       'go_intent' : 0 }))
434         else:
435             print 'Associating with another P2P device'
436             pin = self.p2p.Connect(({ 'peer' : peer_path,
437                                       'wps_method' : 'pbc',
438                                       'join' : False,
439                                       'go_intent' : 7 }))
440             if not pin:
441                 print 'WPS PIN in use: %s' % pin
442
443     @checkarg(nb_args = 1)
444     def p2p_disconnect(self, args):
445         if not self.p2p:
446             return
447
448         peer = self.__find_peer(args[0])
449         if not peer:
450             return
451
452         if not self.group_if:
453             print 'Peer %s is not connected' % (peer['DeviceName'])
454             return
455
456         self.group_if.Disconnect()
457
458     @checkarg()
459     def p2p_group_add(self, args):
460         if not self.p2p:
461             return
462
463         self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))
464
465     @checkarg()
466     def p2p_group_remove(self, args):
467         if not self.group_if:
468             return
469
470         self.group_if.Disconnect()
471
472     @checkarg()
473     def p2p_group(self, args):
474         if not self.group_obj:
475             return
476
477         group = dbus.Interface(self.bus.get_object(WPA_INTF,
478                                self.group_obj), DBUS_PROPERTIES_INTF)
479         print_dict(group.GetAll(WPA_GROUP_INTF))
480
481     @checkarg()
482     def p2p_flush(self, args):
483         if not self.p2p:
484             return
485
486         self.p2p.Flush()
487
488     @checkarg()
489     def p2p_serv_disc_req(self, args = None):
490         if not self.p2p:
491             return
492
493         """ We request all kind of services """
494         sd_req = dbus.Array(signature='y', variant_level=1)
495         for a in [2,0,0,1]:
496             sd_req.append(dbus.Byte(a))
497
498         ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req }))
499         print 'Service discovery reference: %s' % ref
500
501     @checkarg(nb_args = 1)
502     def p2p_serv_disc_cancel_req(self, args):
503         if not self.p2p:
504             return
505
506         self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))
507
508     @checkarg(nb_args = 3)
509     def p2p_service_add(self, args):
510         if not self.p2p:
511             return
512
513         service = { 'service_type' : args[0] }
514         if args[0] == 'upnp':
515             service['version'] = args[1]
516             service['service'] = args[2]
517         elif args[0] == 'bonjour':
518             service['query'] = args[1]
519             service['response'] = args[2]
520         else:
521             print 'Unknown service: %s' % args[0]
522             return
523
524         self.p2p.AddService((service))
525
526     @checkarg(nb_args = 2, min_args = True)
527     def p2p_service_del(self, args):
528         if not self.p2p:
529             return
530
531         service = { 'service_type' : args[0] }
532         if args[0] == 'upnp':
533             service['version'] = args[1]
534             service['service'] = args[2]
535         elif args[0] == 'bonjour':
536             service['query'] = args[1]
537         else:
538             print 'Unknown service: %s' % args[0]
539             return
540
541         self.p2p.DeleteService((service))
542
543     @checkarg()
544     def p2p_service_flush(self, args = None):
545         if not self.p2p:
546             return
547
548         self.p2p.FlushService()
549
550     @checkarg(nb_args = 1)
551     def p2p_invite(self, args):
552         if not self.p2p or not self.group_if:
553             return
554
555         peer_path = self.__find_peer(args[0], True)
556
557         if not peer_path:
558             return
559
560         self.group_if.Invite({ 'peer' : peer_path})
561
def build_args(parser):
    """Declare the options and one sub-command per command on *parser*.

    Returns the command table: a dictionary mapping each command name to
    a dictionary of optional fields (see ArgFields).
    """
    parser.add_argument('-d', dest='debug', action='store_true',
                        default=False, help='enable debug')
    parser.add_argument('-i', dest='ifname', metavar='<interface>',
                        help='interface name')

    command = {
        'quit': {},
        'enable_debug': {},
        'disable_debug': {},
        'create_if': {ArgFields.help: '<iface_name> - create interface'},
        'get_if': {ArgFields.help: '<iface_name> - get interface'},
        'del_if': {ArgFields.help: 'removes current interface'},
        'scan': {},
        'p2p_find': {},
        'p2p_stop_find': {},
        'p2p_flush': {},
        'p2p_group_add': {ArgFields.help: 'adds an autonomous group'},
        'p2p_group_remove': {},
        'p2p_group': {},
        'p2p_peers': {},
        'p2p_peer': {ArgFields.help:
                     '<p2p device name> - get info for a peer'},
        'p2p_connect': {ArgFields.help: '<p2p device name>'},
        'p2p_disconnect': {ArgFields.help: '<p2p device name>'},
        'p2p_serv_disc_req': {},
        'p2p_serv_disc_cancel_req': {ArgFields.help: '<identifier>'},
        'p2p_service_add': {ArgFields.help:
                '<service type> <version/query> <service/response>'},
        'p2p_service_del': {ArgFields.help:
                '<service type> <version/query> [<service>]'},
        'p2p_service_flush': {},
        'p2p_invite': {ArgFields.help: '<p2p device name>'},
    }

    subparsers = parser.add_subparsers(help='commands', dest='command')
    # The empty sub-command lets the tool be started without any command.
    subparsers.add_parser('')
    for name in sorted(command):
        fields = command[name]
        help_text = fields.get(ArgFields.help)
        meta = fields.get(ArgFields.metavar)

        command_parser = subparsers.add_parser(name, help=help_text)
        # Each command gathers its own parameters in a list stored under
        # the command's name.
        command_parser.add_argument(name, nargs='*', metavar=meta,
                                    help=help_text)

    return command
611
612 def main():
613     if version_info.major != 2:
614         print 'You need to run this under Python 2.x'
615         exit(1)
616
617     parser = argparse.ArgumentParser(description='Connman P2P Test')
618
619     command_list = build_args(parser)
620
621     argv[1:] += ['']
622
623     args = parser.parse_args(argv[1:])
624
625     dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
626
627     opts = []
628     if args.command:
629         opts = getattr(args, args.command)
630
631     params = ''
632     if opts and len(opts[0]):
633         params = ' ' + ''.join(opts)
634
635     bus = dbus.SystemBus()
636
637     mainloop = gobject.MainLoop()
638
639     wpa_s = Wpa_s(bus, args.ifname, args.command + params)
640
641     if (args.debug):
642             wpa_s.enable_debug([])
643
644     wpa_s.set_command_list([command_list])
645
646     mainloop.run()
647
# Run the interactive tool only when executed as a script.
if __name__ == '__main__':
    main()