Set security value appropriate in WPA3/WPA2 mode
[platform/upstream/connman.git] / test / p2p-on-supplicant
1 #!/usr/bin/python
2
3 from os import O_NONBLOCK
4 from sys import stdin, stdout, exit, version_info, argv
5 from fcntl import fcntl, F_GETFL, F_SETFL
6 from gi.repository import GLib
7 import dbus
8 import dbus.mainloop.glib
9 import argparse
10
# Bus name, root interface and root object path used to reach wpa_supplicant.
WPA_NAME = 'fi.w1.wpa_supplicant1'
WPA_INTF = 'fi.w1.wpa_supplicant1'
WPA_PATH = '/fi/w1/wpa_supplicant1'

# Interface names, all derived from the root interface name.
WPA_IF_INTF = '{}.Interface'.format(WPA_INTF)
WPA_P2P_INTF = '{}.P2PDevice'.format(WPA_IF_INTF)
WPA_GROUP_INTF = '{}.Group'.format(WPA_INTF)
WPA_PEER_INTF = '{}.Peer'.format(WPA_INTF)
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

# Bit tested against a peer's 'groupcapability' value in p2p_connect.
P2P_GROUP_CAPAB_GROUP_OWNER = 0x01
21
class ArgFields:
    """Key names of the optional fields a command description may carry.

    Each attribute's value is its own name, so ``ArgFields.help`` can be used
    as the dictionary key ``'help'``.
    """

    # Spelled out explicitly instead of being generated with exec().
    help = 'help'
    metavar = 'metavar'
25
class InputLine:
    """Feed lines typed on stdin to a handler from the GLib main loop.

    stdin is switched to non-blocking mode and watched for readability; every
    complete, non-blank line is stripped and passed to ``handler``.
    """

    def __init__(self, handler):
        """Start watching stdin and show the first prompt.

        handler -- callable invoked with one stripped, non-empty line.
        """
        # Text received so far that is not yet terminated by a newline.
        self.line = ''
        self.handler = handler

        flags = fcntl(stdin.fileno(), F_GETFL)
        fcntl(stdin.fileno(), F_SETFL, flags | O_NONBLOCK)
        GLib.io_add_watch(stdin, GLib.IO_IN, self.input_cb)

        self.prompt()

    def prompt(self):
        """Print the prompt without a newline and flush it to the terminal."""
        print('> ', end=' ')
        stdout.flush()

    def input_cb(self, fd, event):
        """Watch callback: read what is available and dispatch full lines.

        Returns True to keep the watch installed, False to remove it.
        """
        if event != GLib.IO_IN:
            return False

        data = fd.read()
        if not data:
            # Nothing left to read: run an unterminated last line, if any, and
            # drop the watch so the main loop does not spin on a closed stdin.
            pending = self.line.strip()
            self.line = ''
            if pending:
                self.handler(pending)
            return False

        # Everything before the last newline is complete; the tail (possibly
        # empty) stays buffered until its newline arrives.
        complete = (self.line + data).split('\n')
        self.line = complete.pop()

        for line in complete:
            line = line.strip()
            # A blank line must not stop the commands that follow it.
            if line:
                self.handler(line)

        if complete:
            self.prompt()

        return True
58
def error_print(ex):
    """Report a failed command on stdout.

    ex -- the error to show, formatted with %s.
    """
    message = 'Command Error: %s' % ex
    print(message)
61
def checkarg(nb_args = 0, min_args = False):
    """Decorator factory validating the argument count of a command method.

    The decorated method is expected to receive its argument list as the
    second positional parameter (right after ``self``).  A missing or None
    argument list counts as zero arguments.

    nb_args  -- number of arguments the command expects.
    min_args -- when True ``nb_args`` is a minimum, otherwise an exact count.

    The wrapper raises Exception when the count does not match.
    """
    from functools import wraps

    def under(function):
        # wraps() keeps the command's own __name__ and docstring visible.
        @wraps(function)
        def wrapper(*args, **kwargs):
            given = args[1] if len(args) > 1 else None
            count = 0 if given is None else len(given)

            if min_args:
                if count < nb_args:
                    raise Exception('Command %s takes at least %s arguments' %
                                        (function.__name__, nb_args))
            elif count != nb_args:
                raise Exception('Command %s takes %s arguments' %
                                    (function.__name__, nb_args))

            return function(*args, **kwargs)
        return wrapper
    return under
