"Number of streams per worker per GPU"
" to use in GPU thread pool (experimental)");
-C10_DECLARE_bool(caffe2_dag_net_collect_stats);
-
C10_DEFINE_bool(
caffe2_net_async_inference_mode,
false,
"If set, use one single chain containing all ops");
-C10_DEFINE_bool(
- caffe2_net_async_finish_chain,
- false,
- "Wait for chain to finish");
-
-C10_DEFINE_bool(
- caffe2_net_async_always_schedule_child,
- false,
- "Always schedule child chains from parent chain");
-
C10_DEFINE_int(
caffe2_net_async_max_gpus,
16,
false,
"Use per net thread pools");
+C10_DEFINE_bool(
+ caffe2_net_async_run_root_tasks_inline,
+ false,
+ "Run root tasks in current thread instread of scheduling to threadpool");
+
namespace caffe2 {
std::vector<int>& AsyncNetBase::getStreamCounters() {
AsyncNetBase::AsyncNetBase(
const std::shared_ptr<const NetDef>& net_def,
Workspace* ws)
- : NetBase(net_def, ws), counters_(net_def) {
- computeExecutionModeFlags();
-
+ : NetBase(net_def, ws), options_(net_def), counters_(net_def) {
operator_nodes_ = dag_utils::prepareOperatorNodes(net_def, ws);
helper_ = caffe2::make_unique<AsyncNetExecutorHelper>(this);
operators_.reserve(operator_nodes_.size());
const auto& last_op = operators_[chain.back()];
events_.push_back(&last_op->event());
// keep events for inner chain ops in case of profiling
- if (!report_stats_) {
+ if (!options_.report_stats_) {
for (const auto& op_id : chain) {
if (op_id == chain.back() || op_id == chain.front()) {
continue;
auto pool = pools[device_id][pool_size];
if (!pool) {
pool = ThreadPoolRegistry()->Create(
- DeviceTypeName(device_type), device_id, pool_size, use_per_net_pools_);
+ DeviceTypeName(device_type),
+ device_id,
+ pool_size,
+ options_.use_per_net_pools_);
pools[device_id][pool_size] = pool;
}
return pool.get();
}
TaskThreadPoolBase* AsyncNetBase::pool(const DeviceOption& device_option) {
- if (use_single_pool_) {
+ if (options_.use_single_pool_) {
return poolGetter(cpu_pools_, PROTO_CPU, -1, num_workers_);
}
const auto device_type = device_option.device_type();
"Invalid GPU id: " + caffe2::to_string(gpu_id));
return poolGetter(gpu_pools_, device_type, gpu_id, num_workers_);
} else {
- CAFFE_THROW(
- "Unsupported device type " +
- caffe2::to_string(device_type));
+ CAFFE_THROW("Unsupported device type " + caffe2::to_string(device_type));
}
}
}
do {
stream_id = getStreamCounters().at(gpu_id)++;
- getStreamCounters().at(gpu_id) %= streams_per_gpu_;
- } while (check_stream_status_ && !isStreamFree(task_id, stream_id));
+ getStreamCounters().at(gpu_id) %= options_.streams_per_gpu_;
+ } while (options_.check_stream_status_ &&
+ !isStreamFree(task_id, stream_id));
}
return stream_id;
}
OperatorBase* op = nullptr;
try {
// Optionally insert async wait ops,
- // skip when using --caffe2_net_async_finish_chain -
+ // skip when finish_chain_ is set -
// all parents are guaranteed to be finished
- if (!finish_chain_) {
+ if (!options_.finish_chain_) {
asyncWait(task_id, stream_id, parents(task_id));
}
for (auto& op_id : chains_[task_id]) {
op = operators_[op_id];
bool success = false;
- if (!report_stats_) {
+ if (!options_.report_stats_) {
TRACE_EVENT(
tracing::TRACE_OP,
op_id,
}
op = nullptr;
- if (finish_chain_) {
+ if (options_.finish_chain_) {
operators_[chains_[task_id].back()]->event().Finish();
}
} catch (const std::exception& e) {
}
AsyncNetBase::~AsyncNetBase() {
- if (report_stats_) {
+ if (options_.report_stats_) {
counters_.PrintStats();
}
}
HIP,
GetAsyncNetThreadPool<TaskThreadPool, PROTO_HIP>);
-void AsyncNetBase::computeExecutionModeFlags() {
+ExecutionOptions::ExecutionOptions(
+ const std::shared_ptr<const NetDef>& net_def) {
static const std::string kDag = "dag";
static const std::string kProfDag = "prof_dag";
static const std::string kAsyncDag = "async_dag";
static const std::string kSimpleNet = "simple";
std::string net_type;
- if (net_def_->has_type() && !net_def_->type().empty()) {
- net_type = net_def_->type();
+ if (net_def->has_type() && !net_def->type().empty()) {
+ net_type = net_def->type();
} else {
net_type = kSimpleNet;
}
report_stats_ = false;
} else {
streams_per_gpu_ = FLAGS_caffe2_streams_per_gpu;
- finish_chain_ = FLAGS_caffe2_net_async_finish_chain;
- always_schedule_child_ = FLAGS_caffe2_net_async_always_schedule_child;
+ finish_chain_ = false;
+ always_schedule_child_ = false;
check_stream_status_ = FLAGS_caffe2_net_async_check_stream_status;
use_single_pool_ = FLAGS_caffe2_net_async_use_single_pool;
use_per_net_pools_ = FLAGS_caffe2_net_async_use_per_net_pools;
report_stats_ = false;
}
- for (int arg_idx = 0; arg_idx < net_def_->arg_size(); ++arg_idx) {
- auto& arg = net_def_->arg(arg_idx);
+ use_dfs_scheduling_ = false;
+
+ for (int arg_idx = 0; arg_idx < net_def->arg_size(); ++arg_idx) {
+ auto& arg = net_def->arg(arg_idx);
if (arg.has_name() && arg.name() == "enable_profiling") {
CAFFE_ENFORCE(arg.has_i(), "enable_profiling should be an int");
report_stats_ = arg.i() == 1;
- break;
+ }
+ if (arg.has_name() && arg.name() == "deferrable_mode") {
+ CAFFE_ENFORCE(arg.has_i(), "deferrable_mode should be an int");
+ use_dfs_scheduling_ = arg.i() == 1; // corr. to DFS scheduling
}
}
+
+ run_root_tasks_inline_ = FLAGS_caffe2_net_async_run_root_tasks_inline;
}
} // namespace caffe2
#include "caffe2/utils/thread_pool.h"
C10_DECLARE_int(caffe2_streams_per_gpu);
-C10_DECLARE_bool(caffe2_net_async_finish_chain);
-C10_DECLARE_bool(caffe2_net_async_always_schedule_child);
C10_DECLARE_int(caffe2_net_async_max_gpus);
C10_DECLARE_int(caffe2_net_async_max_numa_nodes);
C10_DECLARE_int(caffe2_net_async_thread_pool_size);
C10_DECLARE_bool(caffe2_net_async_check_stream_status);
C10_DECLARE_bool(caffe2_net_async_use_single_pool);
C10_DECLARE_bool(caffe2_net_async_use_per_net_pools);
+C10_DECLARE_bool(caffe2_net_async_run_root_tasks_inline);
namespace caffe2 {
class Tracer;
}
+struct ExecutionOptions {
+ explicit ExecutionOptions(const std::shared_ptr<const NetDef>& net_def);
+
+ int streams_per_gpu_ = 1;
+ bool finish_chain_ = false;
+ bool always_schedule_child_ = false;
+ bool check_stream_status_ = false;
+ bool use_single_pool_ = false;
+ bool use_per_net_pools_ = false;
+ bool is_blocking_ = false;
+ bool report_stats_ = false;
+ bool use_dfs_scheduling_ = false;
+ bool run_root_tasks_inline_ = false;
+};
+
class CAFFE2_API AsyncNetBase : public NetBase {
public:
AsyncNetBase(const std::shared_ptr<const NetDef>& net_def, Workspace* ws);
std::shared_ptr<tracing::Tracer> tracer_;
// execution mode flags
- void computeExecutionModeFlags();
- int streams_per_gpu_;
- bool finish_chain_;
- bool always_schedule_child_;
- bool check_stream_status_;
- bool use_single_pool_;
- bool use_per_net_pools_;
- bool is_blocking_;
- bool report_stats_;
+ ExecutionOptions options_;
ProfDAGCounters counters_;
C10_DECLARE_bool(caffe2_dag_net_collect_stats);
-C10_DECLARE_bool(caffe2_net_async_finish_chain);
-
C10_DECLARE_int(caffe2_streams_per_gpu);
C10_DECLARE_bool(caffe2_net_async_check_stream_status);
}
const auto& sink_idx = chain.back();
- if (success && FLAGS_caffe2_net_async_finish_chain) {
- operator_nodes_[sink_idx].operator_->event().Finish();
- }
CAFFE_ENFORCE(
!eventRecorded_[sink_idx],
"An event for ",
#include "caffe2/core/net_async_tracing.h"
-C10_DEFINE_bool(
- caffe2_net_async_optimize_polling,
- true,
- "Use event callbacks whenever possible instead of polling");
-C10_DEFINE_bool(
- caffe2_net_async_run_root_tasks_inline,
- false,
- "Run root tasks in current thread instread of scheduling to threadpool");
-
namespace caffe2 {
AsyncSchedulingNet::AsyncSchedulingNet(
const std::shared_ptr<const NetDef>& net_def,
Workspace* ws)
- : AsyncNetBase(net_def, ws), running_(false), use_dfs_scheduling_(false) {
- for (int arg_idx = 0; arg_idx < net_def->arg_size(); ++arg_idx) {
- auto& arg = net_def->arg(arg_idx);
- if (arg.has_name() && arg.name() == "deferrable_mode") {
- CAFFE_ENFORCE(arg.has_i(), "deferrable_mode should be an int");
- use_dfs_scheduling_ = arg.i() == 1; // corr. to DFS scheduling
- break;
- }
- }
-}
+ : AsyncNetBase(net_def, ws), running_(false) {}
void AsyncSchedulingNet::reset() {
AsyncNetBase::reset();
}
bool AsyncSchedulingNet::isInlineTask(int parent_id, int child_id) const {
- if (!use_dfs_scheduling_) {
+ if (!options_.use_dfs_scheduling_) {
return false;
}
const auto* last_parent_op = lastTaskOp(parent_id);
auto schedule_func = [this, task_id]() {
if (success_) {
int stream_id = 0;
- if (streams_per_gpu_ > 1) {
+ if (options_.streams_per_gpu_ > 1) {
stream_id = stream(task_id);
}
if (!run(task_id, stream_id)) {
}
}
- if (report_stats_) {
+ if (options_.report_stats_) {
auto last_op_id = lastTaskOpId(task_id);
auto* last_op = lastTaskOp(task_id);
if (last_op->device_option().device_type() == PROTO_CPU &&
if (parent_count == 0) {
// Schedule a child if:
// - there is failure, we skip an op execution and finish the job
- // - forced scheduling though --caffe2_net_async_always_schedule_child
- // - --caffe2_net_async_finish_chain is set, in this case parents are
+ // - forced scheduling though always_schedule_child_
+ // - finish_chain_ is set, in this case parents are
// guaranteed to be finished
// - in all other cases, check parents with canSchedule
- if (!success_ || always_schedule_child_ || finish_chain_ ||
- canSchedule(child_id)) {
+ if (!success_ || options_.always_schedule_child_ ||
+ options_.finish_chain_ || canSchedule(child_id)) {
// if DFS scheduling is enabled, run children inline,
// ignore DFS scheduling in callbacks
schedule(child_id, isInlineTask(task_id, child_id));
if (!canSchedule(parent_id, child_id)) {
// we can't schedule a child because of this parent,
// check if parent supports callback
- if (FLAGS_caffe2_net_async_optimize_polling &&
- parent_event.SupportsCallback()) {
+ if (parent_event.SupportsCallback()) {
parents_with_callback.push_back(parent_id);
} else {
parent_needs_polling = true;
std::unique_lock<std::mutex> lock(running_mutex_);
// wait for scheduled ops and make sure all events are marked as finished
finalizeEvents();
- if (report_stats_) {
+ if (options_.report_stats_) {
counters_.ReportRunEnd();
}
// notify observers and waiters
StartAllObservers();
tracing::startIter(tracer_);
- if (report_stats_) {
+ if (options_.report_stats_) {
counters_.ReportRunStart();
}
}
for (auto task_id = 0; task_id < tasksNum(); ++task_id) {
if (parents(task_id).empty()) {
- schedule(task_id, FLAGS_caffe2_net_async_run_root_tasks_inline);
+ schedule(task_id, options_.run_root_tasks_inline_);
}
}
} catch (const std::exception& e) {
finishRun();
}
- if (is_blocking_) {
+ if (options_.is_blocking_) {
Wait();
}
std::mutex running_mutex_;
std::condition_variable running_cv_;
std::atomic<bool> running_;
- bool use_dfs_scheduling_;
std::atomic<int> processed_tasks_num_;