[clflush] Enable x86 cpu cache flush (#5914)
authorZhao Wu <zhaowu@apache.org>
Wed, 15 Jul 2020 22:23:40 +0000 (06:23 +0800)
committerGitHub <noreply@github.com>
Wed, 15 Jul 2020 22:23:40 +0000 (15:23 -0700)
python/tvm/autotvm/measure/measure_methods.py
python/tvm/runtime/module.py
src/runtime/rpc/rpc_module.cc
src/runtime/rpc/rpc_session.h
tutorials/autotvm/tune_relay_x86.py

index b8969f5..03fc09a 100644 (file)
@@ -180,12 +180,18 @@ class RPCRunner(Runner):
         Whether check correctness after measurement. This will use llvm cpu target to
         call your template and get the reference output.
         This can work for TOPI templates, but may not work for your custom template.
+    enable_cpu_cache_flush: bool
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This is only has effect on CPU task.
     """
     def __init__(self,
                  key, host, port, priority=1,
                  timeout=10, n_parallel=None,
                  number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
-                 check_correctness=False):
+                 check_correctness=False, enable_cpu_cache_flush=False):
         super(RPCRunner, self).__init__(timeout, n_parallel)
 
         self.key = key
@@ -200,6 +206,7 @@ class RPCRunner(Runner):
 
         self.ref_input = None
         self.ref_output = None
+        self.enable_cpu_cache_flush = enable_cpu_cache_flush
         self.check_correctness = check_correctness
         self.cooldown_interval = cooldown_interval
 
@@ -267,7 +274,8 @@ class RPCRunner(Runner):
                                            self.cooldown_interval,
                                            remote_args,
                                            self.ref_input,
-                                           self.ref_output)
+                                           self.ref_output,
+                                           self.enable_cpu_cache_flush)
                 futures.append(ret)
 
             for future in futures:
@@ -309,7 +317,12 @@ class LocalRunner(RPCRunner):
         Whether check correctness after measurement. This will use llvm cpu target to
         call your template and get the reference output.
         This can work for TOPI templates, but may not work for your custom template.
-
+    enable_cpu_cache_flush: bool
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This is only has effect on CPU task.
     Note
     ----
     This is a "fake" local mode. We start a silent rpc tracker and rpc server
@@ -318,13 +331,14 @@ class LocalRunner(RPCRunner):
     def __init__(self,
                  timeout=10,
                  number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
-                 check_correctness=False):
+                 check_correctness=False, enable_cpu_cache_flush=False):
         super(LocalRunner, self).__init__('', None, None, 0,
                                           timeout=timeout, n_parallel=1,
                                           number=number, repeat=repeat,
                                           min_repeat_ms=min_repeat_ms,
                                           cooldown_interval=cooldown_interval,
-                                          check_correctness=check_correctness)
+                                          check_correctness=check_correctness,
+                                          enable_cpu_cache_flush=enable_cpu_cache_flush)
         self.tracker = None
         self.server = None
 
@@ -421,7 +435,8 @@ def _wrap_build_func(build_func):
 
 def run_through_rpc(measure_input, build_result,
                     number, repeat, min_repeat_ms, cooldown_interval,
-                    remote_args, ref_input=None, ref_output=None):
+                    remote_args, ref_input=None, ref_output=None,
+                    enable_cpu_cache_flush=False):
     """Run a generated library through rpc
 
     Parameters
@@ -454,6 +469,12 @@ def run_through_rpc(measure_input, build_result,
         The reference input used for checking correctness
     ref_output: List of np.ndarray
         The reference output used for checking correctness
+    enable_cpu_cache_flush: bool
+        Whether to flush cache on CPU between repeated measurements.
+        Flushing cache can make the measured latency of one operator closer to
+        its actual latency during end-to-end inference.
+        To make this option effective, the argument `number` should also be set to 1.
+        This is only has effect on CPU task.
     """
     if isinstance(build_result, MeasureResult):
         return build_result
@@ -473,8 +494,16 @@ def run_through_rpc(measure_input, build_result,
         remote.upload(build_result.filename)
         func = remote.load_module(os.path.split(build_result.filename)[1])
         ctx = remote.context(str(measure_input.target), 0)
+
+        # Limitation:
+        # We can not get PackFunction directly in the remote mode as it is wrapped
+        # under the std::function. We could lift the restriction later once we fold
+        # the PackedFunc as an object. Currently, we pass function name to work
+        # around it.
+        f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
         time_f = func.time_evaluator(
-            func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
+            func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
+            f_preproc=f_prepare)
 
         # set input
         if ref_input:
index 3cdb28f..754bb6f 100644 (file)
@@ -163,7 +163,7 @@ class Module(object):
         """
         _ffi_api.ModuleSaveToFile(self, file_name, fmt)
 
-    def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
+    def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0, f_preproc=''):
         """Get an evaluator that measures time cost of running function.
 
         Parameters
@@ -192,6 +192,8 @@ class Module(object):
             minimum duration requirement of one `repeat`.
             i.e., When the run time of one `repeat` falls below this time, the `number` parameter
             will be automatically increased.
+        f_preproc: str, optional
+            The preprocess function name we want to execute before executing the time evaluator.
 
         Note
         ----
@@ -207,7 +209,7 @@ class Module(object):
         try:
             feval = _ffi_api.RPCTimeEvaluator(
                 self, func_name, ctx.device_type, ctx.device_id,
-                number, repeat, min_repeat_ms)
+                number, repeat, min_repeat_ms, f_preproc)
 
             def evaluator(*args):
                 """Internal wrapped evaluator."""
index 89f3e7c..d1eb891 100644 (file)
@@ -26,6 +26,9 @@
 
 #include <cstring>
 #include <memory>
+#if defined(_M_X64) || defined(__x86_64__)
+#include <immintrin.h>
+#endif
 
 #include "rpc_endpoint.h"
 #include "rpc_session.h"
@@ -183,7 +186,7 @@ class RPCModuleNode final : public ModuleNode {
   }
 
   PackedFunc GetTimeEvaluator(const std::string& name, TVMContext ctx, int number, int repeat,
-                              int min_repeat_ms) {
+                              int min_repeat_ms, const std::string& f_preproc_name) {
     InitRemoteFunc(&remote_get_time_evaluator_, "runtime.RPCTimeEvaluator");
     // Remove session mask because we pass ctx by parts.
     int dev_type = ctx.device_type;
@@ -194,11 +197,11 @@ class RPCModuleNode final : public ModuleNode {
     if (module_handle_ != nullptr) {
       return remote_get_time_evaluator_(GetRef<Module>(this), name,
                                         static_cast<int>(ctx.device_type), ctx.device_id, number,
-                                        repeat, min_repeat_ms);
+                                        repeat, min_repeat_ms, f_preproc_name);
     } else {
       return remote_get_time_evaluator_(Optional<Module>(nullptr), name,
                                         static_cast<int>(ctx.device_type), ctx.device_id, number,
-                                        repeat, min_repeat_ms);
+                                        repeat, min_repeat_ms, f_preproc_name);
     }
   }
 
@@ -236,7 +239,7 @@ class RPCModuleNode final : public ModuleNode {
   // The local channel
   std::shared_ptr<RPCSession> sess_;
   // remote function to get time evaluator
-  TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int)>
+  TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int, std::string)>
       remote_get_time_evaluator_;
   // remote function getter for modules.
   TypedPackedFunc<PackedFunc(Module, std::string, bool)> remote_mod_get_function_;
@@ -300,8 +303,43 @@ std::shared_ptr<RPCSession> RPCModuleGetSession(Module mod) {
   return rmod->sess();
 }
 
+/*!
+ * \brief Flush the cache.
+ * \param addr The address of data we want to flush
+ * \param len The length of data
+ */
+/*
+ * When we are in the tuning of TVM, we will make TVM occupy
+ * the cache fully and doesn't flush it during iteration.
+ * This has problems then in e2e testing, since arrays that
+ * we assume exist in cache (ie. weights) are evicted during e2e runs,
+ * which leads to lower performance.
+ */
+inline void CPUCacheFlushImpl(const char* addr, unsigned int len) {
+// TODO(FrozenGene): Support ARM.
+#if (defined(_M_X64) || defined(__x86_64__))
+  const size_t cache_line = 64;
+  if (addr == nullptr || len <= 0) {
+    return;
+  }
+
+  for (uintptr_t uptr = (uintptr_t)addr & ~(cache_line - 1); uptr < (uintptr_t)addr + len;
+       uptr += cache_line) {
+    _mm_clflush(reinterpret_cast<const void*>(uptr));
+  }
+
+#endif
+}
+
+inline void CPUCacheFlush(int begin_index, const TVMArgs& args) {
+  for (int i = begin_index; i < args.size(); i++) {
+    CPUCacheFlushImpl(static_cast<char*>((args[i].operator DLTensor*()->data)),
+                      GetDataSize(*(args[i].operator DLTensor*())));
+  }
+}
+
 PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repeat,
-                             int min_repeat_ms) {
+                             int min_repeat_ms, PackedFunc f_preproc) {
   CHECK(pf != nullptr);
 
   if (static_cast<int>(ctx.device_type) == static_cast<int>(kDLMicroDev)) {
@@ -310,7 +348,8 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
     return (*get_micro_time_evaluator)(pf, ctx, number, repeat);
   }
 
-  auto ftimer = [pf, ctx, number, repeat, min_repeat_ms](TVMArgs args, TVMRetValue* rv) mutable {
+  auto ftimer = [pf, ctx, number, repeat, min_repeat_ms, f_preproc](TVMArgs args,
+                                                                    TVMRetValue* rv) mutable {
     TVMRetValue temp;
     std::ostringstream os;
     // skip first time call, to activate lazy compilation components.
@@ -319,6 +358,9 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
     DeviceAPI::Get(ctx)->StreamSync(ctx, nullptr);
 
     for (int i = 0; i < repeat; ++i) {
+      if (f_preproc != nullptr) {
+        f_preproc.CallPacked(args, &temp);
+      }
       std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::nanoseconds> tbegin,
           tend;
       double duration_ms = 0.0;
@@ -358,7 +400,7 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
 
 TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
     .set_body_typed([](Optional<Module> opt_mod, std::string name, int device_type, int device_id,
-                       int number, int repeat, int min_repeat_ms) {
+                       int number, int repeat, int min_repeat_ms, std::string f_preproc_name) {
       TVMContext ctx;
       ctx.device_type = static_cast<DLDeviceType>(device_type);
       ctx.device_id = device_id;
@@ -367,17 +409,36 @@ TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
         std::string tkey = m->type_key();
         if (tkey == "rpc") {
           return static_cast<RPCModuleNode*>(m.operator->())
-              ->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms);
+              ->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms, f_preproc_name);
         } else {
-          return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms);
+          PackedFunc f_preproc;
+          if (!f_preproc_name.empty()) {
+            auto* pf_preproc = runtime::Registry::Get(f_preproc_name);
+            CHECK(pf_preproc != nullptr)
+                << "Cannot find " << f_preproc_name << " in the global function";
+            f_preproc = *pf_preproc;
+          }
+          return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms,
+                                   f_preproc);
         }
       } else {
         auto* pf = runtime::Registry::Get(name);
         CHECK(pf != nullptr) << "Cannot find " << name << " in the global function";
-        return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms);
+        PackedFunc f_preproc;
+        if (!f_preproc_name.empty()) {
+          auto* pf_preproc = runtime::Registry::Get(f_preproc_name);
+          CHECK(pf_preproc != nullptr)
+              << "Cannot find " << f_preproc_name << " in the global function";
+          f_preproc = *pf_preproc;
+        }
+        return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms, f_preproc);
       }
     });
 
+TVM_REGISTER_GLOBAL("cache_flush_cpu_non_first_arg").set_body([](TVMArgs args, TVMRetValue* rv) {
+  CPUCacheFlush(1, args);
+});
+
 // server function registration.
 TVM_REGISTER_GLOBAL("tvm.rpc.server.ImportModule").set_body_typed([](Module parent, Module child) {
   parent->Import(child);
index 6a7e6d6..954c5b4 100644 (file)
@@ -324,10 +324,11 @@ struct RemoteSpace {
  *        minimum duration requirement of one `repeat`.
  *        i.e., When the run time of one `repeat` falls below this time,
  *        the `number` parameter will be automatically increased.
+ * \param f_preproc The function to be executed before we excetute time evaluator.
  * \return f_timer A timer function.
  */
 PackedFunc WrapTimeEvaluator(PackedFunc f, TVMContext ctx, int number, int repeat,
-                             int min_repeat_ms);
+                             int min_repeat_ms, PackedFunc f_preproc = nullptr);
 
 /*!
  * \brief Create a Global RPC module that refers to the session.
index dcc5b25..92fdafb 100644 (file)
@@ -114,6 +114,11 @@ os.environ["TVM_NUM_THREADS"] = str(num_threads)
 # We will use local mode for tuning configuration. RPC tracker
 # mode can be setup similarly to the approach in
 # :ref:`tune_relay_arm` tutorial.
+#
+# To perform a precise measurement, we should repeat the measurement several
+# times and use the average of results. In addition, we need to flush the cache
+# for the weight tensors between repeated measurements. This can make the measured
+# latency of one operator closer to its actual latency during end-to-end inference.
 
 tuning_option = {
     'log_filename': log_file,
@@ -122,8 +127,9 @@ tuning_option = {
 
     'measure_option': autotvm.measure_option(
         builder=autotvm.LocalBuilder(),
-        runner=autotvm.LocalRunner(number=10, repeat=1,
-                                   min_repeat_ms=1000),
+        runner=autotvm.LocalRunner(number=1, repeat=10,
+                                   min_repeat_ms=0,
+                                   enable_cpu_cache_flush=True),
     ),
 }