78
def print_dict(d):
    """Print every key/value pair of ``d``, one 'Key k --> v' line each.

    Values whose type is exactly dbus.Byte are shown in hexadecimal.  An entry
    that cannot be formatted is reported and the remaining ones still print.
    """
    for k in d:
        try:
            value = d[k]
            if type(value) is dbus.Byte:
                print('Key %s --> 0x%x' % (k, value))
            else:
                print('Key %s --> %s' % (k, value))
        except Exception:
            # Only ordinary errors are swallowed here; KeyboardInterrupt and
            # SystemExit keep propagating.
            print("Error: Key %s content cannot be printed" % k)
89
def print_tuple(t):
    """Print each element of ``t``.

    Elements whose type is exactly dbus.Dictionary are expanded with
    print_dict(); anything else is shown on one 'Element: ...' line.
    """
    for e in t:
        if type(e) is dbus.Dictionary:
            print_dict(e)
        else:
            # Wrap in a 1-tuple: an element that is itself a tuple (or a tuple
            # subclass) would otherwise be unpacked as the format arguments.
            print('Element: %s' % (e,))
96
class Wpa_s:
    """Interactive P2P test driver talking to wpa_supplicant over D-Bus.

    Public methods that take an ``args`` list are the commands a user can
    type at the prompt: __command() looks them up by name and calls them with
    the remaining words of the line.
    """

    def __init__(self, bus, iface_name, command):
        """Hook the global signals, start the prompt and run start-up actions.

        bus        -- D-Bus connection used for every call and signal match.
        iface_name -- if set, an interface of that name is created right away.
        command    -- a command line to run once; may be empty.
        """
        self.bus = bus
        self.debug = False
        # Filled in later through set_command_list(); help() reads it.
        self.command_list = {}
        self.__reset()

        self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
        bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
                                member_keyword='signal')
        bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
                                signal_name='InterfaceAdded')
        bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
                                signal_name='InterfaceRemoved')

        self.line_in = InputLine(self.__command)

        if iface_name:
            try:
                self.create_if([iface_name])
            except Exception:
                print("Error creating interface: %s" % iface_name)

        if command.strip():
            self.__command(command)

    def help(self, args):
        """Print every known command, sorted, with its help text if any."""
        for key in sorted(self.command_list):
            text = self.command_list[key].get(ArgFields.help, '')
            print("%s\t%s" % (key.rjust(25), text.ljust(50)))

    def __command(self, cmd_line):
        """Run one command line: first word is the method, the rest its args."""
        # split() without a separator ignores repeated or surrounding blanks,
        # so they never turn into empty arguments.
        cmd = cmd_line.split()
        if not cmd:
            return

        try:
            func = getattr(self, cmd[0])
        except AttributeError as e:
            print('Error: command unknown - %s' % e)
            return

        try:
            func(cmd[1:])
        except Exception as e:
            error_print(e)

    def __wpa_property_changed(self, *args, **kwargs):
        """Log any signal emitted on the wpa_supplicant root object."""
        print('WPA - Signal:  %s' % kwargs.get('signal'))

    def __if_property_changed(self, *args, **kwargs):
        """Log a signal of the interface; dump its arguments in debug mode."""
        signal = kwargs.get('signal')
        print('IF - Signal:  %s' % signal)

        # BSSAdded arguments are never dumped, even in debug mode.
        if signal == 'BSSAdded':
            return

        if self.debug:
            print_tuple(args)

    def __p2p_property_changed(self, *args, **kwargs):
        """Log a P2P device signal; dump its arguments in debug mode."""
        print('IF P2P - Signal:  %s' % kwargs.get('signal'))
        if self.debug:
            print_tuple(args)

    def __peer_if_p2p_property_changed(self, *args, **kwargs):
        """Log a signal coming from a peer object."""
        print('Peer - ', end=' ')
        # Forward only the signal arguments; self is already bound.
        self.__p2p_property_changed(*args, **kwargs)

    def __DeviceFound(self, object_path):
        """Record a newly found peer and start listening to its signals."""
        peer = self.bus.get_object(WPA_INTF, object_path)
        peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)

        self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed,
                                     dbus_interface=WPA_PEER_INTF,
                                     path=object_path, member_keyword='signal')

        # Stored only once the properties were fetched, so the table never
        # holds an entry without properties.
        self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)

        if self.debug:
            print_dict(self.peers[object_path])

    def __DeviceLost(self, object_path):
        """Forget a peer that is no longer visible."""
        self.peers.pop(object_path, None)

    def __PeerJoined(self, object_path):
        print('Peer %s joined' % object_path)

    def __PeerDisconnected(self, object_path):
        print('Peer %s disconnected' % object_path)

    def __group_if_property_changed(self, *args, **kwargs):
        """Log an interface signal coming from the group interface."""
        print('Group - ', end=' ')
        self.__if_property_changed(*args, **kwargs)

    def __group_if_p2p_property_changed(self, *args, **kwargs):
        """Log a P2P signal coming from the group interface."""
        print('Group - ', end=' ')
        self.__p2p_property_changed(*args, **kwargs)

    def __GroupFinished(self, properties):
        """Forget the current group once it has been torn down."""
        # Report the interface path recorded when the group started.
        print('Group running on %s is being removed' % self.group_iface_path)
        self.group_obj = self.group_if = self.group_iface_path = None

        if self.debug:
            print_dict(properties)

    def __InvitationResult(self, response):
        """Print the outcome of an invitation."""
        print('Invitation result status: %d ' % response['status'])

        if 'bssid' in response:
            print('bssid: %s' % response['bssid'])

        if self.debug:
            print_dict(response)

    def __GroupStarted(self, properties):
        """Remember the new group and subscribe to its signals."""
        self.group_obj = properties['group_object']
        for name, handler in (('PeerJoined', self.__PeerJoined),
                              ('PeerDisconnected', self.__PeerDisconnected)):
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=WPA_GROUP_INTF,
                                         path=self.group_obj,
                                         signal_name=name)

        self.group_iface_path = properties['interface_object']
        group_iface = self.bus.get_object(WPA_INTF, self.group_iface_path)
        self.group_if = dbus.Interface(group_iface, WPA_P2P_INTF)

        self.bus.add_signal_receiver(self.__group_if_property_changed,
                                     dbus_interface=WPA_IF_INTF,
                                     path=self.group_iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.group_iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__GroupFinished,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.group_iface_path,
                                     signal_name='GroupFinished')
        # InvitationResult is matched on the main interface path, not on the
        # group interface path.
        self.bus.add_signal_receiver(self.__InvitationResult,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.iface_path,
                                     signal_name='InvitationResult')

        if self.debug:
            group = dbus.Interface(self.bus.get_object(WPA_INTF,
                                                       self.group_obj),
                                   DBUS_PROPERTIES_INTF)
            print_dict(group.GetAll(WPA_GROUP_INTF))

    def __ServiceDiscoveryResponse(self, response):
        """Print the TLVs returned by a known peer."""
        peer = response['peer_object']
        if peer in self.peers:
            print('Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']))
            print(response['tlvs'])

    def __InterfaceAdded(self, path, properties):
        """Log a new interface; dump its properties in debug mode."""
        print('Interface %s Added (%s)' % (properties['Ifname'], path))
        if self.debug:
            print_dict(properties)
            p2p = dbus.Interface(self.bus.get_object(WPA_INTF, path),
                                 DBUS_PROPERTIES_INTF)
            print_dict(p2p.GetAll(WPA_P2P_INTF))

    def __InterfaceRemoved(self, path):
        print('Interface Removed (%s)' % (path))

    def __listen_if_signals(self):
        """Subscribe to the signals of the current interface."""
        self.bus.add_signal_receiver(self.__if_property_changed,
                                     dbus_interface=WPA_IF_INTF,
                                     path=self.iface_path,
                                     member_keyword='signal')
        self.bus.add_signal_receiver(self.__p2p_property_changed,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.iface_path,
                                     member_keyword='signal')

        named_handlers = (
            ('GroupStarted', self.__GroupStarted),
            ('DeviceFound', self.__DeviceFound),
            ('DeviceLost', self.__DeviceLost),
            ('ServiceDiscoveryResponse', self.__ServiceDiscoveryResponse),
        )
        for name, handler in named_handlers:
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=WPA_P2P_INTF,
                                         path=self.iface_path,
                                         signal_name=name)

    def __reset(self):
        """Drop every reference to the current interface, group and peers."""
        self.iface_path = self.iface_name = self.iface = None
        self.p2p = self.group_if = self.group_obj = None
        self.group_iface_path = None
        self.peers = {}

    def __set_if(self, iface_name):
        """Build the proxies for self.iface_path and configure the device."""
        obj = self.bus.get_object(WPA_INTF, self.iface_path)
        self.iface = dbus.Interface(obj, WPA_IF_INTF)
        self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
                                                      self.iface_path),
                                  WPA_P2P_INTF)

        p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
        p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
                   dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
                                   signature='sv'))
        print('Interface %s: %s' % (iface_name, self.iface_path))
        self.iface_name = iface_name
        self.__listen_if_signals()

    @checkarg()
    def enable_debug(self, args):
        """Command: turn on dumping of signal arguments and properties."""
        self.debug = True

    @checkarg()
    def disable_debug(self, args):
        """Command: turn off debug dumps."""
        self.debug = False

    @checkarg(nb_args=1)
    def create_if(self, args):
        """Command: create the interface named args[0] and select it."""
        self.__reset()
        self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} ))
        self.__set_if(args[0])

    @checkarg(nb_args=1)
    def get_if(self, args):
        """Command: select the already existing interface named args[0]."""
        self.__reset()
        self.iface_path = self.wpa.GetInterface(args[0])
        self.__set_if(args[0])

    @checkarg()
    def del_if(self, args = None):
        """Command: remove the current interface, if there is one."""
        if not self.iface_path:
            return

        self.wpa.RemoveInterface(self.iface_path)
        print('Interface %s removed' % self.iface_name)
        self.__reset()

    @checkarg()
    def scan(self, args = None):
        """Command: start a passive scan on the current interface."""
        if not self.iface:
            return

        self.iface.Scan(({ 'Type': 'passive' }))
        print('Scan started')

    @checkarg()
    def quit(self, args = None):
        """Command: remove the current interface and exit the program."""
        self.del_if(args)
        exit(0)

    @checkarg(nb_args=1)
    def set_command_list(self, command_list):
        """Store the command table (first list element) used by help()."""
        self.command_list = command_list[0]

    @checkarg()
    def p2p_find(self, args = None):
        """Command: start P2P discovery."""
        if not self.p2p:
            return

        self.p2p.Find(({}))

    @checkarg()
    def p2p_stop_find(self, args = None):
        """Command: stop P2P discovery."""
        if not self.p2p:
            return

        self.p2p.StopFind()

    @checkarg()
    def p2p_peers(self, args = None):
        """Command: list the device names of all known peers."""
        if not self.iface:
            return

        for properties in self.peers.values():
            print('Peer Name=%s' % (properties['DeviceName']))

    def __find_peer(self, peer_name, ret_object_path = False):
        """Look a known peer up by its device name.

        Returns the peer's object path when ``ret_object_path`` is true,
        otherwise its property dictionary.  Returns None, after printing a
        message, when no known peer carries that name.
        """
        for path, properties in self.peers.items():
            if properties and properties.get('DeviceName') == peer_name:
                return path if ret_object_path else properties

        print('No peer found under the name: %s' % peer_name)
        return None

    @checkarg(nb_args = 1)
    def p2p_peer(self, args):
        """Command: refresh and print the properties of peer args[0]."""
        peer_path = self.__find_peer(args[0], True)
        if peer_path:
            peer = self.bus.get_object(WPA_INTF, peer_path)
            peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
            self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF)

            print_dict(self.peers[peer_path])

    @checkarg(nb_args = 1)
    def p2p_connect(self, args):
        """Command: connect to peer args[0] using push-button WPS.

        A peer whose group-owner capability bit is set is joined as an
        existing group; otherwise a new association is negotiated.
        """
        if not self.p2p:
            return

        peer_path = self.__find_peer(args[0], True)
        if not peer_path:
            return

        peer = self.peers[peer_path]
        is_group_owner = bool(peer['groupcapability'] &
                              P2P_GROUP_CAPAB_GROUP_OWNER)

        if is_group_owner:
            print('Joining an existing P2P group')
        else:
            print('Associating with another P2P device')

        pin = self.p2p.Connect(({ 'peer' : peer_path,
                                  'wps_method' : 'pbc',
                                  'join' : is_group_owner,
                                  'go_intent' : 0 if is_group_owner else 7 }))
        # Report a PIN only when the call actually handed one back.
        if pin:
            print('WPS PIN in use: %s' % pin)

    @checkarg(nb_args = 1)
    def p2p_disconnect(self, args):
        """Command: disconnect from the group peer args[0] belongs to."""
        if not self.p2p:
            return

        peer = self.__find_peer(args[0])
        if not peer:
            return

        if not self.group_if:
            print('Peer %s is not connected' % (peer['DeviceName']))
            return

        self.group_if.Disconnect()

    @checkarg()
    def p2p_group_add(self, args):
        """Command: add a persistent group on the current device."""
        if not self.p2p:
            return

        self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))

    @checkarg()
    def p2p_group_remove(self, args):
        """Command: leave the current group."""
        if not self.group_if:
            return

        self.group_if.Disconnect()

    @checkarg()
    def p2p_group(self, args):
        """Command: print the properties of the current group."""
        if not self.group_obj:
            return

        group = dbus.Interface(self.bus.get_object(WPA_INTF, self.group_obj),
                               DBUS_PROPERTIES_INTF)
        print_dict(group.GetAll(WPA_GROUP_INTF))

    @checkarg()
    def p2p_flush(self, args):
        """Command: call Flush on the P2P device."""
        if not self.p2p:
            return

        self.p2p.Flush()

    @checkarg()
    def p2p_serv_disc_req(self, args = None):
        """Command: send a service discovery request and print its reference."""
        if not self.p2p:
            return

        # We request all kind of services
        sd_req = dbus.Array(signature='y', variant_level=1)
        for a in [2,0,0,1]:
            sd_req.append(dbus.Byte(a))

        ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req }))
        print('Service discovery reference: %s' % ref)

    @checkarg(nb_args = 1)
    def p2p_serv_disc_cancel_req(self, args):
        """Command: cancel the discovery request with reference int(args[0])."""
        if not self.p2p:
            return

        self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))

    @checkarg(nb_args = 3)
    def p2p_service_add(self, args):
        """Command: register a local 'upnp' or 'bonjour' service.

        args -- [service type, version or query, service or response].
        """
        if not self.p2p:
            return

        service = { 'service_type' : args[0] }
        if args[0] == 'upnp':
            service['version'] = args[1]
            service['service'] = args[2]
        elif args[0] == 'bonjour':
            service['query'] = args[1]
            service['response'] = args[2]
        else:
            print('Unknown service: %s' % args[0])
            return

        self.p2p.AddService((service))

    @checkarg(nb_args = 2, min_args = True)
    def p2p_service_del(self, args):
        """Command: unregister a local 'upnp' or 'bonjour' service.

        args -- [service type, version or query] plus the service for upnp.
        Raises Exception when the upnp service argument is missing.
        """
        if not self.p2p:
            return

        service = { 'service_type' : args[0] }
        if args[0] == 'upnp':
            # The decorator only guarantees two arguments; upnp needs three.
            if len(args) < 3:
                raise Exception('Command p2p_service_del takes 3 arguments '
                                'for upnp')
            service['version'] = args[1]
            service['service'] = args[2]
        elif args[0] == 'bonjour':
            service['query'] = args[1]
        else:
            print('Unknown service: %s' % args[0])
            return

        self.p2p.DeleteService((service))

    @checkarg()
    def p2p_service_flush(self, args = None):
        """Command: call FlushService on the P2P device."""
        if not self.p2p:
            return

        self.p2p.FlushService()

    @checkarg(nb_args = 1)
    def p2p_invite(self, args):
        """Command: invite peer args[0] into the current group."""
        if not self.p2p or not self.group_if:
            return

        peer_path = self.__find_peer(args[0], True)

        if not peer_path:
            return

        self.group_if.Invite({ 'peer' : peer_path})
