3 from os import O_NONBLOCK
4 from sys import stdin, stdout, exit, version_info, argv
5 from fcntl import fcntl, F_GETFL, F_SETFL
6 from gi.repository import GLib
8 import dbus.mainloop.glib
# Bus name, root interface name and root object path of wpa_supplicant.
WPA_NAME = 'fi.w1.wpa_supplicant1'
WPA_INTF = 'fi.w1.wpa_supplicant1'
WPA_PATH = '/fi/w1/wpa_supplicant1'

# Interface names built on top of the root interface name.
WPA_IF_INTF = '%s.Interface' % WPA_INTF
WPA_P2P_INTF = '%s.P2PDevice' % WPA_IF_INTF
WPA_GROUP_INTF = '%s.Group' % WPA_INTF
WPA_PEER_INTF = '%s.Peer' % WPA_INTF

# Standard D-Bus properties interface, used for Get/Set/GetAll calls.
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

# Bit 0 of a peer's 'groupcapability' value; tested in p2p_connect.
P2P_GROUP_CAPAB_GROUP_OWNER = 0x01
23 for field in ('help', 'metavar'):
24 exec('{}="{}"'.format(field, field))
27 def __init__(self, handler):
29 self.handler = handler
31 flags = fcntl(stdin.fileno(), F_GETFL)
33 fcntl(stdin.fileno(), F_SETFL, flags)
34 GLib.io_add_watch(stdin, GLib.IO_IN, self.input_cb)
43 def input_cb(self, fd, event):
44 if event != GLib.IO_IN:
47 self.line += fd.read();
48 for line in self.line.split('\n'):
53 self.handler(line.strip())
60 print('Command Error: %s' % ex)
62 def checkarg(nb_args = 0, min_args = False):
64 def wrapper(*args, **kwargs):
68 result = len(args[1]) < nb_args
70 result = len(args[1]) != nb_args
73 raise Exception('Command %s takes %s arguments' %
74 (function.__name__, nb_args))
75 return function(*args, **kwargs)
82 if type(d[k]) is dbus.Byte:
83 print('Key %s --> 0x%x' % (k, d[k]))
85 print('Key %s --> %s' % (k, d[k]))
87 print("Error: Key %s content cannot be printed" % k)
92 if type(e) is dbus.Dictionary:
95 print('Element: %s' % e)
98 def __init__(self, bus, iface_name, command):
99 self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
100 bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
101 member_keyword='signal')
102 bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
103 signal_name='InterfaceAdded')
104 bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
105 signal_name='InterfaceRemoved')
112 self.line_in = InputLine(self.__command)
116 self.create_if([iface_name])
118 print("Error creating interface: %s" % iface_name)
120 if len(command.strip(' ')):
121 self.__command(command)
# Prints a command name right-justified to 25 columns, a tab, and that
# command's help text left-justified to 50 columns. The help text is read
# from self.command_list[key][ArgFields.help] when that entry exists.
# (Some lines of this method are not visible in this excerpt.)
#
# NOTE(review): `list = list(...)` below assigns to `list` inside the
# function, which makes `list` a local name for the whole body. The call on
# the right-hand side therefore reads the local before it is bound and raises
# UnboundLocalError. Use a different local name (e.g. `names = list(...)`)
# or iterate over `sorted(self.command_list)`.
# NOTE(review): the locals `list` and `help` also shadow builtins.
123 def help(self, args):
124 list = list(self.command_list.keys())
128 if (ArgFields.help in self.command_list[key]):
129 help = self.command_list[key][ArgFields.help]
131 print("%s\t%s" % (key.rjust(25), help.ljust(50)))
133 def __command(self, cmd_line):
134 cmd = cmd_line.split(' ')
137 func = getattr(self, cmd[0])
138 except Exception as e:
139 print('Error: command unknown - %s' % e)
144 except Exception as e:
147 def __wpa_property_changed(*args, **kwargs):
148 print('WPA - Signal: %s' % kwargs.get('signal'))
150 def __if_property_changed(*args, **kwargs):
151 signal = kwargs.get('signal')
152 print('IF - Signal: %s' % signal)
154 if signal == 'BSSAdded':
158 print_tuple(args[1:])
160 def __p2p_property_changed(*args, **kwargs):
161 print('IF P2P - Signal: %s' % kwargs.get('signal'))
163 print_tuple(args[1:])
165 def __peer_if_p2p_property_changed(*args, **kwargs):
166 print('Peer - ', end=' ')
167 args[0].__p2p_property_changed(*args, **kwargs)
169 def __DeviceFound(self, object_path):
170 self.peers[object_path] = None
172 peer = self.bus.get_object(WPA_INTF, object_path)
173 peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
175 self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed,
176 dbus_interface=WPA_PEER_INTF,
177 path=object_path, member_keyword='signal')
179 self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)
182 print_dict(self.peers[object_path])
184 def __DeviceLost(self, object_path):
185 if object_path in self.peers:
186 del self.peers[object_path]
188 def __PeerJoined(self, object_path):
189 print('Peer %s joined' % object_path)
191 def __PeerDisconnected(self, object_path):
192 print('Peer %s disconnected' % object_path)
194 def __group_if_property_changed(*args, **kwargs):
195 print('Group - ', end=' ')
196 args[0].__if_property_changed(*args, **kwargs)
198 def __group_if_p2p_property_changed(*args, **kwargs):
199 print('Group - ', end=' ')
200 args[0].__p2p_property_changed(*args, **kwargs)
# Receiver for the 'GroupFinished' signal: prints a removal message, clears
# the cached group state (group_obj, group_if, group_iface_path) and dumps
# the signal's properties with print_dict.
# (Some lines of this method are not visible in this excerpt.)
#
# NOTE(review): `ifname` is neither a parameter nor a local of this method,
# and no module-level `ifname` is visible in this excerpt -- this looks like
# a NameError as soon as the signal fires. Confirm, and consider printing
# self.group_iface_path (read it before the reset line) instead.
202 def __GroupFinished(self, properties):
203 print('Group running on %s is being removed' % ifname)
204 self.group_obj = self.group_if = self.group_iface_path = None
207 print_dict(properties)
209 def __InvitationResult(self, response):
210 print('Invitation result status: %d ' % response['status'])
212 if 'bssid' in response:
213 print('bssid: %s' % response['bssid'])
218 def __GroupStarted(self, properties):
219 self.group_obj = properties['group_object']
220 self.bus.add_signal_receiver(self.__PeerJoined,
221 dbus_interface=WPA_GROUP_INTF,
223 signal_name='PeerJoined')
224 self.bus.add_signal_receiver(self.__PeerDisconnected,
225 dbus_interface=WPA_GROUP_INTF,
227 signal_name='PeerDisconnected')
229 self.group_iface_path = properties['interface_object']
230 self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF,
231 self.group_iface_path),
233 self.bus.add_signal_receiver(self.__group_if_property_changed,
234 dbus_interface=WPA_IF_INTF,
235 path=self.group_iface_path,
236 member_keyword='signal')
237 self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
238 dbus_interface=WPA_P2P_INTF,
239 path=self.group_iface_path,
240 member_keyword='signal')
241 self.bus.add_signal_receiver(self.__GroupFinished,
242 dbus_interface=WPA_P2P_INTF,
243 path=self.group_iface_path,
244 signal_name='GroupFinished')
245 self.bus.add_signal_receiver(self.__InvitationResult,
246 dbus_interface=WPA_P2P_INTF,
247 path=self.iface_path,
248 signal_name='InvitationResult')
251 group = dbus.Interface(self.bus.get_object(WPA_INTF,
253 DBUS_PROPERTIES_INTF)
254 print_dict(group.GetAll(WPA_GROUP_INTF))
256 def __ServiceDiscoveryResponse(self, response):
257 peer = response['peer_object']
258 if peer in self.peers:
259 print('Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']))
260 print(response['tlvs'])
262 def __InterfaceAdded(self, path, properties):
263 print('Interface %s Added (%s)' % (properties['Ifname'], path))
265 print_dict(properties)
266 p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
267 path), DBUS_PROPERTIES_INTF)
268 print_dict(p2p.GetAll(WPA_P2P_INTF))
270 def __InterfaceRemoved(self, path):
271 print('Interface Removed (%s)' % (path))
def __listen_if_signals(self):
    """Register every signal receiver for the current interface object.

    All receivers are attached to self.iface_path on self.bus. Two of them
    are registered without a signal name and get the emitted signal's name
    through the 'signal' keyword; the rest are tied to one named P2P signal
    each.
    """
    # Receivers registered per D-Bus interface, with no signal_name filter.
    generic_receivers = (
        (WPA_IF_INTF, self.__if_property_changed),
        (WPA_P2P_INTF, self.__p2p_property_changed),
    )
    for interface, callback in generic_receivers:
        self.bus.add_signal_receiver(callback,
                                     dbus_interface=interface,
                                     path=self.iface_path,
                                     member_keyword='signal')

    # Receivers bound to one specific P2P device signal.
    named_receivers = (
        ('GroupStarted', self.__GroupStarted),
        ('DeviceFound', self.__DeviceFound),
        ('DeviceLost', self.__DeviceLost),
        ('ServiceDiscoveryResponse', self.__ServiceDiscoveryResponse),
    )
    for name, callback in named_receivers:
        self.bus.add_signal_receiver(callback,
                                     dbus_interface=WPA_P2P_INTF,
                                     path=self.iface_path,
                                     signal_name=name)
