From: Duc Ngo Date: Wed, 6 Mar 2019 18:31:00 +0000 (-0800) Subject: Remove nomscheduler (#17693) X-Git-Tag: accepted/tizen/6.5/unified/20211028.231830~975 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=e9eb18a18c3e67a2fa72283d3dd66969188aa85b;p=platform%2Fupstream%2Fpytorch.git Remove nomscheduler (#17693) Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/17693 Remove nomscheduler tool Reviewed By: yinghai Differential Revision: D14328168 fbshipit-source-id: 674d0e18596a4dc2bbb6b8d321f4066c4fc454ab --- diff --git a/caffe2/CMakeLists.txt b/caffe2/CMakeLists.txt index 6cd02d4..31b842b 100644 --- a/caffe2/CMakeLists.txt +++ b/caffe2/CMakeLists.txt @@ -70,7 +70,6 @@ if(NOT BUILD_ATEN_ONLY) add_subdirectory(utils) add_subdirectory(predictor) add_subdirectory(core/nomnigraph) - add_subdirectory(core/nomscheduler) add_subdirectory(serialize) if (USE_NVRTC) add_subdirectory(cuda_rtc) diff --git a/caffe2/core/nomscheduler/CMakeLists.txt b/caffe2/core/nomscheduler/CMakeLists.txt deleted file mode 100644 index dcde54c..0000000 --- a/caffe2/core/nomscheduler/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# ---[ CPU files. -file(GLOB_RECURSE NOMSCHEDULER_SRCS *.cc) -file(GLOB_RECURSE NOMSCHEDULER_TEST_SRCS *Test.cc) -exclude(NOMSCHEDULER_SRCS "${NOMSCHEDULER_SRCS}" "${NOMSCHEDULER_TEST_SRCS}") - -install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include - DESTINATION include - FILES_MATCHING PATTERN "*.h") diff --git a/caffe2/core/nomscheduler/README.md b/caffe2/core/nomscheduler/README.md deleted file mode 100644 index 855f4f0..0000000 --- a/caffe2/core/nomscheduler/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# nomscheduler - -nomscheduler is a package built on top of nomnigraph that provides support for task scheduling algorithms diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.cc b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.cc deleted file mode 100644 index 6cec145..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.cc +++ /dev/null @@ -1,154 +0,0 @@ -#include "CriticalPathAnalyzer.h" -#include - -namespace { - -// Data structure used by the dynamic programming algorithm. -struct PathTrace { - float cost; - bool computed = false; - // Successive (taskId, deviceId) on the critical path. - int depTaskId = -1; - int depDeviceId = -1; -}; - -// List of tasks and device assignment along a path. -struct FullPathTrace { - std::vector taskIds; - std::vector deviceIds; -}; - -struct State { - explicit State(const nomscheduler::SchedulerInput& input) { - int nTasks = input.getNumberOfTasks(); - int nDevices = input.getNumberOfDevices(); - pathTrace.resize(nTasks); - for (int taskId = 0; taskId < nTasks; taskId++) { - pathTrace.at(taskId).resize(nDevices); - } - } - - // Use dynamic programming to compute the theorical critical path. - // pathTrace[T][D] = theoretical critical path starting from task T, - // given that task T is executed on device D. - // Dynamic programming can be used because the critical path has optimize - // substructure (on a DAG), and can be expressed recursively as follows. - // pathTrace[T][D] = - // argmax(T', compCost(T, D) + - // argmin(D', commCost(T, D, T', D') + pathTrace[T'][D']) - std::vector> pathTrace; -}; - -void computePathTrace( - const nomscheduler::SchedulerInput& input, - State& state, - int taskId, - int deviceId) { - auto& trace = state.pathTrace.at(taskId).at(deviceId); - if (trace.computed) { - return; - } - - int nDevices = input.getNumberOfDevices(); - float computationCost = - input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost(); - float maxCost = computationCost; - - // Recursively compute (with memoization) critical path trace on all possible - // (depTaskId, depDeviceId) combinations where depTaskId is a dependent of - // taskId. - for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) { - auto depTaskId = outEdge->head()->data().getId(); - - float dataSize = outEdge->data().getDataSize(); - - int bestCaseDeviceId = -1; - float bestCaseCost = 0; - - for (int depDeviceId = 0; depDeviceId < nDevices; depDeviceId++) { - float commCost = dataSize * - input.getDeviceEdge(deviceId, depDeviceId).getDataTransferRate(); - - // Make sure that path trace is computed on (depTaskId, depDeviceId) - computePathTrace(input, state, depTaskId, depDeviceId); - - auto& depTrace = state.pathTrace.at(depTaskId).at(depDeviceId); - - float totalCost = computationCost + commCost + depTrace.cost; - - if (bestCaseDeviceId == -1 || totalCost < bestCaseCost) { - // Given depTaskId, choose the device assignment that minimizes the - // total computation cost. - bestCaseDeviceId = depDeviceId; - bestCaseCost = totalCost; - } - } - - if (bestCaseCost > maxCost) { - maxCost = bestCaseCost; - trace.depTaskId = depTaskId; - trace.depDeviceId = bestCaseDeviceId; - } - } - - trace.computed = true; - trace.cost = maxCost; -} - -FullPathTrace -constructFullPathTrace(const State& state, int taskId, int deviceId) { - FullPathTrace output; - int t = taskId; - int d = deviceId; - while (t != -1) { - output.taskIds.emplace_back(t); - output.deviceIds.emplace_back(d); - auto& trace = state.pathTrace.at(t).at(d); - t = trace.depTaskId; - d = trace.depDeviceId; - } - return output; -} - -} // namespace - -namespace nomscheduler { - -CriticalPathOutput CriticalPathAnalyzer::analyze(const SchedulerInput& input) { - CriticalPathOutput output; - - auto state = State(input); - - float maxCost = 0; - int nTasks = input.getNumberOfTasks(); - int nDevices = input.getNumberOfDevices(); - - int criticalPathTaskId = -1; - int criticalPathDeviceId = -1; - - for (int taskId = 0; taskId < nTasks; taskId++) { - float bestCaseDeviceId = -1; - float bestCaseCost = 0; - for (int deviceId = 0; deviceId < nDevices; deviceId++) { - computePathTrace(input, state, taskId, deviceId); - auto& trace = state.pathTrace.at(taskId).at(deviceId); - if (bestCaseDeviceId == -1 || trace.cost < bestCaseCost) { - bestCaseCost = trace.cost; - bestCaseDeviceId = deviceId; - } - } - if (bestCaseCost > maxCost) { - maxCost = bestCaseCost; - criticalPathTaskId = taskId; - criticalPathDeviceId = bestCaseDeviceId; - } - } - - auto fullPathTrace = - constructFullPathTrace(state, criticalPathTaskId, criticalPathDeviceId); - output.setOutput(maxCost, fullPathTrace.taskIds, fullPathTrace.deviceIds); - - return output; -} - -} // namespace nomscheduler diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.h b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.h deleted file mode 100644 index 23e7f58..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/CriticalPathAnalyzer.h +++ /dev/null @@ -1,68 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// Tool to analyze (theoretical) critical path of a task scheduling problems. -// -//===----------------------------------------------------------------------===// - -#ifndef NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H -#define NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H - -#include - -#include "Scheduler.h" - -namespace nomscheduler { - -class CriticalPathOutput { - public: - float getTotalCost() const { - return totalCost_; - } - - std::vector getTaskIds() const { - return taskIds_; - } - - std::vector getDeviceIds() const { - return deviceIds_; - } - - void setOutput( - float totalCost, - const std::vector& taskIds, - const std::vector& deviceIds) { - totalCost_ = totalCost; - taskIds_ = taskIds; - deviceIds_ = deviceIds; - } - - private: - float totalCost_; - - // Task along the critical path. - std::vector taskIds_; - // Device assignment of the tasks along the critical path. - std::vector deviceIds_; -}; - -class CriticalPathAnalyzer { - public: - // Analyze the theoretical critical path(s) of an input scheduling problem. - // Communication cost should be taken into account. - // Formal definition - // The best-scenario computation cost of a path - // Task1 -> ... -> TaskK - // in the task dependency graph - // is defined by chossing a device assignment - // (Device1, ... , DeviceK) - // that minimizes the total computation cost (communication cost is taken - // into account). - // - // The critical path is defined as the path that has the maximum - // best-scenario computation cost. - CriticalPathOutput analyze(const SchedulerInput& input); -}; - -} // namespace nomscheduler - -#endif // NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler-Internal.h b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler-Internal.h deleted file mode 100644 index 2abda65..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler-Internal.h +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H -#define NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H - -#include "Scheduler.h" - -namespace heftscheduler { - -// Internal state associated with a task while the algorithm is running. -struct TaskState { - // Average computation cost across all devices. - float avgComputationCost; - - // The upward rank of a task T is defined recursively as: - // upwardRank(T) = - // avgComputationCost(T) + max(avgCommCost(T, T') + upwardRank(T')) - // Basically, upwardRank(T) is the length of the critical path from - // task T to the exit task, including the computation cost of task T. - float upwardRank; - bool upwardRankComputed = false; -}; - -// Represents a slot of time to schedule tasks on a device. -// Additionally store the number of used cores, for intra-op parallelism. -struct CoreSlot { - float startTime, endTime; - int usedCores; -}; - -// Internal state associated with a device while the algorithm is running. -struct DeviceState { - // Maintain a list of slots per device. - // The slots are guaranteed to be continuous. - std::vector slots; -}; - -// Internal state of the HEFT scheduling algorithm. -struct AlgorithmState { - explicit AlgorithmState(const nomscheduler::SchedulerInput& input) { - int nTasks = input.getNumberOfTasks(); - - tasksState.resize(nTasks); - - // Initial, unsorted values. - taskIdsByUpwardRank.resize(nTasks); - for (int taskId = 0; taskId < nTasks; taskId++) { - taskIdsByUpwardRank[taskId] = taskId; - } - - int nDevices = input.getNumberOfDevices(); - devicesState.resize(nDevices); - for (int deviceId = 0; deviceId < nDevices; deviceId++) { - CoreSlot all; - all.startTime = 0; - all.endTime = std::numeric_limits::infinity(); - all.usedCores = 0; - // Initially, there is no task scheduled on each device, so we just make - // one slot that covers the entire time horizon. - devicesState.at(deviceId).slots.emplace_back(all); - } - } - - std::vector tasksState; - - std::vector devicesState; - - // Task ids sorted by upward ranks in decreasing order. - // It can be shown that this is also a topological order. - std::vector taskIdsByUpwardRank; - - // Average data transfer rate between two devices. - float avgDataTransferRate; -}; - -} // namespace heftscheduler - -#endif // NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.cc b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.cc deleted file mode 100644 index 1849fb9..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.cc +++ /dev/null @@ -1,317 +0,0 @@ -#include "HEFTScheduler.h" -#include - -namespace { - -void computeAverageComputationCost( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state, - int taskId) { - float sum = 0; - for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) { - sum += input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost(); - } - state.tasksState.at(taskId).avgComputationCost = - sum / input.getNumberOfDevices(); -} - -void computeAverageComputationCost( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state) { - for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) { - computeAverageComputationCost(input, state, taskId); - } -} - -void computeAverageDataTransferRate( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state) { - state.avgDataTransferRate = 0; - - int nDevices = input.getNumberOfDevices(); - for (int deviceId1 = 0; deviceId1 < nDevices; deviceId1++) { - for (int deviceId2 = 0; deviceId2 < nDevices; deviceId2++) { - if (deviceId1 != deviceId2) { - state.avgDataTransferRate += - input.getDeviceEdge(deviceId1, deviceId2).getDataTransferRate(); - } - } - } - - if (nDevices > 1) { - state.avgDataTransferRate /= nDevices * (nDevices - 1); - } -} - -void computeUpwardRank( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state, - int taskId) { - auto& taskState = state.tasksState.at(taskId); - if (taskState.upwardRankComputed) { - return; - } - - float maxDependentCost = 0; - for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) { - auto dependentTask = outEdge->head(); - auto dependentTaskId = dependentTask->data().getId(); - computeUpwardRank(input, state, dependentTaskId); - - float avgCommCost = - input.getTaskEdge(taskId, dependentTaskId).getDataSize() * - state.avgDataTransferRate; - - maxDependentCost = std::max( - maxDependentCost, - avgCommCost + state.tasksState.at(dependentTaskId).upwardRank); - } - - taskState.upwardRankComputed = true; - taskState.upwardRank = taskState.avgComputationCost + maxDependentCost; -} - -void computeUpwardRank( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state) { - for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) { - computeUpwardRank(input, state, taskId); - } -} - -void sortTasksByUpwardRank(heftscheduler::AlgorithmState& state) { - std::sort( - state.taskIdsByUpwardRank.begin(), - state.taskIdsByUpwardRank.end(), - [&state](int taskId1, int taskId2) -> bool { - return state.tasksState[taskId1].upwardRank > - state.tasksState[taskId2].upwardRank; - }); -} - -// Task assignment information on a specific device. -struct TaskAssignment { - bool possible; - float start, end; -}; - -// Compute the earliest possible start time of a task on a device based -// on dependency graph information and current schedule. -float computeEarliestPossibleStartTimeFromDAG( - const nomscheduler::SchedulerInput& input, - const heftscheduler::AlgorithmState& state, - const nomscheduler::SchedulerOutput& output, - int taskId, - int deviceId) { - float result = 0.0f; - for (auto& inEdge : input.getTaskNode(taskId)->getInEdges()) { - auto prereqTaskId = inEdge->tail()->data().getId(); - auto& prereqScheduleItem = output.getTaskScheduleItem(prereqTaskId); - // Since the algorithm schedule tasks in topological order, at this point - // all the prerequisites should have been scheduled. - assert(prereqScheduleItem.isAssigned()); - - // Communication time to send output from the prerequisite task to the - // current task. - float commTime = inEdge->data().getDataSize() * - input.getDeviceEdge(prereqScheduleItem.getAssignedDeviceId(), deviceId) - .getDataTransferRate(); - result = std::max(result, prereqScheduleItem.getEndTime() + commTime); - } - return result; -} - -void computeEarliestTaskAssignment( - const nomscheduler::SchedulerInput& input, - const heftscheduler::AlgorithmState& state, - const nomscheduler::SchedulerOutput& output, - int taskId, - int deviceId, - TaskAssignment& assignment) { - assignment.possible = false; - auto& costModel = input.getTaskDeviceCostModel(taskId, deviceId); - if (!costModel.isPossible()) { - // If the task cannot be scheduled on the device. - return; - } - float earliestPossibleStartTime = computeEarliestPossibleStartTimeFromDAG( - input, state, output, taskId, deviceId); - - int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism(); - int coresInDevice = input.getDevice(deviceId).getNumberOfCores(); - - auto& slots = state.devicesState.at(deviceId).slots; - for (int slotId = 0; slotId < slots.size(); slotId++) { - auto& slot = slots.at(slotId); - if (earliestPossibleStartTime > slot.endTime) { - // Ignore slots that end before the earliest possible start time. - continue; - } - float requiredStart = std::max(earliestPossibleStartTime, slot.startTime); - float requiredEnd = requiredStart + costModel.getComputationCost(); - - // Find a range of slots that can accommodate the task. - bool found = false; - for (int endSlotId = slotId; endSlotId < slots.size(); endSlotId++) { - auto& endSlot = slots[endSlotId]; - if (endSlot.usedCores + coresUsedByTask > coresInDevice) { - // Not enough cores to execute the task. - break; - } - if (requiredEnd <= endSlot.endTime) { - // We found a range of slots that covers the time window to execute - // the task. - found = true; - break; - } - } - if (found) { - assignment.possible = true; - assignment.start = requiredStart; - assignment.end = requiredEnd; - break; - } - } -} - -// Update the list of slots for a device, given the schedule of a new task -// on that device. -void updateSlots( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state, - int taskId, - int deviceId, - const TaskAssignment& assignment) { - auto& slots = state.devicesState.at(deviceId).slots; - - // Find start slot of the slot ranges that cover the task assignment time - // window. - int startSlotId = 0; - while (startSlotId < slots.size() && - assignment.start >= slots.at(startSlotId).endTime) { - startSlotId++; - } - // Start slot must exist. - assert( - startSlotId < slots.size() && - assignment.start < slots.at(startSlotId).endTime); - - if (assignment.start > slots.at(startSlotId).startTime) { - auto& startSlot = slots.at(startSlotId); - // Split the start slot into two. - heftscheduler::CoreSlot newSlot; - newSlot.startTime = startSlot.startTime; - newSlot.endTime = assignment.start; - newSlot.usedCores = startSlot.usedCores; - - startSlot.startTime = assignment.start; - slots.insert(slots.begin() + startSlotId, newSlot); - startSlotId++; - } - - // Find end slot of the slot ranges that cover the task assignment time - // window. - int endSlotId = startSlotId; - while (endSlotId < slots.size() && - assignment.end > slots.at(endSlotId).endTime) { - endSlotId++; - } - // End slot must exist. - assert( - endSlotId < slots.size() && - assignment.end <= slots.at(endSlotId).endTime); - - if (assignment.end < slots.at(endSlotId).endTime) { - // Split the end slot into two. - auto& endSlot = slots.at(endSlotId); - heftscheduler::CoreSlot newSlot; - newSlot.startTime = endSlot.startTime; - newSlot.endTime = assignment.end; - newSlot.usedCores = endSlot.usedCores; - - endSlot.startTime = assignment.end; - slots.insert(slots.begin() + endSlotId, newSlot); - } - - // Now we just update the usedCores count of the slots. - int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism(); - for (int slotId = startSlotId; slotId <= endSlotId; slotId++) { - slots.at(slotId).usedCores += coresUsedByTask; - } -} - -void scheduleTask( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state, - nomscheduler::SchedulerOutput& output, - int taskId) { - // For each device, calculate the earliest possible assignment of the task - // to the device (or if the task can even be assigned to the device at all). - std::vector assignments(input.getNumberOfDevices()); - for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) { - computeEarliestTaskAssignment( - input, state, output, taskId, deviceId, assignments.at(deviceId)); - } - - // Select the device that minimize the earlist finish time of the task. - int scheduledDeviceId = -1; - for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) { - auto& assignment = assignments.at(deviceId); - if (assignment.possible && - ((scheduledDeviceId == -1 || - assignment.end < assignments.at(scheduledDeviceId).end))) { - scheduledDeviceId = deviceId; - } - } - - if (scheduledDeviceId == -1) { - // No device can execute the task. - output.setFailure(true); - } else { - auto& assignment = assignments.at(scheduledDeviceId); - auto& taskScheduleItem = output.getMutableTaskScheduleItem(taskId); - taskScheduleItem.setAssignedDeviceId(scheduledDeviceId); - taskScheduleItem.setStartTime(assignment.start); - taskScheduleItem.setEndTime(assignment.end); - updateSlots(input, state, taskId, scheduledDeviceId, assignment); - } -} - -void scheduleTasks( - const nomscheduler::SchedulerInput& input, - heftscheduler::AlgorithmState& state, - nomscheduler::SchedulerOutput& output) { - // Loop over tasks in decreasing order of upward ranks (which is also - // a topological order) and schedule each one. - for (int taskId : state.taskIdsByUpwardRank) { - scheduleTask(input, state, output, taskId); - if (output.isFailure()) { - break; - } - } -} - -} // namespace - -namespace nomscheduler { - -std::pair -HEFTScheduler::scheduleInternal(const SchedulerInput& input) { - heftscheduler::AlgorithmState state(input); - - computeAverageComputationCost(input, state); - computeAverageDataTransferRate(input, state); - computeUpwardRank(input, state); - sortTasksByUpwardRank(state); - - SchedulerOutput output(input.getNumberOfTasks()); - output.setFailure(false); - scheduleTasks(input, state, output); - return std::make_pair(output, state); -} - -SchedulerOutput HEFTScheduler::schedule(const SchedulerInput& input) { - return scheduleInternal(input).first; -} - -} // namespace nomscheduler diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.h b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.h deleted file mode 100644 index a18e6d9..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/HEFTScheduler.h +++ /dev/null @@ -1,35 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// nomnigraph supports for task scheduling problems. -// HEFT-based implementation of scheduler. -// -// (Heterogeneous Earliest Finish Time) -// Original description: -// Performance-effective and low-complexity task scheduling -// for heterogeneous computing -// H. Topcuoglu; S. Hariri; Min-You Wu -// IEEE Transactions on Parallel and Distributed Systems 2002 -// -//===----------------------------------------------------------------------===// - -#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_H -#define NOM_SCHEDULER_HEFT_SCHEDULER_H - -#include "HEFTScheduler-Internal.h" -#include "Scheduler.h" - -namespace nomscheduler { - -class HEFTScheduler : Scheduler { - public: - virtual SchedulerOutput schedule(const SchedulerInput& input) override; - - // Expose a scheduling method that also returns an internal algorithm state - // for unit testing purpose. - std::pair scheduleInternal( - const SchedulerInput& input); -}; - -} // namespace nomscheduler - -#endif // NOM_SCHEDULER_HEFT_SCHEDULER_H diff --git a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/Scheduler.h b/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/Scheduler.h deleted file mode 100644 index 8c1d07a..0000000 --- a/caffe2/core/nomscheduler/include/nomscheduler/Scheduler/Scheduler.h +++ /dev/null @@ -1,350 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// nomnigraph supports for task scheduling problems. -// -//===----------------------------------------------------------------------===// - -#ifndef NOM_SCHEDULER_SCHEDULER_H -#define NOM_SCHEDULER_SCHEDULER_H - -#include "caffe2/core/common.h" -#include "nomnigraph/Graph/Graph.h" - -#include -#include - -namespace nomscheduler { - -// Models a processing unit (such as CPU/GPU/accelerator/...) that can execute a -// task. -class Device { - public: - Device() {} - - int getNumberOfCores() const { - return numberOfCores_; - } - - void setNumberOfCores(int numberOfCores) { - numberOfCores_ = numberOfCores; - } - - // unit: GB - float getMaxMemory() const { - return maxMemory_; - } - - void setMaxMemory(float maxMemory) { - maxMemory_ = maxMemory; - } - - private: - int numberOfCores_; - float maxMemory_; -}; - -// Models a link between two devices. -class DeviceEdge { - public: - DeviceEdge() {} - - // data transfer rate between two devices (unit: s / bytes) - float getDataTransferRate() const { - return dataTransferRate_; - } - - void setDataTransferRate(float dataTransferRate) { - dataTransferRate_ = dataTransferRate; - } - - private: - float dataTransferRate_; -}; - -// Models a unit of work that can be scheduled to run on a device. -class Task { - public: - Task(int taskId) : taskId_(taskId) {} - - // number of cores that will be used by the task - int getIntraDeviceParallelism() const { - return intraDeviceParallelism_; - } - - void setIntraDeviceParallelism(int intraDeviceParallelism) { - intraDeviceParallelism_ = intraDeviceParallelism; - } - - // static memory consumed by the task, unit: GB - float getStaticMemoryConsumed() const { - return staticMemoryConsumed_; - } - - void setStaticMemoryConsumed(float staticMemoryConsumed) { - staticMemoryConsumed_ = staticMemoryConsumed; - } - - int getId() const { - return taskId_; - } - - private: - int intraDeviceParallelism_; - float staticMemoryConsumed_; - int taskId_; -}; - -// Model a dependency between two tasks. An edge between Task A -> Task B -// means that Task B depends on the output of Task A, and so Task B must start -// after task A finishes. -// The edge A->B also holds the size of the data that needs to be transferred -// from task A to task B, i.e. the total size of the blobs produced by A -// and consumed by B. -class TaskEdge { - public: - // size of data transfered between two tasks (unit : bytes) - float getDataSize() const { - return dataSize_; - } - - void setDataSize(float dataSize) { - dataSize_ = dataSize; - } - - private: - float dataSize_; -}; - -// Represents the cost model of a task executed on a specific device. -class TaskDeviceEdge { - public: - // estimated computation cost for a task executed by a device - // (runtime, unit: ms) - float getComputationCost() const { - return computationCost_; - } - - void setComputationCost(float computationCost) { - computationCost_ = computationCost; - } - - // Return true if the task can be executed by the device. - bool isPossible() const { - return possible_; - } - - void setPossible(bool possible) { - possible_ = possible; - } - - private: - float computationCost_; - bool possible_ = true; -}; - -// dependency DAG between tasks -using TaskGraph = nom::Graph; - -// (undirected) graph between devices, to represent communication links -// between devices -using DeviceGraph = nom::Graph; - -// (bipartite) task - device graph, represents estimated cost model for -// task execution on each device -// We don't currently store data on this graph's node, so int type is just a -// placeholder. -using TaskDeviceCostModelGraph = nom::Graph; - -// Input to the scheduler. Underneath, the input is represented by one -// TaskGraph, one DeviceGraph and one TaskDeviceCostModelGraph. -class SchedulerInput { - public: - SchedulerInput(int numTasks, int numDevices) { - for (int taskId = 0; taskId < numTasks; taskId++) { - tasks_.emplace_back(taskGraph_.createNode(taskId)); - taskNodes_.emplace_back(costModelGraph_.createNode()); - } - - for (int deviceId = 0; deviceId < numDevices; deviceId++) { - devices_.emplace_back(deviceGraph_.createNode()); - deviceNodes_.emplace_back(costModelGraph_.createNode()); - } - - for (int taskId = 0; taskId < numTasks; taskId++) { - for (int deviceId = 0; deviceId < numDevices; deviceId++) { - costModelGraph_.createEdge( - taskNodes_[taskId], deviceNodes_[deviceId], TaskDeviceEdge()); - } - } - - for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) { - for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) { - deviceGraph_.createEdge( - devices_[deviceId1], devices_[deviceId2], DeviceEdge()); - } - } - } - - int getNumberOfDevices() const { - return deviceGraph_.getNodesCount(); - } - - int getNumberOfTasks() const { - return taskGraph_.getNodesCount(); - } - - Device* getMutableDevice(int deviceId) { - return devices_[deviceId]->mutableData(); - } - - Task* getMutableTask(int taskId) { - return tasks_[taskId]->mutableData(); - } - - const Device& getDevice(int deviceId) const { - return devices_[deviceId]->data(); - } - - const Task& getTask(int taskId) const { - return tasks_[taskId]->data(); - } - - void createTaskDependency(int taskId1, int taskId2) { - taskGraph_.createEdge(tasks_[taskId1], tasks_[taskId2], TaskEdge()); - } - - TaskDeviceEdge* getMutableTaskDeviceCostModel(int taskId, int deviceId) { - return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId]) - ->mutableData(); - } - - const TaskDeviceEdge& getTaskDeviceCostModel(int taskId, int deviceId) const { - return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId]) - ->data(); - } - - DeviceEdge* getMutableDeviceEdge(int deviceId1, int deviceId2) { - return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2]) - ->mutableData(); - } - - const DeviceEdge& getDeviceEdge(int deviceId1, int deviceId2) const { - return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2]) - ->data(); - } - - TaskEdge* getMutableTaskEdge(int taskId1, int taskId2) { - return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->mutableData(); - } - - const TaskEdge& getTaskEdge(int taskId1, int taskId2) const { - return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->data(); - } - - TaskGraph::NodeRef getTaskNode(int taskId) const { - return tasks_[taskId]; - } - - private: - TaskGraph taskGraph_; - DeviceGraph deviceGraph_; - TaskDeviceCostModelGraph costModelGraph_; - - std::vector tasks_; - std::vector devices_; - - std::vector taskNodes_; - std::vector deviceNodes_; -}; - -// Represents a schedule item for a task. Consists of the device that the task -// should be assigned to, and the (estimated) start and end time of the task -// execution based on the cost model given to the scheduler. -class TaskScheduleItem { - public: - int getAssignedDeviceId() const { - return assignedDeviceId_; - } - - bool isAssigned() const { - return assignedDeviceId_ != -1; - } - - void setAssignedDeviceId(int assignedDeviceId) { - assignedDeviceId_ = assignedDeviceId; - } - - float getStartTime() const { - return startTime_; - } - - void setStartTime(float startTime) { - startTime_ = startTime; - } - - float getEndTime() const { - return endTime_; - } - - void setEndTime(float endTime) { - endTime_ = endTime; - } - - private: - int assignedDeviceId_ = -1; - float startTime_; - float endTime_; -}; - -// Represents an output of the static scheduler - a map from each task -// to a TaskScheduleItem for that task. -class SchedulerOutput { - public: - SchedulerOutput(int numTasks) { - for (int i = 0; i < numTasks; i++) { - taskScheduleItems_.emplace_back(); - } - } - - TaskScheduleItem& getMutableTaskScheduleItem(int taskId) { - return taskScheduleItems_[taskId]; - } - - const TaskScheduleItem& getTaskScheduleItem(int taskId) const { - return taskScheduleItems_[taskId]; - } - - // The finish time of the schedule, which is just the maximum end time - // of all the schedule items. - float getFinishTime() const { - float result = 0; - for (auto& scheduleItem : taskScheduleItems_) { - result = std::max(result, scheduleItem.getEndTime()); - } - return result; - } - - // Fails to compute a schedule. - bool isFailure() const { - return failure_; - } - - void setFailure(bool failure) { - failure_ = failure; - } - - private: - std::vector taskScheduleItems_; - bool failure_; -}; - -// Interface for static schedulers. -class Scheduler { - public: - virtual ~Scheduler() {} - virtual SchedulerOutput schedule(const SchedulerInput&) = 0; -}; - -} // namespace nomscheduler - -#endif // NOM_SCHEDULER_SCHEDULER_H diff --git a/caffe2/core/nomscheduler/tests/SchedulerTest.cc b/caffe2/core/nomscheduler/tests/SchedulerTest.cc deleted file mode 100644 index 179095d..0000000 --- a/caffe2/core/nomscheduler/tests/SchedulerTest.cc +++ /dev/null @@ -1,290 +0,0 @@ -#include -#include -#include -#include -#include - -#include "nomscheduler/Scheduler/CriticalPathAnalyzer.h" -#include "nomscheduler/Scheduler/HEFTScheduler.h" -#include "nomscheduler/Scheduler/Scheduler.h" - -#include - -namespace nomscheduler { - -SchedulerInput loadSchedulerInputFromString(const std::string& fileInput) { - std::stringstream ss; - ss << fileInput; - - int numTasks, numDevices; - ss >> numTasks >> numDevices; - - SchedulerInput result(numTasks, numDevices); - - // Cores per devices - for (int id = 0; id < numDevices; id++) { - int numCores; - ss >> numCores; - result.getMutableDevice(id)->setNumberOfCores(numCores); - } - - // Parallelism per task - for (int id = 0; id < numTasks; id++) { - int parallelismLevel; - ss >> parallelismLevel; - result.getMutableTask(id)->setIntraDeviceParallelism(parallelismLevel); - } - - // The computation costs of each task - for (int taskId = 0; taskId < numTasks; taskId++) { - for (int deviceId = 0; deviceId < numDevices; deviceId++) { - float cost; - ss >> cost; - if (cost < 0) { - result.getMutableTaskDeviceCostModel(taskId, deviceId) - ->setPossible(false); - } else { - result.getMutableTaskDeviceCostModel(taskId, deviceId) - ->setComputationCost(cost); - } - } - } - - for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) { - for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) { - float rate; - ss >> rate; - result.getMutableDeviceEdge(deviceId1, deviceId2) - ->setDataTransferRate(rate); - } - } - - for (int taskId1 = 0; taskId1 < numTasks; taskId1++) { - for (int taskId2 = 0; taskId2 < numTasks; taskId2++) { - float dataSize; - ss >> dataSize; - if (dataSize > 0) { - result.createTaskDependency(taskId1, taskId2); - result.getMutableTaskEdge(taskId1, taskId2)->setDataSize(dataSize); - } - } - } - - for (int deviceId = 0; deviceId < numDevices; deviceId++) { - float maxMemory; - ss >> maxMemory; - result.getMutableDevice(deviceId)->setMaxMemory(maxMemory); - } - - for (int id = 0; id < numTasks; id++) { - float staticMemoryConsumed; - ss >> staticMemoryConsumed; - result.getMutableTask(id)->setStaticMemoryConsumed(staticMemoryConsumed); - } - - return result; -} - -// A simple scheduling algorithm, just for testing and comparison purpose. -// For each iteration: -// - Pick any task that is ready to schedule (no dependency) -// - Then pick a device that has the earliest next available time to -// schedule that task. -// For simplicity, this algorithm does not take into account any resource -// constraints. -class SimpleScheduler : Scheduler { - public: - SchedulerOutput schedule(const SchedulerInput& input) override { - int numTasks = input.getNumberOfTasks(); - SchedulerOutput result(numTasks); - - std::unordered_set scheduledTasks; - - // Next available time per device. - std::vector nextFreeTime; - for (int i = 0; i < input.getNumberOfDevices(); i++) { - nextFreeTime.emplace_back(0); - } - - while (scheduledTasks.size() < numTasks) { - for (int taskId = 0; taskId < numTasks; taskId++) { - auto taskNode = input.getTaskNode(taskId); - if (scheduledTasks.count(taskNode)) { - continue; - } - - bool hasDependency = false; - for (auto& inEdge : taskNode->getInEdges()) { - auto tail = inEdge->tail(); - if (!scheduledTasks.count(tail)) { - hasDependency = true; - break; - } - } - - if (!hasDependency) { - scheduledTasks.insert(taskNode); - - // Find the device with earliest next available time. - int earliestDeviceId = 0; - for (int deviceId = 1; deviceId < input.getNumberOfDevices(); - deviceId++) { - if (nextFreeTime[deviceId] < nextFreeTime[earliestDeviceId]) { - earliestDeviceId = deviceId; - } - } - - // Schedule the task on the device. - auto& taskScheduleItem = result.getMutableTaskScheduleItem(taskId); - taskScheduleItem.setAssignedDeviceId(earliestDeviceId); - taskScheduleItem.setStartTime(nextFreeTime[earliestDeviceId]); - auto computationCost = - input.getTaskDeviceCostModel(taskId, earliestDeviceId) - .getComputationCost(); - taskScheduleItem.setEndTime( - taskScheduleItem.getStartTime() + computationCost); - - // Update next available time for the device. - nextFreeTime[earliestDeviceId] = taskScheduleItem.getEndTime(); - break; - } - } - } - - return result; - } -}; - -} // namespace nomscheduler - -nomscheduler::SchedulerInput getTestInput() { - return nomscheduler::loadSchedulerInputFromString(R"( - 10 3 - - 1 1 1 - 1 1 1 1 1 1 1 1 1 1 - - 14 16 9 - 13 19 18 - 11 13 19 - 13 8 17 - 12 13 10 - 13 16 9 - 7 15 11 - 5 11 14 - 18 12 20 - 21 7 16 - - 0 1 1 - 1 0 1 - 1 1 0 - - -1 18 12 9 11 14 -1 -1 -1 -1 - -1 -1 -1 -1 -1 -1 -1 19 16 -1 - -1 -1 -1 -1 -1 -1 23 -1 -1 -1 - -1 -1 -1 -1 -1 -1 -1 27 23 -1 - -1 -1 -1 -1 -1 -1 -1 -1 13 -1 - -1 -1 -1 -1 -1 -1 -1 15 -1 -1 - -1 -1 -1 -1 -1 -1 -1 -1 -1 17 - -1 -1 -1 -1 -1 -1 -1 -1 -1 11 - -1 -1 -1 -1 -1 -1 -1 -1 -1 13 - -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 - - 256 256 256 - - 12 1 1 18 12 1 12 1 10 6 -)"); -} - -TEST(Scheduler, SchedulerTest) { - auto input = getTestInput(); - - EXPECT_EQ(input.getNumberOfTasks(), 10); - EXPECT_EQ(input.getNumberOfDevices(), 3); - - EXPECT_EQ(input.getDevice(1).getNumberOfCores(), 1); - EXPECT_EQ(input.getTask(6).getIntraDeviceParallelism(), 1); - - EXPECT_EQ(input.getTaskDeviceCostModel(0, 0).getComputationCost(), 14); - EXPECT_EQ(input.getTaskDeviceCostModel(8, 1).getComputationCost(), 12); - EXPECT_EQ(input.getDeviceEdge(0, 2).getDataTransferRate(), 1.0f); - - EXPECT_EQ(input.getTaskEdge(0, 3).getDataSize(), 9); - - EXPECT_EQ(input.getDevice(2).getMaxMemory(), 256); - EXPECT_EQ(input.getTask(3).getStaticMemoryConsumed(), 18); - - auto scheduler = nomscheduler::SimpleScheduler(); - auto output = scheduler.schedule(input); - EXPECT_EQ(output.getFinishTime(), 55); -} - -TEST(Scheduler, HEFTSchedulerTest) { - auto error = 1E-3; - - auto input = getTestInput(); - auto scheduler = nomscheduler::HEFTScheduler(); - auto outputAndState = scheduler.scheduleInternal(input); - auto state = outputAndState.second; - EXPECT_NEAR(state.avgDataTransferRate, 1.0f, error); - - auto task9 = state.tasksState.at(9); - EXPECT_NEAR(task9.avgComputationCost, 14.6666f, error); - // This task has no dependency - EXPECT_NEAR(task9.upwardRank, task9.avgComputationCost, error); - - auto task8 = state.tasksState.at(8); - EXPECT_NEAR(task8.avgComputationCost, 16.6666f, error); - EXPECT_NEAR( - task8.upwardRank, - task8.avgComputationCost + - input.getTaskEdge(8, 9).getDataSize() / state.avgDataTransferRate + - state.tasksState.at(9).upwardRank, - error); - EXPECT_NEAR(task8.upwardRank, 44.333f, error); - - auto task0 = state.tasksState.at(0); - EXPECT_NEAR(task0.avgComputationCost, 13.0f, error); - EXPECT_NEAR( - task0.upwardRank, - task0.avgComputationCost + - input.getTaskEdge(0, 1).getDataSize() / state.avgDataTransferRate + - state.tasksState.at(1).upwardRank, - error); - EXPECT_NEAR(task0.upwardRank, 108.0f, error); - - auto sortedTaskIds = std::vector{0, 2, 3, 1, 4, 5, 8, 6, 7, 9}; - EXPECT_EQ(state.taskIdsByUpwardRank, sortedTaskIds); - - // Verify the output of the HEFT scheduler. - // The input and output in this unit test matches the example in the - // original HEFT paper. - auto output = outputAndState.first; - EXPECT_FALSE(output.isFailure()); - EXPECT_NEAR(output.getFinishTime(), 80, error); - - auto expectedAssignedDeviceId = - std::vector{2, 0, 2, 1, 2, 1, 2, 0, 1, 1}; - auto expectedStartTime = - std::vector{0, 27, 9, 18, 28, 26, 38, 57, 56, 73}; - auto assignedDeviceId = std::vector(); - auto scheduledStartTime = std::vector(); - for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) { - auto& taskScheduleItem = output.getTaskScheduleItem(taskId); - assignedDeviceId.emplace_back(taskScheduleItem.getAssignedDeviceId()); - scheduledStartTime.emplace_back(taskScheduleItem.getStartTime()); - } - EXPECT_EQ(assignedDeviceId, expectedAssignedDeviceId); - EXPECT_EQ(scheduledStartTime, expectedStartTime); -} - -TEST(Scheduler, CriticalPathAnalyzer) { - auto input = getTestInput(); - auto analyzer = nomscheduler::CriticalPathAnalyzer(); - auto output = analyzer.analyze(input); - EXPECT_EQ(output.getTotalCost(), 54.0f); - auto expectedTaskIds = std::vector{0, 1, 8, 9}; - auto expectedDeviceIds = std::vector{1, 1, 1, 1}; - EXPECT_EQ(output.getTaskIds(), expectedTaskIds); - EXPECT_EQ(output.getDeviceIds(), expectedDeviceIds); -}