Skip to content

Commit

Permalink
[clflush] Enable x86 cpu cache flush (#5914)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrozenGene authored Jul 15, 2020
1 parent 5c73efe commit ae4480a
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 22 deletions.
43 changes: 36 additions & 7 deletions python/tvm/autotvm/measure/measure_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,18 @@ class RPCRunner(Runner):
Whether check correctness after measurement. This will use llvm cpu target to
call your template and get the reference output.
This can work for TOPI templates, but may not work for your custom template.
enable_cpu_cache_flush: bool
Whether to flush cache on CPU between repeated measurements.
Flushing cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This is only has effect on CPU task.
"""
def __init__(self,
key, host, port, priority=1,
timeout=10, n_parallel=None,
number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
check_correctness=False):
check_correctness=False, enable_cpu_cache_flush=False):
super(RPCRunner, self).__init__(timeout, n_parallel)

self.key = key
Expand All @@ -200,6 +206,7 @@ def __init__(self,

self.ref_input = None
self.ref_output = None
self.enable_cpu_cache_flush = enable_cpu_cache_flush
self.check_correctness = check_correctness
self.cooldown_interval = cooldown_interval

Expand Down Expand Up @@ -267,7 +274,8 @@ def run(self, measure_inputs, build_results):
self.cooldown_interval,
remote_args,
self.ref_input,
self.ref_output)
self.ref_output,
self.enable_cpu_cache_flush)
futures.append(ret)

for future in futures:
Expand Down Expand Up @@ -309,7 +317,12 @@ class LocalRunner(RPCRunner):
Whether check correctness after measurement. This will use llvm cpu target to
call your template and get the reference output.
This can work for TOPI templates, but may not work for your custom template.
enable_cpu_cache_flush: bool
Whether to flush cache on CPU between repeated measurements.
Flushing cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This is only has effect on CPU task.
Note
----
This is a "fake" local mode. We start a silent rpc tracker and rpc server
Expand All @@ -318,13 +331,14 @@ class LocalRunner(RPCRunner):
def __init__(self,
timeout=10,
number=4, repeat=3, min_repeat_ms=0, cooldown_interval=0.1,
check_correctness=False):
check_correctness=False, enable_cpu_cache_flush=False):
super(LocalRunner, self).__init__('', None, None, 0,
timeout=timeout, n_parallel=1,
number=number, repeat=repeat,
min_repeat_ms=min_repeat_ms,
cooldown_interval=cooldown_interval,
check_correctness=check_correctness)
check_correctness=check_correctness,
enable_cpu_cache_flush=enable_cpu_cache_flush)
self.tracker = None
self.server = None

Expand Down Expand Up @@ -421,7 +435,8 @@ def _wrapped(measure_input, tmp_dir, **kwargs):

def run_through_rpc(measure_input, build_result,
number, repeat, min_repeat_ms, cooldown_interval,
remote_args, ref_input=None, ref_output=None):
remote_args, ref_input=None, ref_output=None,
enable_cpu_cache_flush=False):
"""Run a generated library through rpc
Parameters
Expand Down Expand Up @@ -454,6 +469,12 @@ def run_through_rpc(measure_input, build_result,
The reference input used for checking correctness
ref_output: List of np.ndarray
The reference output used for checking correctness
enable_cpu_cache_flush: bool
Whether to flush cache on CPU between repeated measurements.
Flushing cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This is only has effect on CPU task.
"""
if isinstance(build_result, MeasureResult):
return build_result
Expand All @@ -473,8 +494,16 @@ def run_through_rpc(measure_input, build_result,
remote.upload(build_result.filename)
func = remote.load_module(os.path.split(build_result.filename)[1])
ctx = remote.context(str(measure_input.target), 0)

# Limitation:
# We can not get PackFunction directly in the remote mode as it is wrapped
# under the std::function. We could lift the restriction later once we fold
# the PackedFunc as an object. Currently, we pass function name to work
# around it.
f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
f_preproc=f_prepare)

# set input
if ref_input:
Expand Down
6 changes: 4 additions & 2 deletions python/tvm/runtime/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def save(self, file_name, fmt=""):
"""
_ffi_api.ModuleSaveToFile(self, file_name, fmt)

def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0, f_preproc=''):
"""Get an evaluator that measures time cost of running function.
Parameters
Expand Down Expand Up @@ -192,6 +192,8 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
minimum duration requirement of one `repeat`.
i.e., When the run time of one `repeat` falls below this time, the `number` parameter
will be automatically increased.
f_preproc: str, optional
The preprocess function name we want to execute before executing the time evaluator.
Note
----
Expand All @@ -207,7 +209,7 @@ def time_evaluator(self, func_name, ctx, number=10, repeat=1, min_repeat_ms=0):
try:
feval = _ffi_api.RPCTimeEvaluator(
self, func_name, ctx.device_type, ctx.device_id,
number, repeat, min_repeat_ms)
number, repeat, min_repeat_ms, f_preproc)

def evaluator(*args):
"""Internal wrapped evaluator."""
Expand Down
81 changes: 71 additions & 10 deletions src/runtime/rpc/rpc_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

#include <cstring>
#include <memory>
#if defined(_M_X64) || defined(__x86_64__)
#include <immintrin.h>
#endif

#include "rpc_endpoint.h"
#include "rpc_session.h"
Expand Down Expand Up @@ -183,7 +186,7 @@ class RPCModuleNode final : public ModuleNode {
}

PackedFunc GetTimeEvaluator(const std::string& name, TVMContext ctx, int number, int repeat,
int min_repeat_ms) {
int min_repeat_ms, const std::string& f_preproc_name) {
InitRemoteFunc(&remote_get_time_evaluator_, "runtime.RPCTimeEvaluator");
// Remove session mask because we pass ctx by parts.
int dev_type = ctx.device_type;
Expand All @@ -194,11 +197,11 @@ class RPCModuleNode final : public ModuleNode {
if (module_handle_ != nullptr) {
return remote_get_time_evaluator_(GetRef<Module>(this), name,
static_cast<int>(ctx.device_type), ctx.device_id, number,
repeat, min_repeat_ms);
repeat, min_repeat_ms, f_preproc_name);
} else {
return remote_get_time_evaluator_(Optional<Module>(nullptr), name,
static_cast<int>(ctx.device_type), ctx.device_id, number,
repeat, min_repeat_ms);
repeat, min_repeat_ms, f_preproc_name);
}
}

Expand Down Expand Up @@ -236,7 +239,7 @@ class RPCModuleNode final : public ModuleNode {
// The local channel
std::shared_ptr<RPCSession> sess_;
// remote function to get time evaluator
TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int)>
TypedPackedFunc<PackedFunc(Optional<Module>, std::string, int, int, int, int, int, std::string)>
remote_get_time_evaluator_;
// remote function getter for modules.
TypedPackedFunc<PackedFunc(Module, std::string, bool)> remote_mod_get_function_;
Expand Down Expand Up @@ -300,8 +303,43 @@ std::shared_ptr<RPCSession> RPCModuleGetSession(Module mod) {
return rmod->sess();
}

/*!
* \brief Flush the cache.
* \param addr The address of data we want to flush
* \param len The length of data
*/
/*
* When we are in the tuning of TVM, we will make TVM occupy
* the cache fully and doesn't flush it during iteration.
* This has problems then in e2e testing, since arrays that
* we assume exist in cache (ie. weights) are evicted during e2e runs,
* which leads to lower performance.
*/
inline void CPUCacheFlushImpl(const char* addr, unsigned int len) {
// TODO(FrozenGene): Support ARM.
#if (defined(_M_X64) || defined(__x86_64__))
const size_t cache_line = 64;
if (addr == nullptr || len <= 0) {
return;
}

for (uintptr_t uptr = (uintptr_t)addr & ~(cache_line - 1); uptr < (uintptr_t)addr + len;
uptr += cache_line) {
_mm_clflush(reinterpret_cast<const void*>(uptr));
}

#endif
}

inline void CPUCacheFlush(int begin_index, const TVMArgs& args) {
for (int i = begin_index; i < args.size(); i++) {
CPUCacheFlushImpl(static_cast<char*>((args[i].operator DLTensor*()->data)),
GetDataSize(*(args[i].operator DLTensor*())));
}
}

PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repeat,
int min_repeat_ms) {
int min_repeat_ms, PackedFunc f_preproc) {
CHECK(pf != nullptr);

if (static_cast<int>(ctx.device_type) == static_cast<int>(kDLMicroDev)) {
Expand All @@ -310,7 +348,8 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
return (*get_micro_time_evaluator)(pf, ctx, number, repeat);
}

auto ftimer = [pf, ctx, number, repeat, min_repeat_ms](TVMArgs args, TVMRetValue* rv) mutable {
auto ftimer = [pf, ctx, number, repeat, min_repeat_ms, f_preproc](TVMArgs args,
TVMRetValue* rv) mutable {
TVMRetValue temp;
std::ostringstream os;
// skip first time call, to activate lazy compilation components.
Expand All @@ -319,6 +358,9 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe
DeviceAPI::Get(ctx)->StreamSync(ctx, nullptr);

for (int i = 0; i < repeat; ++i) {
if (f_preproc != nullptr) {
f_preproc.CallPacked(args, &temp);
}
std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::nanoseconds> tbegin,
tend;
double duration_ms = 0.0;
Expand Down Expand Up @@ -358,7 +400,7 @@ PackedFunc WrapTimeEvaluator(PackedFunc pf, TVMContext ctx, int number, int repe

TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
.set_body_typed([](Optional<Module> opt_mod, std::string name, int device_type, int device_id,
int number, int repeat, int min_repeat_ms) {
int number, int repeat, int min_repeat_ms, std::string f_preproc_name) {
TVMContext ctx;
ctx.device_type = static_cast<DLDeviceType>(device_type);
ctx.device_id = device_id;
Expand All @@ -367,17 +409,36 @@ TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator")
std::string tkey = m->type_key();
if (tkey == "rpc") {
return static_cast<RPCModuleNode*>(m.operator->())
->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms);
->GetTimeEvaluator(name, ctx, number, repeat, min_repeat_ms, f_preproc_name);
} else {
return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms);
PackedFunc f_preproc;
if (!f_preproc_name.empty()) {
auto* pf_preproc = runtime::Registry::Get(f_preproc_name);
CHECK(pf_preproc != nullptr)
<< "Cannot find " << f_preproc_name << " in the global function";
f_preproc = *pf_preproc;
}
return WrapTimeEvaluator(m.GetFunction(name, false), ctx, number, repeat, min_repeat_ms,
f_preproc);
}
} else {
auto* pf = runtime::Registry::Get(name);
CHECK(pf != nullptr) << "Cannot find " << name << " in the global function";
return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms);
PackedFunc f_preproc;
if (!f_preproc_name.empty()) {
auto* pf_preproc = runtime::Registry::Get(f_preproc_name);
CHECK(pf_preproc != nullptr)
<< "Cannot find " << f_preproc_name << " in the global function";
f_preproc = *pf_preproc;
}
return WrapTimeEvaluator(*pf, ctx, number, repeat, min_repeat_ms, f_preproc);
}
});

TVM_REGISTER_GLOBAL("cache_flush_cpu_non_first_arg").set_body([](TVMArgs args, TVMRetValue* rv) {
CPUCacheFlush(1, args);
});

// server function registration.
TVM_REGISTER_GLOBAL("tvm.rpc.server.ImportModule").set_body_typed([](Module parent, Module child) {
parent->Import(child);
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/rpc/rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,11 @@ struct RemoteSpace {
* minimum duration requirement of one `repeat`.
* i.e., When the run time of one `repeat` falls below this time,
* the `number` parameter will be automatically increased.
* \param f_preproc The function to be executed before we excetute time evaluator.
* \return f_timer A timer function.
*/
PackedFunc WrapTimeEvaluator(PackedFunc f, TVMContext ctx, int number, int repeat,
int min_repeat_ms);
int min_repeat_ms, PackedFunc f_preproc = nullptr);

/*!
* \brief Create a Global RPC module that refers to the session.
Expand Down
10 changes: 8 additions & 2 deletions tutorials/autotvm/tune_relay_x86.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def get_network(name, batch_size):
# We will use local mode for tuning configuration. RPC tracker
# mode can be setup similarly to the approach in
# :ref:`tune_relay_arm` tutorial.
#
# To perform a precise measurement, we should repeat the measurement several
# times and use the average of results. In addition, we need to flush the cache
# for the weight tensors between repeated measurements. This can make the measured
# latency of one operator closer to its actual latency during end-to-end inference.

tuning_option = {
'log_filename': log_file,
Expand All @@ -122,8 +127,9 @@ def get_network(name, batch_size):

'measure_option': autotvm.measure_option(
builder=autotvm.LocalBuilder(),
runner=autotvm.LocalRunner(number=10, repeat=1,
min_repeat_ms=1000),
runner=autotvm.LocalRunner(number=1, repeat=10,
min_repeat_ms=0,
enable_cpu_cache_flush=True),
),
}

Expand Down

0 comments on commit ae4480a

Please sign in to comment.