300 self.iface_path = self.iface_name = self.iface = None
301 self.p2p = self.group_if = self.group_obj = None
304 def __set_if(self, iface_name):
305 self.iface = dbus.Interface(self.bus.get_object(WPA_INTF,
306 self.iface_path), WPA_IF_INTF)
307 self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
308 self.iface_path), WPA_P2P_INTF)
310 p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
311 p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
312 dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
314 print('Interface %s: %s' % (iface_name, self.iface_path))
315 self.iface_name = iface_name
316 self.__listen_if_signals()
319 def enable_debug(self, args):
323 def disable_debug(self, args):
327 def create_if(self, args):
329 self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} ))
330 self.__set_if(args[0])
333 def get_if(self, args):
335 self.iface_path = self.wpa.GetInterface(args[0])
336 self.__set_if(args[0])
339 def del_if(self, args = None):
340 if not self.iface_path:
343 self.wpa.RemoveInterface(self.iface_path)
344 print('Interface %s removed' % self.iface_name)
348 def scan(self, args = None):
352 self.iface.Scan(({ 'Type': 'passive' }))
353 print('Scan started')
356 def quit(self, args = None):
def set_command_list(self, command_list):
    """Store the command table used by the interactive commands.

    command_list is a sequence whose first element is the table itself;
    only that first element is kept, as self.command_list.
    """
    table = command_list[0]
    self.command_list = table
