4 # Copyright (c) 2020-2021 Project CHIP Authors
5 # Copyright (c) 2013-2018 Nest Labs, Inc.
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
23 # This file implements the Python-based Chip Device Controller Shell.
26 from __future__ import absolute_import
27 from __future__ import print_function
28 from chip import ChipDeviceCtrl
29 from chip import exceptions
34 from optparse import OptionParser, OptionValueError
41 from chip.ChipBleUtility import FAKE_CONN_OBJ_VALUE
# Extend sys.path with one or more directories, relative to the location of the
# running script, in which the chip package might be found. This makes it
# possible to run the device manager shell from a non-standard install location,
# as well as directly from its location in the CHIP source tree.
48 # Note that relative package locations are prepended to sys.path so as to give
49 # the local version of the package higher priority over any version installed in
50 # a standard location.
52 scriptDir = os.path.dirname(os.path.abspath(__file__))
53 relChipPackageInstallDirs = [
56 "../lib/python%s.%s" % (sys.version_info.major, sys.version_info.minor),
57 "../lib/Python%s%s" % (sys.version_info.major, sys.version_info.minor),
59 for relInstallDir in relChipPackageInstallDirs:
60 absInstallDir = os.path.realpath(os.path.join(scriptDir, relInstallDir))
61 if os.path.isdir(os.path.join(absInstallDir, "chip")):
62 sys.path.insert(0, absInstallDir)
65 if platform.system() == 'Darwin':
66 from chip.ChipCoreBluetoothMgr import CoreBluetoothManager as BleManager
67 elif sys.platform.startswith('linux'):
68 from chip.ChipBluezMgr import BluezManager as BleManager
70 # The exceptions for CHIP Device Controller CLI
73 class ChipDevCtrlException(exceptions.ChipStackException):
class ParsingError(ChipDevCtrlException):
    """Error raised when a shell command argument cannot be parsed.

    The description is stored in ``msg`` and always starts with the
    "Parsing Error: " prefix.

    Args:
        msg: Optional detail text appended after the prefix. When omitted
            (None), only the prefix is stored.
    """

    def __init__(self, msg=None):
        # msg defaults to None; concatenating None to a str would raise
        # TypeError while constructing the exception, so fall back to "".
        detail = "" if msg is None else str(msg)
        self.msg = "Parsing Error: " + detail
def DecodeBase64Option(option, opt, value):
    """Decode a base64-encoded command line option value.

    Args:
        option: The option object being processed (not used here).
        opt: The option string; only used to build the error message.
        value: The base64 text to decode.

    Returns:
        The decoded bytes.

    Raises:
        OptionValueError: If ``value`` cannot be decoded as base64.
    """
    try:
        return base64.standard_b64decode(value)
    except (TypeError, ValueError):
        # Python 3 reports malformed base64 (bad padding, non-ASCII text) as
        # binascii.Error, a ValueError subclass; TypeError covers values that
        # are not string/bytes-like. Both become a parser-level option error.
        raise OptionValueError(
            "option %s: invalid base64 value: %r" % (opt, value))
93 def DecodeHexIntOption(option, opt, value):
97 raise OptionValueError("option %s: invalid value: %r" % (opt, value))
def ParseEncodedString(value):
    """Convert an ``encoding:encodedvalue`` string into bytes.

    ``str:<text>`` gives the UTF-8 encoding of <text> followed by one NUL
    byte; ``hex:<digits>`` gives the bytes spelled by the hex digits.

    Raises:
        ParsingError: If there is no ":" separator, or the encoding prefix
            is neither ``str`` nor ``hex``.
    """
    # Only the first ":" separates the encoding from the payload.
    encoding, separator, payload = value.partition(":")
    if not separator:
        raise ParsingError(
            "value should be encoded in encoding:encodedvalue format")
    if encoding == "hex":
        return bytes.fromhex(payload)
    if encoding == "str":
        return payload.encode("utf-8") + b'\x00'
    raise ParsingError("only str and hex encoding is supported")
def FormatZCLArguments(args, command):
    """Build a dict of typed command arguments from ``key=value`` strings.

    Args:
        args: Iterable of ``key=value`` strings.
        command: Mapping from argument name to a type name. The type names
            'int', 'str' and 'bytes' are converted; an argument whose name
            maps to anything else (or is absent) is left out of the result.

    Returns:
        Dict mapping each recognised argument name to its converted value.

    Raises:
        ParsingError: If an item contains no "=".
    """
    converted = {}
    for item in args:
        # Only the first "=" splits name from value, so values may hold "=".
        name, separator, rawValue = item.partition("=")
        if not separator:
            raise ParsingError("Argument should in key=value format")
        typeName = command.get(name)
        if typeName == 'bytes':
            converted[name] = ParseEncodedString(rawValue)
        elif typeName == 'str':
            converted[name] = rawValue
        elif typeName == 'int':
            converted[name] = int(rawValue)
    return converted
128 class DeviceMgrCmd(Cmd):
129 def __init__(self, rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=0):
130 self.lastNetworkId = None
134 Cmd.identchars = string.ascii_letters + string.digits + "-"
136 if sys.stdin.isatty():
137 self.prompt = "chip-device-ctrl > "
139 self.use_rawinput = 0
142 DeviceMgrCmd.command_names.sort()
146 self.devCtrl = ChipDeviceCtrl.ChipDeviceController(controllerNodeId=controllerNodeId, bluetoothAdapter=bluetoothAdapter)
148 # If we are on Linux and user selects non-default bluetooth adapter.
149 if sys.platform.startswith("linux") and bluetoothAdapter != 0:
150 self.bleMgr = BleManager(self.devCtrl)
151 self.bleMgr.ble_adapter_select("hci{}".format(bluetoothAdapter))
153 self.historyFileName = os.path.expanduser(
154 "~/.chip-device-ctrl-history")
159 if "libedit" in readline.__doc__:
160 readline.parse_and_bind("bind ^I rl_complete")
161 readline.set_completer_delims(" ")
163 readline.read_history_file(self.historyFileName)
171 "ble-adapter-select",
180 "set-pairing-wifi-credential",
181 "set-pairing-thread-credential",
def parseline(self, line):
    """Split an input line like Cmd.parseline, using the dash-free command name.

    Returns a (command, argument, line) tuple. When a command was found,
    the command is replaced by its shortCommandName form and the returned
    line is rebuilt as "<command> <argument>"; otherwise the result from
    Cmd.parseline is passed through unchanged.
    """
    command, argument, rawLine = Cmd.parseline(self, line)
    if not command:
        return command, argument, rawLine
    shortName = self.shortCommandName(command)
    return shortName, argument, shortName + " " + argument
191 def completenames(self, text, *ignored):
194 for name in DeviceMgrCmd.command_names
195 if name.startswith(text) or self.shortCommandName(name).startswith(text)
def shortCommandName(self, cmd):
    """Return cmd with every "-" removed, e.g. "set-log-output" -> "setlogoutput"."""
    return "".join(cmd.split("-"))
201 def precmd(self, line):
202 if not self.use_rawinput and line != "EOF" and line != "":
206 def postcmd(self, stop, line):
207 if not stop and self.use_rawinput:
208 self.prompt = "chip-device-ctrl > "
216 readline.write_history_file(self.historyFileName)
222 def do_help(self, line):
224 cmd, arg, unused = self.parseline(line)
226 doc = getattr(self, "do_" + cmd).__doc__
227 except AttributeError:
230 self.stdout.write("%s\n" % textwrap.dedent(doc))
232 self.stdout.write("No help on %s\n" % (line))
235 "\nAvailable commands (type help <name> for more information):",
236 DeviceMgrCmd.command_names,
241 def do_close(self, line):
245 Close the connection to the device.
248 args = shlex.split(line)
252 self.do_help("close")
257 except exceptions.ChipStackException as ex:
260 def do_setlogoutput(self, line):
262 set-log-output [ none | error | progress | detail ]
264 Set the level of Chip logging output.
267 args = shlex.split(line)
271 self.do_help("set-log-output")
274 print("Unexpected argument: " + args[1])
277 category = args[0].lower()
278 if category == "none":
280 elif category == "error":
282 elif category == "progress":
284 elif category == "detail":
287 print("Invalid argument: " + args[0])
291 self.devCtrl.SetLogFilter(category)
292 except exceptions.ChipStackException as ex:
298 def do_bleadapterselect(self, line):
302 Start BLE adapter select, deprecated, you can select adapter by command line arguments.
304 if sys.platform.startswith("linux"):
306 self.bleMgr = BleManager(self.devCtrl)
308 self.bleMgr.ble_adapter_select(line)
310 "This change only applies to ble-scan\n"
311 "Please run device controller with --bluetooth-adapter=<adapter-name> to select adapter\n" +
312 "e.g. chip-device-ctrl --bluetooth-adapter hci0"
316 "ble-adapter-select only works in Linux, ble-adapter-select mac_address"
321 def do_bleadapterprint(self, line):
325 Print attached BLE adapter.
327 if sys.platform.startswith("linux"):
329 self.bleMgr = BleManager(self.devCtrl)
331 self.bleMgr.ble_adapter_print()
333 print("ble-adapter-print only works in Linux")
337 def do_bledebuglog(self, line):
340 0: disable BLE debug log
341 1: enable BLE debug log
344 self.bleMgr = BleManager(self.devCtrl)
346 self.bleMgr.ble_debug_log(line)
350 def do_blescan(self, line):
354 Start BLE scanning operations.
358 self.bleMgr = BleManager(self.devCtrl)
360 self.bleMgr.scan(line)
364 def do_connect(self, line):
366 connect -ip <ip address> <setup pin code> [<nodeid>]
367 connect -ble <discriminator> <setup pin code> [<nodeid>]
369 connect command is used for establishing a rendezvous session to the device.
370 currently, only connect using setupPinCode is supported.
372 TODO: Add more methods to connect to device (like cert for auth, and IP
377 args = shlex.split(line)
380 self.do_help("connect SetupPinCode")
383 nodeid = random.randint(1, 1000000) # Just a random number
385 nodeid = int(args[3])
386 print("Device is assigned with nodeid = {}".format(nodeid))
388 if args[0] == "-ip" and len(args) >= 3:
389 self.devCtrl.ConnectIP(args[1].encode("utf-8"), int(args[2]), nodeid)
390 elif args[0] == "-ble" and len(args) >= 3:
391 self.devCtrl.ConnectBLE(int(args[1]), int(args[2]), nodeid)
394 self.do_help("connect SetupPinCode")
396 print("Device temporary node id (**this does not match spec**): {}".format(nodeid))
397 except exceptions.ChipStackException as ex:
401 def do_resolve(self, line):
403 resolve <fabricid> <nodeid>
405 Resolve DNS-SD name corresponding with the given fabric and node IDs and
406 update address of the node in the device controller.
409 args = shlex.split(line)
411 err = self.devCtrl.ResolveNode(int(args[0]), int(args[1]))
413 address = self.devCtrl.GetAddressAndPort(int(args[1]))
414 address = "{}:{}".format(*address) if address else "unknown"
415 print("Current address: " + address)
417 self.do_help("resolve")
418 except exceptions.ChipStackException as ex:
422 def do_zcl(self, line):
424 To send ZCL message to device:
425 zcl <cluster> <command> <nodeid> <endpoint> <groupid> [key=value]...
426 To get a list of clusters:
428 To get a list of commands in cluster:
431 Send ZCL command to device nodeid
434 args = shlex.split(line)
435 all_commands = self.devCtrl.ZCLCommandList()
436 if len(args) == 1 and args[0] == '?':
437 print('\n'.join(all_commands.keys()))
438 elif len(args) == 2 and args[0] == '?':
439 if args[1] not in all_commands:
440 raise exceptions.UnknownCluster(args[1])
441 for commands in all_commands.get(args[1]).items():
442 args = ", ".join(["{}: {}".format(argName, argType)
443 for argName, argType in commands[1].items()])
448 print(" <no arguments>")
450 if args[0] not in all_commands:
451 raise exceptions.UnknownCluster(args[0])
452 command = all_commands.get(args[0]).get(args[1], None)
453 # When command takes no arguments, (not command) is True
455 raise exceptions.UnknownCommand(args[0], args[1])
456 self.devCtrl.ZCLSend(args[0], args[1], int(
457 args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command))
460 except exceptions.ChipStackException as ex:
461 print("An exception occurred during process ZCL command:")
463 except Exception as ex:
464 print("An exception occurred during processing input:")
467 def do_zclread(self, line):
469 To read ZCL attribute:
470 zclread <cluster> <attribute> <nodeid> <endpoint> <groupid>
473 args = shlex.split(line)
474 all_attrs = self.devCtrl.ZCLAttributeList()
475 if len(args) == 1 and args[0] == '?':
476 print('\n'.join(all_attrs.keys()))
477 elif len(args) == 2 and args[0] == '?':
478 if args[1] not in all_attrs:
479 raise exceptions.UnknownCluster(args[1])
480 print('\n'.join(all_attrs.get(args[1])))
482 if args[0] not in all_attrs:
483 raise exceptions.UnknownCluster(args[0])
484 self.devCtrl.ZCLReadAttribute(args[0], args[1], int(args[2]), int(args[3]), int(args[4]))
486 self.do_help("zclread")
487 except exceptions.ChipStackException as ex:
488 print("An exception occurred during reading ZCL attribute:")
490 except Exception as ex:
491 print("An exception occurred during processing input:")
494 def do_setpairingwificredential(self, line):
496 set-pairing-wifi-credential <ssid> <password>
498 Set WiFi credential to be used while pairing a Wi-Fi device
501 args = shlex.split(line)
503 self.devCtrl.SetWifiCredential(args[0], args[1])
504 print("WiFi credential set")
506 self.do_help("set-pairing-wifi-credential")
507 except exceptions.ChipStackException as ex:
511 def do_setpairingthreadcredential(self, line):
513 set-pairing-thread-credential <channel> <panid> <masterkey>
515 Set Thread credential to be used while pairing a Thread device
518 args = shlex.split(line)
520 self.devCtrl.SetThreadCredential(int(args[0]), int(args[1], 16), args[2])
521 print("Thread credential set")
523 self.do_help("set-pairing-thread-credential")
524 except exceptions.ChipStackException as ex:
528 def do_history(self, line):
532 Show previously executed commands.
538 h = readline.get_current_history_length()
539 for n in range(1, h + 1):
540 print(readline.get_history_item(n))
def do_h(self, line):
    # Short alias: forwards the argument line unchanged to do_history.
    # Documented with a comment rather than a docstring because do_help
    # prints a command's __doc__, and a docstring here would change the
    # output of "help h".
    self.do_history(line)
547 def do_exit(self, line):
550 def do_quit(self, line):
553 def do_q(self, line):
556 def do_EOF(self, line):
565 optParser = OptionParser()
566 optParser.add_option(
570 dest="rendezvousAddr",
571 help="Device rendezvous address",
572 metavar="<ip-address>",
574 optParser.add_option(
576 "--controller-nodeid",
578 dest="controllerNodeId",
581 help="Controller node ID",
585 if sys.platform.startswith("linux"):
586 optParser.add_option(
588 "--bluetooth-adapter",
590 dest="bluetoothAdapter",
593 help="Controller bluetooth adapter ID",
594 metavar="<bluetooth-adapter>",
596 (options, remainingArgs) = optParser.parse_args(sys.argv[1:])
598 if len(remainingArgs) != 0:
599 print("Unexpected argument: %s" % remainingArgs[0])
if sys.platform.startswith("linux"):
    # The adapter is given by name ("hci0", "hci1", ...); the numeric suffix
    # is what gets passed on as the adapter ID.
    if not options.bluetoothAdapter.startswith("hci"):
        # Fill in the placeholder so the user sees the rejected value.
        print("Invalid bluetooth adapter: {}, adapter name looks like hci0, hci1 etc.".format(
            options.bluetoothAdapter))
        sys.exit(-1)
    try:
        adapterId = int(options.bluetoothAdapter[3:])
    except ValueError:
        # Raised by int() when the text after "hci" is not a decimal number.
        print("Invalid bluetooth adapter: {}, adapter name looks like hci0, hci1 etc.".format(
            options.bluetoothAdapter))
        sys.exit(-1)
614 devMgrCmd = DeviceMgrCmd(rendezvousAddr=options.rendezvousAddr, controllerNodeId=options.controllerNodeId, bluetoothAdapter=adapterId)
615 print("Chip Device Controller Shell")
616 if options.rendezvousAddr:
617 print("Rendezvous address set to %s" % options.rendezvousAddr)
619 # Adapter ID will always be 0
621 print("Bluetooth adapter set to hci{}".format(adapterId))
626 except KeyboardInterrupt:
632 if __name__ == "__main__":