#
# The main menu:
# token
-# create
-# scan
-# add
# join
# attach
# remove
# The token can be set from command line arguments as
# well.
#
-# create
-# Creates a new mesh network, with its first local
-# node. The test generates device UUID to store for
-# the initial node, and the daemon will create all
-# the other parameters including Unicast address 0x0001
-# for the new nodes primary element.
-# In case of successful creation, the application
-# automatically attaches as a node to the daemon. A node
-# 'token' is returned to the application and is used
-# for the runtime of the test, and may be used in future
-# attach requests.
-#
-# scan
-# Scan for unprovisioned devices
-#
-# add
-# Adds a remote node to a mesh network that we have provisioning
-# authorization to. The test prompts for a remote devices
-# UUID, and supplies an Agent that will handle the interaction,
-# and provide the provisioning data which will complete to
-# process.
-#
# join
# Request provisioning of a device to become a node
# on a mesh network. The test generates device UUID
# For the call to be successful, the valid node token must
# be already set, either from command arguments or by
# executing "set token" operation or automatically after
-# successfully executing "join" or "create" operation in
-# the same test run.
+# successfully executing "join" operation in the same
+# test run.
#
# remove
# Permanently removes any node configuration from daemon
MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
-MESH_PROV_IFACE = 'org.bluez.mesh.Provisioner1'
MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
APP_COMPANY_ID = 0x05f1
reply_handler=attach_app_cb,
error_handler=attach_app_error_cb)
-def scan_cb():
- print('Scan procedure started')
-
-def scan_error_cb(reason):
- print('Scan procedure failed ', reason)
-
-def add_cb():
- print('AddNode procedure started')
-
-def add_error_cb(reason):
- print('AddNode procedure failed ', reason)
-
def join_cb():
print('Join procedure started')
def join_error_cb(reason):
print('Join procedure failed: ', reason)
-def create_cb(value):
- global token
- global have_token
- global attach
-
- print(set_yellow('Created mesh network with token ') +
- set_green(format(value, '016x')))
-
- token = value
- have_token = True
- if attached == False:
- attach(token)
-
-def create_error_cb(reason):
- print('Create procedure failed: ', reason)
-
def remove_node_cb():
global attached
global have_token
'CompanyID': dbus.UInt16(APP_COMPANY_ID),
'ProductID': dbus.UInt16(APP_PRODUCT_ID),
'VersionID': dbus.UInt16(APP_VERSION_ID)
- },
- MESH_PROV_IFACE: {
}
}
def JoinFailed(self, value):
print(set_error('JoinFailed '), value)
- @dbus.service.method(MESH_PROV_IFACE, in_signature="naya{sv}",
- out_signature="")
- def ScanResult(self, rssi, data, options):
- global remote_uuid
- remote_uuid = data[:16]
- uuid_str = array_to_string(remote_uuid)
- data_str = array_to_string(data[16:])
- if len(data_str) == 0:
- data_str = 'Not Present'
-
- print(set_yellow('ScanResult >> RSSI: ') +
- set_green(format(rssi, 'd')) +
- set_yellow(format(' UUID: ')) +
- set_green(format(uuid_str, 's')) +
- set_yellow(format(' OOB Data: ')) +
- set_green(format(data_str, 's')))
-
- @dbus.service.method(MESH_PROV_IFACE,
- in_signature="y", out_signature="qq")
- def RequestProvData(self, count):
- print('RequestProvData for Ele_Cnt '
- + set_green(format(count, 'd')))
- return dbus.Struct((dbus.UInt16(0), dbus.UInt16(678)))
-
- @dbus.service.method(MESH_PROV_IFACE,
- in_signature="ayqy", out_signature="")
- def AddNodeComplete(self, uuid, unicast, count):
- uuid_str = array_to_string(uuid)
- print(set_yellow('AddNodeComplete of node ')
- + set_green(format(unicast, '04x'))
- + ' uuid ' + uuid_str)
-
- @dbus.service.method(MESH_PROV_IFACE,
- in_signature="ays", out_signature="")
-
- def AddNodeFailed(self, uuid, value):
- print(set_error('AddNodeFailed '), value)
-
class Element(dbus.service.Object):
PATH_BASE = '/example/ele'
menu_items = {
'token': MenuItem(' - set node ID (token)',
self.__cmd_set_token),
- 'create': MenuItem(' - create mesh network',
- self.__cmd_create),
- 'scan': MenuItem(' - scan for near unprovisioned devs',
- self.__cmd_scan),
- 'add': MenuItem(' - add device to mesh network',
- self.__cmd_add),
'join': MenuItem(' - join mesh network',
self.__cmd_join),
'attach': MenuItem(' - attach mesh node',
user_input = INPUT_MESSAGE_PAYLOAD;
print(set_cyan('Enter message payload (hex):'))
- def __cmd_create(self):
- if agent == None:
- print(set_error('Provisioning agent not found'))
- return
-
- uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
- random.shuffle(uuid)
- uuid_str = array_to_string(uuid)
-
- print(set_yellow('Creating with UUID ') + set_green(uuid_str))
- mesh_net.CreateNetwork(app.get_path(), uuid,
- reply_handler=create_cb,
- error_handler=create_error_cb)
-
def __cmd_join(self):
if agent == None:
print(set_error('Provisioning agent not found'))
reply_handler=join_cb,
error_handler=join_error_cb)
- def __cmd_scan(self):
- options = {}
- options['Seconds'] = dbus.UInt16(0)
-
- print(set_yellow('Scanning...'))
- node_mgr.UnprovisionedScan(options,
- reply_handler=scan_cb,
- error_handler=scan_error_cb)
-
- def __cmd_add(self):
- global user_input
- global remote_uuid
-
- if agent == None:
- print(set_error('Provisioning agent not found'))
- return
-
- uuid_str = array_to_string(remote_uuid)
- options = {}
-
- print(set_yellow('Adding dev UUID ') + set_green(uuid_str))
- node_mgr.AddNode(remote_uuid, options, reply_handler=add_cb,
- error_handler=add_error_cb)
-
def __cmd_attach(self):
if have_token == False:
print(set_error('Token is not set'))