3 from os import O_NONBLOCK
4 from sys import stdin, stdout, exit, version_info, argv
5 from fcntl import fcntl, F_GETFL, F_SETFL
8 import dbus.mainloop.glib
# Bus name and root interface share the same string; the root object path
# is the slash-separated form of it.
WPA_NAME = WPA_INTF = 'fi.w1.wpa_supplicant1'
WPA_PATH = '/fi/w1/wpa_supplicant1'

# Interface names derived from the root interface name.
WPA_IF_INTF = '%s.Interface' % WPA_INTF
WPA_P2P_INTF = '%s.P2PDevice' % WPA_IF_INTF
WPA_GROUP_INTF = '%s.Group' % WPA_INTF
WPA_PEER_INTF = '%s.Peer' % WPA_INTF

# Generic D-Bus properties interface, used for Get/GetAll/Set calls.
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

# Bit tested against a peer's 'groupcapability' value in p2p_connect.
P2P_GROUP_CAPAB_GROUP_OWNER = 0x01
24 for field in ('help', 'metavar'):
25 exec('{}="{}"'.format(field, field))
28 def __init__(self, handler):
30 self.handler = handler
32 flags = fcntl(stdin.fileno(), F_GETFL)
34 fcntl(stdin.fileno(), F_SETFL, flags)
35 glib.io_add_watch(stdin, glib.IO_IN, self.input_cb)
44 def input_cb(self, fd, event):
45 if event != glib.IO_IN:
48 self.line += fd.read();
49 for line in self.line.split('\n'):
54 self.handler(line.strip())
61 print 'Command Error: %s' % ex
63 def checkarg(nb_args = 0, min_args = False):
65 def wrapper(*args, **kwargs):
69 result = len(args[1]) < nb_args
71 result = len(args[1]) != nb_args
74 raise Exception('Command %s takes %s arguments' %
75 (function.__name__, nb_args))
76 return function(*args, **kwargs)
83 if type(d[k]) is dbus.Byte:
84 print 'Key %s --> 0x%x' % (k, d[k])
86 print 'Key %s --> %s' % (k, d[k])
88 print "Error: Key %s content cannot be printed" % k
93 if type(e) is dbus.Dictionary:
96 print 'Element: %s' % e
99 def __init__(self, bus, iface_name, command):
100 self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
101 bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
102 member_keyword='signal')
103 bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
104 signal_name='InterfaceAdded')
105 bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
106 signal_name='InterfaceRemoved')
113 self.line_in = InputLine(self.__command)
117 self.create_if([iface_name])
119 print "Error creating interface: %s" % iface_name
121 if len(command.strip(' ')):
122 self.__command(command)
124 def help(self, args):
125 list = self.command_list.keys()
129 if (self.command_list[key].has_key(ArgFields.help)):
130 help = self.command_list[key][ArgFields.help]
132 print "%s\t%s" % (key.rjust(25), help.ljust(50))
134 def __command(self, cmd_line):
135 cmd = cmd_line.split(' ')
138 func = getattr(self, cmd[0])
140 print 'Error: command unknown - %s' % e
def __wpa_property_changed(*args, **kwargs):
    """Report the name of a signal received on the root supplicant object.

    The receiver is registered with member_keyword='signal', so the signal
    name is read from the 'signal' keyword argument. Positional arguments
    are accepted but not used.
    """
    signal_name = kwargs.get('signal')
    # Parenthesised single-argument form prints identically on Python 2.
    print('WPA - Signal: %s' % signal_name)
151 def __if_property_changed(*args, **kwargs):
152 signal = kwargs.get('signal')
153 print 'IF - Signal: %s' % signal
155 if signal == 'BSSAdded':
159 print_tuple(args[1:])
161 def __p2p_property_changed(*args, **kwargs):
162 print 'IF P2P - Signal: %s' % kwargs.get('signal')
164 print_tuple(args[1:])
166 def __peer_if_p2p_property_changed(*args, **kwargs):
168 args[0].__p2p_property_changed(*args, **kwargs)
170 def __DeviceFound(self, object_path):
171 self.peers[object_path] = None
173 peer = self.bus.get_object(WPA_INTF, object_path)
174 peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
176 self.bus.add_signal_receiver(self.__peer_if_p2p_property_changed,
177 dbus_interface=WPA_PEER_INTF,
178 path=object_path, member_keyword='signal')
180 self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)
183 print_dict(self.peers[object_path])
def __DeviceLost(self, object_path):
    """Forget a peer once its DeviceLost signal is received.

    :param object_path: key of the peer in self.peers; unknown keys are
        silently ignored.
    """
    try:
        del self.peers[object_path]
    except KeyError:
        # The peer was never recorded (or is already gone): nothing to do.
        pass
