A clone of test/python/unittest/test_runtime_micro.py, however (#5546)
authorTom Gall <tom_gall@mac.com>
Sat, 9 May 2020 15:45:05 +0000 (10:45 -0500)
committerGitHub <noreply@github.com>
Sat, 9 May 2020 15:45:05 +0000 (08:45 -0700)
modified to run specifically on ARM cortex-M hardware, which
currently is just the STM32F746 discovery board.

Signed-off-by: Tom Gall <tom.gall@linaro.org>
tests/micro/test_runtime_micro_on_arm.py [new file with mode: 0644]

diff --git a/tests/micro/test_runtime_micro_on_arm.py b/tests/micro/test_runtime_micro_on_arm.py
new file mode 100644 (file)
index 0000000..a872afe
--- /dev/null
@@ -0,0 +1,385 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import numpy as np
+import tvm
+from tvm import te
+from tvm.contrib import graph_runtime, util
+from tvm import relay
+import tvm.micro as micro
+from tvm.micro import create_micro_mod
+from tvm.relay.testing import resnet
+
+# Use real micro device - an STM32F746 discovery board
+# SETUP: 
+# Be sure to have openocd installed and running
+# Ex : openocd -f board/stm32f7discovery.cfg
+# Be sure to have the ST CMSIS library downloaded, installed and
+# Ex : export CMSIS_ST_PATH="/home/yourid/st/STM32Cube_FW_F7_V1.16.0/Drivers/CMSIS"
+DEV_CONFIG_A = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
+DEV_CONFIG_B = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
+TARGET = 'c -device=micro_dev'
+
+def relay_micro_build(func, dev_config, params=None):
+    """Create a graph runtime module with a micro device context from a Relay function.
+
+    Parameters
+    ----------
+    func : relay.Function
+        function to compile
+
+    dev_config : Dict[str, Any]
+        MicroTVM config dict for the target device
+
+    params : dict
+        input parameters that do not change during inference
+
+    Return
+    ------
+    mod : tvm.runtime.Module
+        graph runtime module for the target device
+    """
+    disable_vectorize = tvm.target.build_config(disable_vectorize=True)
+    disable_fusion = relay.build_config(disabled_pass={'FuseOps'})
+    with disable_vectorize, disable_fusion:
+        graph, c_mod, params = relay.build(func, target=TARGET, params=params)
+    micro_mod = micro.create_micro_mod(c_mod, dev_config)
+    ctx = tvm.micro_dev(0)
+    mod = graph_runtime.create(graph, micro_mod, ctx)
+    mod.set_input(**params)
+    return mod
+
+
+GDB_INIT_TEMPLATE = """
+layout asm
+target remote localhost:{gdb_port}
+set $pc = UTVMInit
+break UTVMDone
+"""
+
+
+def reset_gdbinit():
+    if 'server_port' not in DEV_CONFIG_A:
+        return
+    try: 
+        gdb_init_dir = os.environ['MICRO_GDB_INIT_DIR']
+    except KeyError:
+        return
+    with open(f'{gdb_init_dir}/.gdbinit', 'w') as f:
+        gdb_port = DEV_CONFIG_A['server_port'] - 3333
+        f.write(GDB_INIT_TEMPLATE.format(gdb_port=gdb_port))
+
+
+def test_alloc():
+    """Test tensor allocation on the device."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+    with micro.Session(DEV_CONFIG_A):
+        ctx = tvm.micro_dev(0)
+        np_tensor = np.random.uniform(size=shape).astype(dtype)
+        micro_tensor = tvm.nd.array(np_tensor, ctx)
+        tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy())
+
+
+def test_add():
+    """Test a module which performs addition."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    reset_gdbinit()
+
+    # Construct TVM expression.
+    tvm_shape = tvm.runtime.convert(shape)
+    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
+    B = te.placeholder(tvm_shape, name="B", dtype=dtype)
+    C = te.compute(A.shape, lambda *i: A(*i) + B(*i), name="C")
+    s = te.create_schedule(C.op)
+
+    func_name = "fadd"
+    c_mod = tvm.build(s, [A, B, C], target="c", name=func_name)
+
+    with micro.Session(DEV_CONFIG_A) as sess:
+        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
+        micro_func = micro_mod[func_name]
+        ctx = tvm.micro_dev(0)
+
+        a_np = np.random.uniform(size=shape).astype(dtype)
+        a = tvm.nd.array(a_np, ctx)
+        b_np = np.random.uniform(size=shape).astype(dtype)
+        b = tvm.nd.array(b_np, ctx)
+        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
+        micro_func(a, b, c)
+
+        # ensure inputs weren't corrupted
+        tvm.testing.assert_allclose(
+                a.asnumpy(), a_np)
+        tvm.testing.assert_allclose(
+                b.asnumpy(), b_np)
+        # ensure output is correct
+        tvm.testing.assert_allclose(
+                c.asnumpy(), a.asnumpy() + b.asnumpy())
+
+
+def test_workspace_add():
+    """Test a module which uses a workspace to compute an intermediate value."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    reset_gdbinit()
+
+    # Construct TVM expression.
+    tvm_shape = tvm.runtime.convert(shape)
+    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
+    B = te.placeholder(tvm_shape, name="B", dtype=dtype)
+    B = te.compute(A.shape, lambda *i: A(*i) + 1, name="B")
+    C = te.compute(A.shape, lambda *i: B(*i) + 1, name="C")
+    s = te.create_schedule(C.op)
+
+    func_name = "fadd_two_workspace"
+    c_mod = tvm.build(s, [A, C], target="c", name=func_name)
+
+    with micro.Session(DEV_CONFIG_A) as sess:
+        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
+        micro_func = micro_mod[func_name]
+        ctx = tvm.micro_dev(0)
+        a_np = np.random.uniform(size=shape).astype(dtype)
+        a = tvm.nd.array(a_np, ctx)
+        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
+        micro_func(a, c)
+
+        # ensure input wasn't corrupted
+        tvm.testing.assert_allclose(
+                a.asnumpy(), a_np)
+        # ensure output is correct
+        tvm.testing.assert_allclose(
+                c.asnumpy(), a.asnumpy() + 2.0)
+
+
+def test_graph_runtime():
+    """Test a program which uses the graph runtime."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    # Construct Relay program.
+    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
+    xx = relay.multiply(x, x)
+    z = relay.add(xx, relay.const(1.0))
+    func = relay.Function([x], z)
+
+    with micro.Session(DEV_CONFIG_A):
+        mod = relay_micro_build(func, DEV_CONFIG_A)
+
+        x_in = np.random.uniform(size=shape[0]).astype(dtype)
+        mod.run(x=x_in)
+        result = mod.get_output(0).asnumpy()
+
+        tvm.testing.assert_allclose(
+                mod.get_input(0).asnumpy(), x_in)
+        tvm.testing.assert_allclose(
+                result, x_in * x_in + 1.0)
+
+
+def test_conv2d():
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+
+    from tvm.relay import create_executor
+    from tvm.relay import transform
+
+    dshape = (1, 4, 16, 16)
+    dtype = 'int8'
+    func_name = 'fused_nn_conv2d'
+
+    reset_gdbinit()
+
+    # Construct Relay program.
+    x = relay.var("x", shape=dshape, dtype=dtype)
+    conv_expr = relay.nn.conv2d(
+            x, relay.var("w"),
+            kernel_size=(3, 3),
+            padding=(1, 1),
+            channels=4)
+    func = relay.Function(relay.analysis.free_vars(conv_expr), conv_expr)
+    mod = tvm.IRModule.from_expr(func)
+    mod = transform.InferType()(mod)
+
+    x_shape = list(map(lambda x: x.value, mod['main'].params[0].checked_type.shape))
+    w_shape = list(map(lambda x: x.value, mod['main'].params[1].checked_type.shape))
+    out_shape = list(map(lambda x: x.value, mod['main'].ret_type.shape))
+
+    with tvm.target.build_config(disable_vectorize=True):
+        graph, c_mod, params = relay.build(mod, target="c")
+
+    with micro.Session(DEV_CONFIG_A):
+        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
+        candidate_func_name = func_name
+        for i in range(100):
+            try:
+                micro_func = micro_mod[candidate_func_name]
+                break
+            except tvm.TVMError as e:
+                candidate_func_name = f'{func_name}_{i}'
+        else:
+            assert False
+        ctx = tvm.micro_dev(0)
+
+        x_data = tvm.nd.array(np.random.uniform(size=x_shape).astype(dtype), ctx)
+        w_data = tvm.nd.array(np.random.uniform(size=w_shape).astype(dtype), ctx)
+        result = tvm.nd.array(np.zeros(shape=out_shape, dtype=dtype), ctx)
+        micro_func(x_data, w_data, result)
+
+        out_data = np.zeros(out_shape, dtype=dtype)
+        params = { 'x': x_data.asnumpy(), 'w': w_data.asnumpy() }
+        intrp = create_executor('debug')
+        expected_result = intrp.evaluate(mod['main'])(x_data, w_data)
+
+        tvm.testing.assert_allclose(result.asnumpy(), expected_result.asnumpy())
+
+
+def test_interleave_sessions():
+    """Test closing and reopening sessions."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    # Construct Relay add program.
+    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
+    ret = relay.add(x, relay.const(1.0))
+    add_const_func = relay.Function([x], ret)
+
+    sess_a = micro.Session(DEV_CONFIG_A)
+    sess_b = micro.Session(DEV_CONFIG_B)
+    with sess_a:
+        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
+        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
+    with sess_b:
+        np_tensor_b = np.random.uniform(size=shape).astype(dtype)
+        micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
+    with sess_a:
+        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
+        add_const_mod.run(x=micro_tensor_a)
+        add_result = add_const_mod.get_output(0).asnumpy()
+        tvm.testing.assert_allclose(
+                add_result, np_tensor_a + 1.0)
+    with sess_b:
+        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_B)
+        add_const_mod.run(x=micro_tensor_b)
+        add_result = add_const_mod.get_output(0).asnumpy()
+        tvm.testing.assert_allclose(
+                add_result, np_tensor_b + 1.0)
+
+
+def test_nested_sessions():
+    """Test entering and exiting nested session contexts."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    # Construct Relay add program.
+    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
+    ret = relay.add(x, relay.const(1.0))
+    add_const_func = relay.Function([x], ret)
+
+    sess_a = micro.Session(DEV_CONFIG_A)
+    sess_b = micro.Session(DEV_CONFIG_B)
+    with sess_a:
+        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
+        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
+        with sess_b:
+            np_tensor_b = np.random.uniform(size=shape).astype(dtype)
+            micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
+        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
+        add_const_mod.run(x=micro_tensor_a)
+        add_result = add_const_mod.get_output(0).asnumpy()
+        tvm.testing.assert_allclose(
+                add_result, np_tensor_a + 1.0)
+
+
+def test_inactive_session_use():
+    """Test the use of objects allocated in a session that is no longer active."""
+    if not tvm.runtime.enabled("micro_dev"):
+        return
+    shape = (1024,)
+    dtype = "float32"
+
+    # Construct Relay add program.
+    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
+    ret = relay.add(x, relay.const(1.0))
+    add_const_func = relay.Function([x], ret)
+
+    sess_a = micro.Session(DEV_CONFIG_A)
+    sess_b = micro.Session(DEV_CONFIG_B)
+    with sess_a:
+        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
+        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
+        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
+
+    with sess_b:
+        # These objects belong to `sess_a`.
+        add_const_mod.run(x=micro_tensor_a)
+        add_result = add_const_mod.get_output(0).asnumpy()
+        tvm.testing.assert_allclose(
+                add_result, np_tensor_a + 1.0)
+
+
+# TODO add workspace alloc/free stress test
+
+if __name__ == "__main__":
+    test_alloc()
+    print()
+    print('finished alloc test')
+    input('[press enter to continue]')
+    test_add()
+    print()
+    print('finished add test')
+    input('[press enter to continue]')
+    test_workspace_add()
+    print()
+    print('finished workspace add test')
+    input('[press enter to continue]')
+    test_graph_runtime()
+    print()
+    print('finished graph runtime test')
+    input('[press enter to continue]')
+    test_conv2d()
+    print()
+    print('finished conv2d test')
+    input('[press enter to continue]')
+    # disable for now as these are currently broken
+    #test_interleave_sessions()
+    #print()
+    #print('finished interleaved sessions test')
+    #input('[press enter to continue]')
+    # test_nested_sessions()
+    #print()
+    #print('finished nested sessions test')
+    #input('[press enter to continue]')
+    test_inactive_session_use()
+    print()
+    print('finished use inactive session test')
+    input('[press enter to continue]')