560
def build_args(parser):
    """Declare the options and one sub-command per supported command.

    parser -- the argparse parser to populate.

    Returns the command table: a dict mapping each command name to a dict of
    optional fields keyed by the ArgFields names.
    """
    parser.add_argument('-d', default=False, action='store_true',
                        dest='debug', help='enable debug')
    parser.add_argument('-i', metavar='<interface>', dest='ifname',
                        help='interface name')

    command = {
        'quit': {},
        'enable_debug': {},
        'disable_debug': {},
        'create_if': {ArgFields.help: '<iface_name> - create interface'},
        'get_if': {ArgFields.help: '<iface_name> - get interface'},
        'del_if': {ArgFields.help: 'removes current interface'},
        'scan': {},
        'p2p_find': {},
        'p2p_stop_find': {},
        'p2p_flush': {},
        'p2p_group_add': {ArgFields.help: 'adds an autonomous group'},
        'p2p_group_remove': {},
        'p2p_group': {},
        'p2p_peers': {},
        'p2p_peer': {
            ArgFields.help: '<p2p device name> - get info for a peer'},
        'p2p_connect': {ArgFields.help: '<p2p device name>'},
        'p2p_disconnect': {ArgFields.help: '<p2p device name>'},
        'p2p_serv_disc_req': {},
        'p2p_serv_disc_cancel_req': {ArgFields.help: '<identifier>'},
        'p2p_service_add': {
            ArgFields.help:
                '<service type> <version/query> <service/response>'},
        'p2p_service_del': {
            ArgFields.help: '<service type> <version/query> [<service>]'},
        'p2p_service_flush': {},
        'p2p_invite': {ArgFields.help: '<p2p device name>'},
    }

    subparsers = parser.add_subparsers(help='commands', dest='command')
    # Sub-command with an empty name, registered next to the real ones.
    subparsers.add_parser('')
    for name in sorted(command):
        fields = command[name]
        description = fields.get(ArgFields.help)
        placeholder = fields.get(ArgFields.metavar)

        command_parser = subparsers.add_parser(name, help=description)
        # The command's own arguments are collected under its name.
        command_parser.add_argument(name, nargs='*', metavar=placeholder,
                                    help=description)

    return command
610
def main():
    """Parse the command line, set up D-Bus and run the interactive loop."""
    parser = argparse.ArgumentParser(description='Connman P2P Test')

    command_list = build_args(parser)

    # A trailing '' selects the empty sub-command registered by build_args()
    # when no command is given.  Parse a copy so sys.argv stays untouched.
    args = parser.parse_args(argv[1:] + [''])

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Rebuild the command line with one blank between words, leaving out the
    # '' appended above.
    words = []
    if args.command:
        words.append(args.command)
        words.extend(opt for opt in getattr(args, args.command) if opt)

    bus = dbus.SystemBus()

    mainloop = GLib.MainLoop()

    wpa_s = Wpa_s(bus, args.ifname, ' '.join(words))

    if args.debug:
        wpa_s.enable_debug([])

    wpa_s.set_command_list([command_list])

    mainloop.run()


if __name__ == '__main__':
    main()