From 84fa62617bcd661843d4ae0cbed305cf952d5b80 Mon Sep 17 00:00:00 2001
From: Zhao Wu
Date: Wed, 9 Sep 2020 03:07:14 +0800
Subject: [PATCH] [AutoTVM][Ansor] Enable random fill and CPU cache flush for
 AutoTVM and Ansor (#6391)

* [AutoTVM][Ansor] Enable random fill and CPU cache flush for AutoTVM and Ansor

* Trigger CI

* add assert check of random fill function

* remove duplicate tvm.get_global_func

* Add check random_fill exists on remote devices

* solve pylint
---
 include/tvm/auto_scheduler/measure.h               | 10 ++-
 python/tvm/auto_scheduler/measure.py               | 93 +++++++++++++++++-----
 python/tvm/autotvm/measure/measure_methods.py      | 10 ++-
 src/auto_scheduler/measure.cc                      | 22 ++---
 .../python/unittest/test_auto_scheduler_measure.py | 17 ++--
 5 files changed, 110 insertions(+), 42 deletions(-)

diff --git a/include/tvm/auto_scheduler/measure.h b/include/tvm/auto_scheduler/measure.h
index 83d7c8d..8cfc740 100644
--- a/include/tvm/auto_scheduler/measure.h
+++ b/include/tvm/auto_scheduler/measure.h
@@ -276,6 +276,8 @@ class ProgramRunnerNode : public Object {
   int min_repeat_ms;
   /*! \brief The cool down interval between two measurements. */
   double cooldown_interval;
+  /*! \brief Whether to flush cache on CPU between repeated measurements. */
+  bool enable_cpu_cache_flush;
 
   /*!
    * \brief Run measurement and return results.
@@ -358,8 +360,10 @@ class LocalRunner : public ProgramRunner {
    * \param repeat The number of times to repeat the measurement.
    * \param min_repeat_ms The minimum duration of one repeat in milliseconds.
    * \param cooldown_interval The cool down interval between two measurements.
+   * \param enable_cpu_cache_flush Whether to flush cache on CPU between repeated measurements.
    */
-  LocalRunner(int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval);
+  LocalRunner(int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval,
+              bool enable_cpu_cache_flush);
 
   TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(LocalRunner, ProgramRunner, LocalRunnerNode);
 };
@@ -408,9 +412,11 @@ class RPCRunner : public ProgramRunner {
    * \param repeat The number of times to repeat the measurement.
    * \param min_repeat_ms The minimum duration of one repeat in milliseconds.
    * \param cooldown_interval The cool down interval between two measurements.
+   * \param enable_cpu_cache_flush Whether to flush cache on CPU between repeated measurements.
    */
   RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel,
-            int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval);
+            int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval,
+            bool enable_cpu_cache_flush);
 
   TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(RPCRunner, ProgramRunner, RPCRunnerNode);
 };
diff --git a/python/tvm/auto_scheduler/measure.py b/python/tvm/auto_scheduler/measure.py
index 925de2f..09e7d96 100644
--- a/python/tvm/auto_scheduler/measure.py
+++ b/python/tvm/auto_scheduler/measure.py
@@ -202,8 +202,6 @@ class LocalBuilder(ProgramBuilder):
 class LocalRunner(ProgramRunner):
     """ LocalRunner that uses local CPU/GPU to measure the time cost of programs.
 
-    TODO(FrozenGene): Add cpu cache flush to this runner.
-
     Parameters
     ----------
     timeout : int = 10
@@ -227,6 +225,12 @@
         will be automatically increased.
     cooldown_interval : float = 0.0
         The cool down interval between two measurements.
+    enable_cpu_cache_flush: bool = False
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This only has effect on CPU tasks.
     """
 
     def __init__(self,
                  timeout=10,
                  number=3,
                  repeat=1,
                  min_repeat_ms=0,
-                 cooldown_interval=0.0):
+                 cooldown_interval=0.0,
+                 enable_cpu_cache_flush=False):
         self.__init_handle_by_constructor__(
-            _ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval)
+            _ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval,
+            enable_cpu_cache_flush)
 
 
 @tvm._ffi.register_object("auto_scheduler.RPCRunner")
@@ -245,8 +251,6 @@ class RPCRunner(ProgramRunner):
     """ RPCRunner that uses RPC call to measure the time cost of programs on remote devices.
     Or sometimes we may need to use RPC even in local running to isolate the thread environment.
     (e.g. running CUDA programs)
-    TODO(FrozenGene): Add cpu cache flush to this runner.
-
     Parameters
     ----------
     key : str
@@ -280,14 +284,20 @@
         will be automatically increased.
     cooldown_interval : float = 0.0
         The cool down interval between two measurements.
+    enable_cpu_cache_flush: bool = False
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This only has effect on CPU tasks.
     """
 
     def __init__(self, key, host, port,
                  priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
-                 min_repeat_ms=0, cooldown_interval=0.0):
+                 min_repeat_ms=0, cooldown_interval=0.0, enable_cpu_cache_flush=False):
         self.__init_handle_by_constructor__(
             _ffi_api.RPCRunner, key, host, port, priority, n_parallel, timeout,
-            number, repeat, min_repeat_ms, cooldown_interval)
+            number, repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush)
 
         if check_remote(key, host, port, priority, timeout):
             print("Get devices for measurement successfully!")
@@ -302,8 +312,6 @@
 class LocalRPCMeasureContext:
     """ A context wrapper for running RPCRunner locally.
     This will launch a local RPC Tracker and local RPC Server.
-    TODO(FrozenGene): Add cpu cache flush to this RPC context.
-
     Parameters
     ----------
     priority : int = 1
@@ -331,10 +339,16 @@
         will be automatically increased.
     cooldown_interval : float = 0.0
         The cool down interval between two measurements.
+    enable_cpu_cache_flush: bool = False
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This only has effect on CPU tasks.
""" def __init__(self, priority=1, n_parallel=1, timeout=10, number=3, repeat=1, - min_repeat_ms=0, cooldown_interval=0.0): + min_repeat_ms=0, cooldown_interval=0.0, enable_cpu_cache_flush=False): ctx = tvm.context("cuda", 0) if ctx.exist: cuda_arch = "sm_" + "".join(ctx.compute_version.split('.')) @@ -347,7 +361,7 @@ class LocalRPCMeasureContext: tracker_addr=(self.tracker.host, self.tracker.port)) self.runner = RPCRunner(device_key, host, self.tracker.port, priority, n_parallel, timeout, number, repeat, - min_repeat_ms, cooldown_interval) + min_repeat_ms, cooldown_interval, enable_cpu_cache_flush) # Wait for the processes to start time.sleep(0.5) @@ -507,7 +521,7 @@ def local_builder_build(inputs, timeout, n_parallel, build_func='default', verbo @tvm._ffi.register_func("auto_scheduler.local_runner.run") def local_run(inputs, build_results, timeout=10, number=3, repeat=1, min_repeat_ms=0, cooldown_interval=0, - verbose=1): + enable_cpu_cache_flush=False, verbose=1): """ Run function of LocalRunner to test the performance of the input BuildResults. @@ -538,6 +552,12 @@ def local_run(inputs, build_results, will be automatically increased. cooldown_interval : float = 0.0 The cool down interval between two measurements. + enable_cpu_cache_flush: bool = False + Whether to flush cache on CPU between repeated measurements. + Flushing cache can make the measured latency of one operator closer to + its actual latency during end-to-end inference. + To make this option effective, the argument `number` should also be set to 1. + This is only has effect on CPU task. verbose: int = 1 Verbosity level. 0 for silent, 1 to output information during program measuring. @@ -555,9 +575,15 @@ def local_run(inputs, build_results, try: func = module.load_module(build_res.filename) ctx = ndarray.context(str(inp.task.target), 0) - # TODO(FrozenGene): Add cpu cache flush to this function. + # Limitation: + # We can not get PackFunction directly in the remote mode as it is wrapped + # under the std::function. We could lift the restriction later once we fold + # the PackedFunc as an object. Currently, we pass function name to work + # around it. + f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else '' time_f = func.time_evaluator( - func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) + func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms, + f_preproc=f_prepare) # pylint: disable=broad-except except Exception: costs = (max_float,) @@ -566,9 +592,12 @@ def local_run(inputs, build_results, if error_no == 0: try: - # TODO(FrozenGene): Update to ndarray.non-empty. 
                 args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
                         build_res.args]
+                random_fill = tvm.get_global_func("tvm.contrib.random.random_fill", True)
+                assert random_fill, "Please make sure USE_RANDOM is ON in the config.cmake"
+                for arg in args:
+                    random_fill(arg)
                 ctx.sync()
                 costs = time_f(*args).results
         # pylint: disable=broad-except
         except Exception:
             costs = (max_float,)
@@ -626,7 +655,8 @@
     """
     global GLOBAL_RUN_ARGUMENTS
     inputs, build_results, key, host, port, priority, timeout, number, \
-        repeat, min_repeat_ms, cooldown_interval, verbose = GLOBAL_RUN_ARGUMENTS
+        repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush, \
+        verbose = GLOBAL_RUN_ARGUMENTS
     max_float = 1e10  # We use 1e10 instead of sys.float_info.max for better readability in log
     inp = inputs[index]
@@ -646,9 +676,15 @@
             remote.upload(build_res.filename)
             func = remote.load_module(os.path.split(build_res.filename)[1])
             ctx = remote.context(str(inp.task.target), 0)
-            # TODO(FrozenGene): Add cpu cache flush to this function.
+            # Limitation:
+            # We cannot get a PackedFunc directly in the remote mode as it is wrapped
+            # under std::function. We could lift this restriction later once we fold
+            # PackedFunc into an object. Currently, we pass the function name to work
+            # around it.
+            f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
             time_f = func.time_evaluator(
-                func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
+                func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
+                f_preproc=f_prepare)
         # pylint: disable=broad-except
         except Exception:
             costs = (max_float,)
@@ -657,9 +693,15 @@
 
         if error_no == 0:
             try:
-                # TODO(FrozenGene): Update to ndarray.non-empty.
                 args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
                         build_res.args]
+                try:
+                    random_fill = remote.get_function("tvm.contrib.random.random_fill")
+                except AttributeError:
+                    raise AttributeError("Please make sure USE_RANDOM is ON in the config.cmake "
+                                         "on the remote devices")
+                for arg in args:
+                    random_fill(arg)
                 ctx.sync()
                 costs = time_f(*args).results
@@ -698,7 +740,7 @@
 @tvm._ffi.register_func("auto_scheduler.rpc_runner.run")
 def rpc_runner_run(inputs, build_results, key, host, port,
                    priority=1, n_parallel=1, timeout=10, number=3, repeat=1, min_repeat_ms=0,
-                   cooldown_interval=0.0, verbose=1):
+                   cooldown_interval=0.0, enable_cpu_cache_flush=False, verbose=1):
     """ Run function of RPCRunner to test the performance of the input BuildResults.
 
     Parameters
@@ -738,6 +780,12 @@
         will be automatically increased.
     cooldown_interval : float = 0.0
         The cool down interval between two measurements.
+    enable_cpu_cache_flush: bool = False
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This only has effect on CPU tasks.
     verbose: int = 1
         Verbosity level. 0 for silent, 1 to output information during program measuring.
@@ -748,7 +796,8 @@ def rpc_runner_run(inputs, build_results, key, host, port,
     """
     global GLOBAL_RUN_ARGUMENTS
     GLOBAL_RUN_ARGUMENTS = (inputs, build_results, key, host, port, priority, timeout, number,
-                            repeat, min_repeat_ms, cooldown_interval, verbose)
+                            repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush,
+                            verbose)
     assert len(inputs) == len(build_results), \
         "Measure input size should be equal to build results"
diff --git a/python/tvm/autotvm/measure/measure_methods.py b/python/tvm/autotvm/measure/measure_methods.py
index 9cef674..db955ff 100644
--- a/python/tvm/autotvm/measure/measure_methods.py
+++ b/python/tvm/autotvm/measure/measure_methods.py
@@ -511,10 +511,14 @@ def run_through_rpc(measure_input, build_result,
         if ref_input:
             args = [nd.array(x, ctx=ctx) for x in ref_input]
         else:
-            # create empty arrays on the remote device and copy them once.
-            # This can avoid some memory issues that make the measurement results unreliable.
+            try:
+                random_fill = remote.get_function("tvm.contrib.random.random_fill")
+            except AttributeError:
+                raise AttributeError("Please make sure USE_RANDOM is ON in the config.cmake "
+                                     "on the remote devices")
             args = [nd.empty(x[0], dtype=x[1], ctx=ctx) for x in build_result.arg_info]
-            args = [nd.array(x, ctx=ctx) for x in args]
+            for arg in args:
+                random_fill(arg)
             ctx.sync()
 
             costs = time_f(*args).results
diff --git a/src/auto_scheduler/measure.cc b/src/auto_scheduler/measure.cc
index e249f7b..5642126 100644
--- a/src/auto_scheduler/measure.cc
+++ b/src/auto_scheduler/measure.cc
@@ -123,21 +123,23 @@ Array<BuildResult> LocalBuilderNode::Build(const Array<MeasureInput>& inputs, in
 
 /********** LocalRunner **********/
 LocalRunner::LocalRunner(int timeout, int number, int repeat, int min_repeat_ms,
-                         double cooldown_interval) {
+                         double cooldown_interval, bool enable_cpu_cache_flush) {
   ObjectPtr<LocalRunnerNode> node = make_object<LocalRunnerNode>();
   node->timeout = timeout;
   node->number = number;
   node->repeat = repeat;
   node->min_repeat_ms = min_repeat_ms;
   node->cooldown_interval = cooldown_interval;
+  node->enable_cpu_cache_flush = enable_cpu_cache_flush;
   data_ = std::move(node);
 }
 
 Array<MeasureResult> LocalRunnerNode::Run(const Array<MeasureInput>& inputs,
                                           const Array<BuildResult>& build_results, int verbose) {
   if (const auto* f = runtime::Registry::Get("auto_scheduler.local_runner.run")) {
-    Array<MeasureResult> results = (*f)(inputs, build_results, timeout, number, repeat,
-                                        min_repeat_ms, cooldown_interval, verbose);
+    Array<MeasureResult> results =
+        (*f)(inputs, build_results, timeout, number, repeat, min_repeat_ms, cooldown_interval,
+             enable_cpu_cache_flush, verbose);
     return results;
   }
   LOG(FATAL) << "auto_scheduler.local_runner.run is not registered. "
" @@ -149,7 +151,7 @@ Array LocalRunnerNode::Run(const Array& inputs, /********** RPCRunner **********/ RPCRunner::RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel, int timeout, int number, int repeat, int min_repeat_ms, - double cooldown_interval) { + double cooldown_interval, bool enable_cpu_cache_flush) { auto node = make_object(); node->key = key; node->host = host; @@ -161,6 +163,7 @@ RPCRunner::RPCRunner(const String& key, const String& host, int port, int priori node->repeat = repeat; node->min_repeat_ms = min_repeat_ms; node->cooldown_interval = cooldown_interval; + node->enable_cpu_cache_flush = enable_cpu_cache_flush; data_ = std::move(node); } @@ -169,7 +172,7 @@ Array RPCRunnerNode::Run(const Array& inputs, if (const auto* f = runtime::Registry::Get("auto_scheduler.rpc_runner.run")) { Array results = (*f)(inputs, build_results, key, host, port, priority, n_parallel, timeout, number, repeat, - min_repeat_ms, cooldown_interval, verbose); + min_repeat_ms, cooldown_interval, enable_cpu_cache_flush, verbose); return results; } else { LOG(FATAL) << "auto_scheduler.rpc_runner.run is not registered. " @@ -356,16 +359,17 @@ TVM_REGISTER_GLOBAL("auto_scheduler.LocalBuilder") TVM_REGISTER_GLOBAL("auto_scheduler.LocalRunner") .set_body_typed([](int timeout, int number, int repeat, int min_repeat_ms, - double cooldown_interval) { - return LocalRunner(timeout, number, repeat, min_repeat_ms, cooldown_interval); + double cooldown_interval, bool enable_cpu_cache_flush) { + return LocalRunner(timeout, number, repeat, min_repeat_ms, cooldown_interval, + enable_cpu_cache_flush); }); TVM_REGISTER_GLOBAL("auto_scheduler.RPCRunner") .set_body_typed([](const String& key, const String& host, int port, int priority, int n_parallel, int timeout, int number, int repeat, int min_repeat_ms, - double cooldown_interval) { + double cooldown_interval, bool enable_cpu_cache_flush) { return RPCRunner(key, host, port, priority, n_parallel, timeout, number, repeat, - min_repeat_ms, cooldown_interval); + min_repeat_ms, cooldown_interval, enable_cpu_cache_flush); }); } // namespace auto_scheduler diff --git a/tests/python/unittest/test_auto_scheduler_measure.py b/tests/python/unittest/test_auto_scheduler_measure.py index c94515a..93a334c 100644 --- a/tests/python/unittest/test_auto_scheduler_measure.py +++ b/tests/python/unittest/test_auto_scheduler_measure.py @@ -165,7 +165,7 @@ def test_record_pragma_storage_align_rfactor(): record_common(dag, s) -def test_measure_local_builder_runner(): +def test_measure_local_builder_runner(enable_cpu_cache_flush=False): if not tvm.testing.device_enabled("llvm"): return @@ -175,7 +175,8 @@ def test_measure_local_builder_runner(): minp = auto_scheduler.MeasureInput(task, s0) local_builder = auto_scheduler.LocalBuilder() - local_runner = auto_scheduler.LocalRunner(timeout=60) + local_runner = auto_scheduler.LocalRunner(timeout=60, + enable_cpu_cache_flush=enable_cpu_cache_flush) bress = local_builder.build([minp]) assert bress[0].error_no == 0 @@ -183,7 +184,7 @@ def test_measure_local_builder_runner(): assert mress[0].error_no == 0 -def test_measure_local_builder_rpc_runner(): +def test_measure_local_builder_rpc_runner(enable_cpu_cache_flush=False): if not tvm.testing.device_enabled("llvm"): return @@ -193,7 +194,8 @@ def test_measure_local_builder_rpc_runner(): minp = auto_scheduler.MeasureInput(task, s0) local_builder = auto_scheduler.LocalBuilder() - measure_ctx = auto_scheduler.LocalRPCMeasureContext(timeout=60) + measure_ctx = 
+                                                        enable_cpu_cache_flush=enable_cpu_cache_flush)
     rpc_runner = measure_ctx.runner
 
     bress = local_builder.build([minp])
@@ -207,5 +209,8 @@ if __name__ == "__main__":
     test_record_compute_at_root_inline_cache_read_write()
     test_record_follow_split_follow_fused_split()
     test_record_pragma_storage_align_rfactor()
-    test_measure_local_builder_runner()
-    test_measure_local_builder_rpc_runner()
+    test_measure_local_builder_runner(enable_cpu_cache_flush=True)
+    test_measure_local_builder_runner(enable_cpu_cache_flush=False)
+    test_measure_local_builder_rpc_runner(enable_cpu_cache_flush=True)
+    test_measure_local_builder_rpc_runner(enable_cpu_cache_flush=False)
+
-- 
2.7.4
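
Usage sketch (not part of the patch): the snippet below shows how the flag introduced
here would be passed in from user code, mirroring the constructors exercised in the
updated tests. It assumes a TVM build of this vintage with USE_RANDOM enabled in
config.cmake, which the new random-fill checks require; the constructor arguments are
taken directly from the patch, while the surrounding tuning setup is left out.

    from tvm import auto_scheduler

    # Local measurement: flush the CPU cache between repeated measurements.
    # Per the docstrings added above, `number` should be set to 1 so the
    # 'cache_flush_cpu_non_first_arg' preprocessor runs before every measurement.
    local_runner = auto_scheduler.LocalRunner(
        timeout=60, number=1, enable_cpu_cache_flush=True)

    # RPC measurement: the flag is forwarded through RPCRunner to
    # rpc_runner_run() on the device served by the local tracker.
    measure_ctx = auto_scheduler.LocalRPCMeasureContext(
        timeout=60, number=1, enable_cpu_cache_flush=True)
    rpc_runner = measure_ctx.runner

The flag only affects CPU tasks; on GPU targets it is a no-op, which is why the tests
run each case with the flag both on and off.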