def __PeerJoined(self, object_path):
    """Announce that a peer joined the group.

    :param object_path: value delivered with the PeerJoined signal; it is
        only formatted into the message.
    """
    message = 'Peer %s joined' % object_path
    print(message)
def __PeerDisconnected(self, object_path):
    """Announce that a peer left the group.

    :param object_path: value delivered with the PeerDisconnected signal;
        it is only formatted into the message.
    """
    message = 'Peer %s disconnected' % object_path
    print(message)
195 def __group_if_property_changed(*args, **kwargs):
197 args[0].__if_property_changed(*args, **kwargs)
199 def __group_if_p2p_property_changed(*args, **kwargs):
201 args[0].__p2p_property_changed(*args, **kwargs)
def __GroupFinished(self, properties):
    """Handle the GroupFinished signal for the group being tracked.

    Prints a removal notice, clears the cached group state (group_obj,
    group_if, group_iface_path) and dumps the signal payload.

    :param properties: dictionary delivered with the signal; handed
        unchanged to print_dict.
    """
    # The message previously formatted a name that is not defined anywhere
    # in this scope, so the handler raised NameError before the state could
    # be reset. Use the interface path stored by __GroupStarted, and read it
    # before it is cleared just below.
    # NOTE(review): this is an object path rather than a short interface
    # name -- confirm whether the signal payload offers a friendlier value.
    group_iface = self.group_iface_path
    print('Group running on %s is being removed' % group_iface)

    self.group_obj = self.group_if = self.group_iface_path = None

    # NOTE(review): confirm whether this dump is meant to be limited to
    # debug mode (see enable_debug / disable_debug).
    print_dict(properties)
210 def __InvitationResult(self, response):
211 print 'Invitation result status: %d ' % response['status']
213 if response.has_key('bssid'):
214 print 'bssid: %s' % response['bssid']
219 def __GroupStarted(self, properties):
220 self.group_obj = properties['group_object']
221 self.bus.add_signal_receiver(self.__PeerJoined,
222 dbus_interface=WPA_GROUP_INTF,
224 signal_name='PeerJoined')
225 self.bus.add_signal_receiver(self.__PeerDisconnected,
226 dbus_interface=WPA_GROUP_INTF,
228 signal_name='PeerDisconnected')
230 self.group_iface_path = properties['interface_object']
231 self.group_if = dbus.Interface(self.bus.get_object(WPA_INTF,
232 self.group_iface_path),
234 self.bus.add_signal_receiver(self.__group_if_property_changed,
235 dbus_interface=WPA_IF_INTF,
236 path=self.group_iface_path,
237 member_keyword='signal')
238 self.bus.add_signal_receiver(self.__group_if_p2p_property_changed,
239 dbus_interface=WPA_P2P_INTF,
240 path=self.group_iface_path,
241 member_keyword='signal')
242 self.bus.add_signal_receiver(self.__GroupFinished,
243 dbus_interface=WPA_P2P_INTF,
244 path=self.group_iface_path,
245 signal_name='GroupFinished')
246 self.bus.add_signal_receiver(self.__InvitationResult,
247 dbus_interface=WPA_P2P_INTF,
248 path=self.iface_path,
249 signal_name='InvitationResult')
252 group = dbus.Interface(self.bus.get_object(WPA_INTF,
254 DBUS_PROPERTIES_INTF)
255 print_dict(group.GetAll(WPA_GROUP_INTF))
def __ServiceDiscoveryResponse(self, response):
    """Print the TLVs of a service discovery response from a known peer.

    :param response: mapping delivered with the signal; the 'peer_object'
        and 'tlvs' entries are read.

    Responses whose 'peer_object' is not a key of self.peers are ignored.
    """
    peer_path = response['peer_object']
    if peer_path not in self.peers:
        return

    peer_name = self.peers[peer_path]['DeviceName']
    print('Peer %s has this TLVs:' % (peer_name))
    print(response['tlvs'])
263 def __InterfaceAdded(self, path, properties):
264 print 'Interface %s Added (%s)' % (properties['Ifname'], path)
266 print_dict(properties)
267 p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
268 path), DBUS_PROPERTIES_INTF)
269 print_dict(p2p.GetAll(WPA_P2P_INTF))
def __InterfaceRemoved(self, path):
    """Announce that an interface was removed.

    :param path: value delivered with the InterfaceRemoved signal; it is
        only formatted into the message.
    """
    message = 'Interface Removed (%s)' % path
    print(message)