365 def p2p_find(self, args = None):
372 def p2p_stop_find(self, args = None):
379 def p2p_peers(self, args = None):
384 print('Peer Name=%s' % (self.peers[p]['DeviceName']))
386 def __find_peer(self, peer_name, ret_object_path = False):
387 if len(self.peers) == 0:
392 if self.peers[p]['DeviceName'] == peer_name:
397 print('No peer found under the name: %s' % peer_name)
405 @checkarg(nb_args = 1)
406 def p2p_peer(self, args):
407 peer_path = self.__find_peer(args[0], True)
409 peer = self.bus.get_object(WPA_INTF, peer_path)
410 peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
411 self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF)
413 print_dict(self.peers[peer_path])
415 @checkarg(nb_args = 1)
416 def p2p_connect(self, args):
420 peer = self.__find_peer(args[0])
424 peer_path = self.__find_peer(args[0], True)
426 if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER ==
427 P2P_GROUP_CAPAB_GROUP_OWNER):
428 print('Joining an existing P2P group')
429 pin = self.p2p.Connect(({ 'peer' : peer_path,
430 'wps_method' : 'pbc',
434 print('Associating with another P2P device')
435 pin = self.p2p.Connect(({ 'peer' : peer_path,
436 'wps_method' : 'pbc',
440 print('WPS PIN in use: %s' % pin)
442 @checkarg(nb_args = 1)
443 def p2p_disconnect(self, args):
447 peer = self.__find_peer(args[0])
451 if not self.group_if:
452 print('Peer %s is not connected' % (peer['DeviceName']))
455 self.group_if.Disconnect()
458 def p2p_group_add(self, args):
462 self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))
465 def p2p_group_remove(self, args):
466 if not self.group_if:
469 self.group_if.Disconnect()
472 def p2p_group(self, args):
473 if not self.group_obj:
476 group = dbus.Interface(self.bus.get_object(WPA_INTF,
477 self.group_obj), DBUS_PROPERTIES_INTF)
478 print_dict(group.GetAll(WPA_GROUP_INTF))
481 def p2p_flush(self, args):
488 def p2p_serv_disc_req(self, args = None):
492 """ We request all kind of services """
493 sd_req = dbus.Array(signature='y', variant_level=1)
495 sd_req.append(dbus.Byte(a))
497 ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req }))
498 print('Service discovery reference: %s' % ref)
500 @checkarg(nb_args = 1)
501 def p2p_serv_disc_cancel_req(self, args):
505 self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))
507 @checkarg(nb_args = 3)
508 def p2p_service_add(self, args):
512 service = { 'service_type' : args[0] }
513 if args[0] == 'upnp':
514 service['version'] = args[1]
515 service['service'] = args[2]
516 elif args[0] == 'bonjour':
517 service['query'] = args[1]
518 service['response'] = args[2]
520 print('Unknown service: %s' % args[0])
523 self.p2p.AddService((service))
525 @checkarg(nb_args = 2, min_args = True)
526 def p2p_service_del(self, args):
530 service = { 'service_type' : args[0] }
531 if args[0] == 'upnp':
532 service['version'] = args[1]
533 service['service'] = args[2]
534 elif args[0] == 'bonjour':
535 service['query'] = args[1]
537 print('Unknown service: %s' % args[0])
540 self.p2p.DeleteService((service))
543 def p2p_service_flush(self, args = None):
547 self.p2p.FlushService()
549 @checkarg(nb_args = 1)
550 def p2p_invite(self, args):
551 if not self.p2p or not self.group_if:
554 peer_path = self.__find_peer(args[0], True)
559 self.group_if.Invite({ 'peer' : peer_path})
561 def build_args(parser):
562 parser.add_argument('-d', default=False, action='store_true',
563 dest='debug', help='enable debug')
564 parser.add_argument('-i', metavar='<interface>', dest='ifname',
565 help='interface name')
569 command['enable_debug'] = {}
570 command['disable_debug'] = {}
571 command['create_if'] = {ArgFields.help:'<iface_name> - create interface'}
572 command['get_if'] = {ArgFields.help:'<iface_name> - get interface'}
573 command['del_if'] = {ArgFields.help:'removes current interface'}
575 command['p2p_find'] = {}
576 command['p2p_stop_find'] = {}
577 command['p2p_flush'] = {}
578 command['p2p_group_add'] = {ArgFields.help:'adds an autonomous group'}
579 command['p2p_group_remove'] = {}
580 command['p2p_group'] = {}
581 command['p2p_peers'] = {}
582 command['p2p_peer'] = {ArgFields.help:'<p2p device name> - get info for a '
584 command['p2p_connect'] = {ArgFields.help:'<p2p device name>'}
585 command['p2p_disconnect'] = {ArgFields.help:'<p2p device name>'}
586 command['p2p_serv_disc_req'] = {}
587 command['p2p_serv_disc_cancel_req'] = {ArgFields.help:'<identifier>'}
588 command['p2p_service_add'] = {ArgFields.help:'<service type> '
589 '<version/query> <service/response>'}
590 command['p2p_service_del'] = {ArgFields.help:'<service type> '
591 '<version/query> [<service>]'}
592 command['p2p_service_flush'] = {}
593 command['p2p_invite'] = {ArgFields.help:'<p2p device name>'}
595 command_list = list(command.keys())
597 subparsers = parser.add_subparsers(help='commands', dest='command')
598 subparsers.add_parser('')
599 for key in command_list:
602 if ArgFields.help in command[key]:
603 help = command[key][ArgFields.help]
604 if ArgFields.metavar in command[key]:
605 metavar = command[key][ArgFields.metavar]
606 command_parser = subparsers.add_parser(key, help=help)
607 command_parser.add_argument(key, nargs='*', metavar=metavar, help=help)
612 parser = argparse.ArgumentParser(description='Connman P2P Test')
614 command_list = build_args(parser)
618 args = parser.parse_args(argv[1:])
620 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
624 opts = getattr(args, args.command)
627 if opts and len(opts[0]):
628 params = ' ' + ''.join(opts)
630 bus = dbus.SystemBus()
632 mainloop = GLib.MainLoop()
634 wpa_s = Wpa_s(bus, args.ifname, args.command + params)
637 wpa_s.enable_debug([])
639 wpa_s.set_command_list([command_list])
643 if __name__ == '__main__':