mesh: Sample Mesh Joiner (provision acceptor)
authorInga Stotland <inga.stotland@intel.com>
Fri, 14 Dec 2018 22:19:08 +0000 (14:19 -0800)
committerAnupam Roy <anupam.r@samsung.com>
Tue, 17 Dec 2019 14:01:53 +0000 (19:31 +0530)
This implements a simple test to excercise Join() method
of org.bluez.mesh.Network interface.

Change-Id: I7c35ba8d979ac19ab2ca07b32551fae39dea9cd4
Signed-off-by: Anupam Roy <anupam.r@samsung.com>
test/test-join [new file with mode: 0644]

diff --git a/test/test-join b/test/test-join
new file mode 100644 (file)
index 0000000..cdf92a2
--- /dev/null
@@ -0,0 +1,408 @@
+#!/usr/bin/env python3
+
+import sys
+import struct
+import numpy
+import dbus
+import dbus.service
+import dbus.exceptions
+
+from threading import Timer
+import time
+
+try:
+  from gi.repository import GObject
+except ImportError:
+  import gobject as GObject
+from dbus.mainloop.glib import DBusGMainLoop
+
+import agent
+
+MESH_SERVICE_NAME = 'org.bluez.mesh'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+
+MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
+MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
+MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
+MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
+
+APP_COMPANY_ID = 0x05f1
+APP_PRODUCT_ID = 0x0001
+APP_VERSION_ID = 0x0001
+
+VENDOR_ID_NONE = 0xffff
+
+mesh_net = None
+app = None
+bus = None
+mainloop = None
+node = None
+
+token = None
+
+def generic_error_cb(error):
+       print('D-Bus call failed: ' + str(error))
+
+def generic_reply_cb():
+       print('D-Bus call done')
+
+def unwrap(item):
+       if isinstance(item, dbus.Boolean):
+               return bool(item)
+       if isinstance(item, (dbus.UInt16, dbus.Int16,
+                        dbus.UInt32, dbus.Int32,
+                        dbus.UInt64, dbus.Int64)):
+               return int(item)
+       if isinstance(item, dbus.Byte):
+               return bytes([int(item)])
+       if isinstance(item, dbus.String):
+               return item
+       if isinstance(item, (dbus.Array, list, tuple)):
+               return [unwrap(x) for x in item]
+       if isinstance(item, (dbus.Dictionary, dict)):
+               return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
+
+       print('Dictionary item not handled')
+       print(type(item))
+       return item
+
+def join_cb():
+       print('Join procedure started')
+
+def join_error_cb(reason):
+       print('Join procedure failed: ', reason)
+
+def attach_app_cb(node_path, dict_array):
+       print('Mesh application registered ', node_path)
+
+       obj = bus.get_object(MESH_SERVICE_NAME, node_path)
+
+       global node
+       node = dbus.Interface(obj, MESH_NODE_IFACE)
+
+       els = unwrap(dict_array)
+       print("Get Elements")
+
+       for el in els:
+               idx = struct.unpack('b', el[0])[0]
+               print('Configuration for Element ', end='')
+               print(idx)
+
+               models = el[1]
+               element = app.get_element(idx)
+               element.set_model_config(models)
+
+def attach_app_error_cb(error):
+       print('Failed to register application: ' + str(error))
+       mainloop.quit()
+
+def attach(token):
+       print('Attach')
+       mesh_net.Attach(app.get_path(), token,
+                                       reply_handler=attach_app_cb,
+                                       error_handler=attach_app_error_cb)
+
+def interfaces_removed_cb(object_path, interfaces):
+       if not mesh_net:
+               return
+
+       if object_path == mesh_net[2]:
+               print('Service was removed')
+       mainloop.quit()
+
+def send_response(path, dest, key, data):
+       print('send response ', end='')
+       print(data)
+       node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
+                                       error_handler=generic_error_cb)
+
+def send_publication(path, model_id, data):
+       print('send publication ', end='')
+       print(data)
+       node.Publish(path, model_id, data, reply_handler=generic_reply_cb,
+                       error_handler=generic_error_cb)
+
+class PubTimer():
+       def __init__(self):
+               self.seconds = None
+               self.func = None
+               self.thread = None
+               self.busy = False
+
+       def _timeout_cb(self):
+               self.func()
+               self.busy = True
+               self._schedule_timer()
+               self.busy =False
+
+       def _schedule_timer(self):
+               self.thread = Timer(self.seconds, self._timeout_cb)
+               self.thread.start()
+
+       def start(self, seconds, func):
+               self.func = func
+               self.seconds = seconds
+               if not self.busy:
+                       self._schedule_timer()
+
+       def cancel(self):
+               print('Cancel timer')
+               if self.thread is not None:
+                       print('Cancel thread')
+                       self.thread.cancel()
+                       self.thread = None
+
+class Application(dbus.service.Object):
+
+       def __init__(self, bus):
+               self.path = '/example'
+               self.agent = None
+               self.elements = []
+               dbus.service.Object.__init__(self, bus, self.path)
+
+       def set_agent(self, agent):
+               self.agent = agent
+
+       def get_path(self):
+               return dbus.ObjectPath(self.path)
+
+       def add_element(self, element):
+               self.elements.append(element)
+
+       def get_element(self, idx):
+               for ele in self.elements:
+                       if ele.get_index() == idx:
+                               return ele
+
+       def get_properties(self):
+               return {
+                       MESH_APPLICATION_IFACE: {
+                       'CompanyID': dbus.UInt16(APP_COMPANY_ID),
+                       'ProductID': dbus.UInt16(APP_PRODUCT_ID),
+                       'VersionID': dbus.UInt16(APP_VERSION_ID)
+                       }
+               }
+
+       @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+
+       def GetManagedObjects(self):
+               response = {}
+               print('GetManagedObjects')
+               response[self.path] = self.get_properties()
+               response[self.agent.get_path()] = self.agent.get_properties()
+               for element in self.elements:
+                       response[element.get_path()] = element.get_properties()
+               return response
+
+       @dbus.service.method(MESH_APPLICATION_IFACE,
+                                       in_signature="t", out_signature="")
+
+       def JoinComplete(self, value):
+               global token
+               print('JoinComplete ', value)
+
+               token = value
+               attach(token)
+
+       @dbus.service.method(MESH_APPLICATION_IFACE,
+                                       in_signature="s", out_signature="")
+
+       def JoinFailed(self, value):
+               print('JoinFailed ', value)
+               token = value
+
+class Element(dbus.service.Object):
+       PATH_BASE = '/example/ele'
+
+       def __init__(self, bus, index):
+               self.path = self.PATH_BASE + format(index, '02x')
+               print(self.path)
+               self.models = []
+               self.bus = bus
+               self.index = index
+               dbus.service.Object.__init__(self, bus, self.path)
+
+       def _get_sig_models(self):
+               ids = []
+               for model in self.models:
+                       id = model.get_id()
+                       vendor = model.get_vendor()
+                       if vendor == VENDOR_ID_NONE:
+                               ids.append(id)
+               return ids
+
+       def get_properties(self):
+               return {
+                       MESH_ELEMENT_IFACE: {
+                       'Index': dbus.Byte(self.index),
+                       'Models': dbus.Array(self._get_sig_models(), 'q')
+                       }
+               }
+
+       def add_model(self, model):
+               model.set_path(self.path)
+               self.models.append(model)
+
+       def get_index(self):
+               return self.index
+
+       def set_model_config(self, configs):
+               print('Set element models config')
+               for config in configs:
+                       mod_id = config[0]
+                       self.UpdateModelConfiguration(mod_id, config[1])
+
+       @dbus.service.method(MESH_ELEMENT_IFACE,
+                                       in_signature="qqbay", out_signature="")
+       def MessageReceived(self, source, key, is_sub, data):
+               print('Message Received on Element ', end='')
+               print(self.index)
+               for model in self.models:
+                       model.process_message(source, key, data)
+
+       @dbus.service.method(MESH_ELEMENT_IFACE,
+                                       in_signature="qa{sv}", out_signature="")
+
+       def UpdateModelConfiguration(self, model_id, config):
+               print('UpdateModelConfig ', end='')
+               print(hex(model_id))
+               for model in self.models:
+                       if model_id == model.get_id():
+                               model.set_config(config)
+                               return
+
+       @dbus.service.method(MESH_ELEMENT_IFACE,
+                                       in_signature="", out_signature="")
+
+       def get_path(self):
+               return dbus.ObjectPath(self.path)
+
+class Model():
+       def __init__(self, model_id):
+               self.cmd_ops = []
+               self.model_id = model_id
+               self.vendor = VENDOR_ID_NONE
+               self.bindings = []
+               self.pub_period = 0
+               self.pub_id = 0
+               self.path = None
+
+       def set_path(self, path):
+               self.path = path
+
+       def get_id(self):
+               return self.model_id
+
+       def get_vendor(self):
+               return self.vendor
+
+       def process_message(self, source, key, data):
+               print('Model process message')
+
+       def set_publication(self, period):
+               self.pub_period = period
+
+       def set_config(self, config):
+               if 'Bindings' in config:
+                       self.bindings = config.get('Bindings')
+                       print('Bindings: ', end='')
+                       print(self.bindings)
+               if 'PublicationPeriod' in config:
+                       self.set_publication(config.get('PublicationPeriod'))
+                       print('Model publication period ', end='')
+                       print(self.pub_period, end='')
+                       print(' ms')
+
+class OnOffServer(Model):
+       def __init__(self, model_id):
+               Model.__init__(self, model_id)
+               self.cmd_ops = { 0x8201, # get
+                                0x8202, # set
+                                0x8203 } # set unacknowledged
+
+               print("OnOff Server ", end="")
+               self.state = 0
+               print('State ', end='')
+               self.timer = PubTimer()
+
+       def process_message(self, source, key, data):
+               datalen = len(data)
+               print('OnOff Server process message len ', datalen)
+
+               if datalen!=2 and datalen!=3:
+                       return
+
+               if datalen==2:
+                       op_tuple=struct.unpack('<H',bytes(data))
+                       opcode = op_tuple[0]
+                       if opcode != 0x8201:
+                               print(hex(opcode))
+                               return
+                       print('Get state')
+               elif datalen==3:
+                       opcode,self.state=struct.unpack('<HB',bytes(data))
+                       if opcode != 0x8202 and opcode != 0x8203:
+                               print(hex(opcode))
+                               return
+                       print('Set state: ', end='')
+                       print(self.state)
+
+               rsp_data = struct.pack('<HB', 0x8204, self.state)
+               send_response(self.path, source, key, rsp_data)
+
+       def publish(self):
+               print('Publish')
+               data = struct.pack('B', self.state)
+               send_publication(self.path, self.model_id, data)
+
+       def set_publication(self, period):
+               if period == 0:
+                       self.pub_period = 0
+                       self.timer.cancel()
+                       return
+
+               # We do not handle ms in this example
+               if period < 1000:
+                       return
+
+               self.pub_period = period
+               self.timer.start(period/1000, self.publish)
+
+def main():
+
+       DBusGMainLoop(set_as_default=True)
+
+       global bus
+       bus = dbus.SystemBus()
+       global mainloop
+       global app
+       global mesh_net
+
+       mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
+                                                "/org/bluez/mesh"),
+                                                MESH_NETWORK_IFACE)
+       mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
+
+       app = Application(bus)
+       prov_agent = agent.Agent(bus)
+       app.set_agent(prov_agent)
+       first_ele = Element(bus, 0x00)
+       first_ele.add_model(OnOffServer(0x1000))
+       app.add_element(first_ele)
+
+       mainloop = GObject.MainLoop()
+
+       print('Join')
+       caps = ["out-numeric"]
+       oob = ["other"]
+       uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
+       print(uuid)
+       mesh_net.Join(app.get_path(), uuid,
+                       reply_handler=join_cb,
+                       error_handler=join_error_cb)
+
+       mainloop.run()
+
+if __name__ == '__main__':
+       main()