def __listen_if_signals(self):
    """Register every signal receiver tied to the current interface path.

    Two receivers are registered without a signal name and with
    member_keyword='signal'; four more are bound to one named signal each
    on the P2P device interface. All of them use self.iface_path as the
    object path.
    """
    subscribe = self.bus.add_signal_receiver

    # Receivers registered per interface, with the signal name passed in
    # through the 'signal' keyword argument.
    per_interface = (
        (self.__if_property_changed, WPA_IF_INTF),
        (self.__p2p_property_changed, WPA_P2P_INTF),
    )
    for callback, interface in per_interface:
        subscribe(callback,
                  dbus_interface=interface,
                  path=self.iface_path,
                  member_keyword='signal')

    # Receivers bound to a single named signal of the P2P device interface.
    per_signal = (
        (self.__GroupStarted, 'GroupStarted'),
        (self.__DeviceFound, 'DeviceFound'),
        (self.__DeviceLost, 'DeviceLost'),
        (self.__ServiceDiscoveryResponse, 'ServiceDiscoveryResponse'),
    )
    for callback, signal in per_signal:
        subscribe(callback,
                  dbus_interface=WPA_P2P_INTF,
                  path=self.iface_path,
                  signal_name=signal)
301 self.iface_path = self.iface_name = self.iface = None
302 self.p2p = self.group_if = self.group_obj = None
305 def __set_if(self, iface_name):
306 self.iface = dbus.Interface(self.bus.get_object(WPA_INTF,
307 self.iface_path), WPA_IF_INTF)
308 self.p2p = dbus.Interface(self.bus.get_object(WPA_INTF,
309 self.iface_path), WPA_P2P_INTF)
311 p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
312 p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
313 dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
315 print 'Interface %s: %s' % (iface_name, self.iface_path)
316 self.iface_name = iface_name
317 self.__listen_if_signals()
320 def enable_debug(self, args):
324 def disable_debug(self, args):
328 def create_if(self, args):
330 self.iface_path = self.wpa.CreateInterface(({ 'Ifname' : args[0]} ))
331 self.__set_if(args[0])
334 def get_if(self, args):
336 self.iface_path = self.wpa.GetInterface(args[0])
337 self.__set_if(args[0])
340 def del_if(self, args = None):
341 if not self.iface_path:
344 self.wpa.RemoveInterface(self.iface_path)
345 print 'Interface %s removed' % self.iface_name
349 def scan(self, args = None):
353 self.iface.Scan(({ 'Type': 'passive' }))
357 def quit(self, args = None):
def set_command_list(self, command_list):
    """Store the command table on the instance.

    :param command_list: indexable wrapper whose first element is the
        table to keep; it is saved as self.command_list.
    """
    table = command_list[0]
    self.command_list = table
366 def p2p_find(self, args = None):
373 def p2p_stop_find(self, args = None):
380 def p2p_peers(self, args = None):
385 print 'Peer Name=%s' % (self.peers[p]['DeviceName'])
387 def __find_peer(self, peer_name, ret_object_path = False):
388 if len(self.peers) == 0:
393 if self.peers[p]['DeviceName'] == peer_name:
398 print 'No peer found under the name: %s' % peer_name
406 @checkarg(nb_args = 1)
407 def p2p_peer(self, args):
408 peer_path = self.__find_peer(args[0], True)
410 peer = self.bus.get_object(WPA_INTF, peer_path)
411 peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)
412 self.peers[peer_path] = peer_if.GetAll(WPA_PEER_INTF)
414 print_dict(self.peers[peer_path])
416 @checkarg(nb_args = 1)
417 def p2p_connect(self, args):
421 peer = self.__find_peer(args[0])
425 peer_path = self.__find_peer(args[0], True)
427 if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER ==
428 P2P_GROUP_CAPAB_GROUP_OWNER):
429 print 'Joining an existing P2P group'
430 pin = self.p2p.Connect(({ 'peer' : peer_path,
431 'wps_method' : 'pbc',
435 print 'Associating with another P2P device'
436 pin = self.p2p.Connect(({ 'peer' : peer_path,
437 'wps_method' : 'pbc',
441 print 'WPS PIN in use: %s' % pin
443 @checkarg(nb_args = 1)
444 def p2p_disconnect(self, args):
448 peer = self.__find_peer(args[0])
452 if not self.group_if:
453 print 'Peer %s is not connected' % (peer['DeviceName'])
456 self.group_if.Disconnect()
459 def p2p_group_add(self, args):
463 self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))
466 def p2p_group_remove(self, args):
467 if not self.group_if:
470 self.group_if.Disconnect()
473 def p2p_group(self, args):
474 if not self.group_obj:
477 group = dbus.Interface(self.bus.get_object(WPA_INTF,
478 self.group_obj), DBUS_PROPERTIES_INTF)
479 print_dict(group.GetAll(WPA_GROUP_INTF))
482 def p2p_flush(self, args):
489 def p2p_serv_disc_req(self, args = None):
493 """ We request all kind of services """
494 sd_req = dbus.Array(signature='y', variant_level=1)
496 sd_req.append(dbus.Byte(a))
498 ref = self.p2p.ServiceDiscoveryRequest(({ 'tlv' : sd_req }))
499 print 'Service discovery reference: %s' % ref
501 @checkarg(nb_args = 1)
502 def p2p_serv_disc_cancel_req(self, args):
506 self.p2p.ServiceDiscoveryCancelRequest(int(args[0]))
508 @checkarg(nb_args = 3)
509 def p2p_service_add(self, args):
513 service = { 'service_type' : args[0] }
514 if args[0] == 'upnp':
515 service['version'] = args[1]
516 service['service'] = args[2]
517 elif args[0] == 'bonjour':
518 service['query'] = args[1]
519 service['response'] = args[2]
521 print 'Unknown service: %s' % args[0]
524 self.p2p.AddService((service))
526 @checkarg(nb_args = 2, min_args = True)
527 def p2p_service_del(self, args):
531 service = { 'service_type' : args[0] }
532 if args[0] == 'upnp':
533 service['version'] = args[1]
534 service['service'] = args[2]
535 elif args[0] == 'bonjour':
536 service['query'] = args[1]
538 print 'Unknown service: %s' % args[0]
541 self.p2p.DeleteService((service))
544 def p2p_service_flush(self, args = None):
548 self.p2p.FlushService()
550 @checkarg(nb_args = 1)
551 def p2p_invite(self, args):
552 if not self.p2p or not self.group_if:
555 peer_path = self.__find_peer(args[0], True)
560 self.group_if.Invite({ 'peer' : peer_path})
562 def build_args(parser):
563 parser.add_argument('-d', default=False, action='store_true',
564 dest='debug', help='enable debug')
565 parser.add_argument('-i', metavar='<interface>', dest='ifname',
566 help='interface name')
570 command['enable_debug'] = {}
571 command['disable_debug'] = {}
572 command['create_if'] = {ArgFields.help:'<iface_name> - create interface'}
573 command['get_if'] = {ArgFields.help:'<iface_name> - get interface'}
574 command['del_if'] = {ArgFields.help:'removes current interface'}
576 command['p2p_find'] = {}
577 command['p2p_stop_find'] = {}
578 command['p2p_flush'] = {}
579 command['p2p_group_add'] = {ArgFields.help:'adds an autonomous group'}
580 command['p2p_group_remove'] = {}
581 command['p2p_group'] = {}
582 command['p2p_peers'] = {}
583 command['p2p_peer'] = {ArgFields.help:'<p2p device name> - get info for a '
585 command['p2p_connect'] = {ArgFields.help:'<p2p device name>'}
586 command['p2p_disconnect'] = {ArgFields.help:'<p2p device name>'}
587 command['p2p_serv_disc_req'] = {}
588 command['p2p_serv_disc_cancel_req'] = {ArgFields.help:'<identifier>'}
589 command['p2p_service_add'] = {ArgFields.help:'<service type> '
590 '<version/query> <service/response>'}
591 command['p2p_service_del'] = {ArgFields.help:'<service type> '
592 '<version/query> [<service>]'}
593 command['p2p_service_flush'] = {}
594 command['p2p_invite'] = {ArgFields.help:'<p2p device name>'}
596 command_list = command.keys()
598 subparsers = parser.add_subparsers(help='commands', dest='command')
599 subparsers.add_parser('')
600 for key in command_list:
603 if command[key].has_key(ArgFields.help):
604 help = command[key][ArgFields.help]
605 if command[key].has_key(ArgFields.metavar):
606 metavar = command[key][ArgFields.metavar]
607 command_parser = subparsers.add_parser(key, help=help)
608 command_parser.add_argument(key, nargs='*', metavar=metavar, help=help)
613 if version_info.major != 2:
614 print 'You need to run this under Python 2.x'
617 parser = argparse.ArgumentParser(description='Connman P2P Test')
619 command_list = build_args(parser)
623 args = parser.parse_args(argv[1:])
625 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
629 opts = getattr(args, args.command)
632 if opts and len(opts[0]):
633 params = ' ' + ''.join(opts)
635 bus = dbus.SystemBus()
637 mainloop = gobject.MainLoop()
639 wpa_s = Wpa_s(bus, args.ifname, args.command + params)
642 wpa_s.enable_debug([])
644 wpa_s.set_command_list([command_list])
648 if __name__ == '__main__':