add_subdirectory(utils)
add_subdirectory(predictor)
add_subdirectory(core/nomnigraph)
- add_subdirectory(core/nomscheduler)
add_subdirectory(serialize)
if (USE_NVRTC)
add_subdirectory(cuda_rtc)
+++ /dev/null
-# ---[ CPU files.
-file(GLOB_RECURSE NOMSCHEDULER_SRCS *.cc)
-file(GLOB_RECURSE NOMSCHEDULER_TEST_SRCS *Test.cc)
-exclude(NOMSCHEDULER_SRCS "${NOMSCHEDULER_SRCS}" "${NOMSCHEDULER_TEST_SRCS}")
-
-install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include
- DESTINATION include
- FILES_MATCHING PATTERN "*.h")
+++ /dev/null
-# nomscheduler
-
-nomscheduler is a package built on top of nomnigraph that provides support for task scheduling algorithms
+++ /dev/null
-#include "CriticalPathAnalyzer.h"
-#include <vector>
-
-namespace {
-
-// Data structure used by the dynamic programming algorithm.
-struct PathTrace {
- float cost;
- bool computed = false;
- // Successive (taskId, deviceId) on the critical path.
- int depTaskId = -1;
- int depDeviceId = -1;
-};
-
-// List of tasks and device assignment along a path.
-struct FullPathTrace {
- std::vector<int> taskIds;
- std::vector<int> deviceIds;
-};
-
-struct State {
- explicit State(const nomscheduler::SchedulerInput& input) {
- int nTasks = input.getNumberOfTasks();
- int nDevices = input.getNumberOfDevices();
- pathTrace.resize(nTasks);
- for (int taskId = 0; taskId < nTasks; taskId++) {
- pathTrace.at(taskId).resize(nDevices);
- }
- }
-
- // Use dynamic programming to compute the theorical critical path.
- // pathTrace[T][D] = theoretical critical path starting from task T,
- // given that task T is executed on device D.
- // Dynamic programming can be used because the critical path has optimize
- // substructure (on a DAG), and can be expressed recursively as follows.
- // pathTrace[T][D] =
- // argmax(T', compCost(T, D) +
- // argmin(D', commCost(T, D, T', D') + pathTrace[T'][D'])
- std::vector<std::vector<PathTrace>> pathTrace;
-};
-
-void computePathTrace(
- const nomscheduler::SchedulerInput& input,
- State& state,
- int taskId,
- int deviceId) {
- auto& trace = state.pathTrace.at(taskId).at(deviceId);
- if (trace.computed) {
- return;
- }
-
- int nDevices = input.getNumberOfDevices();
- float computationCost =
- input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost();
- float maxCost = computationCost;
-
- // Recursively compute (with memoization) critical path trace on all possible
- // (depTaskId, depDeviceId) combinations where depTaskId is a dependent of
- // taskId.
- for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) {
- auto depTaskId = outEdge->head()->data().getId();
-
- float dataSize = outEdge->data().getDataSize();
-
- int bestCaseDeviceId = -1;
- float bestCaseCost = 0;
-
- for (int depDeviceId = 0; depDeviceId < nDevices; depDeviceId++) {
- float commCost = dataSize *
- input.getDeviceEdge(deviceId, depDeviceId).getDataTransferRate();
-
- // Make sure that path trace is computed on (depTaskId, depDeviceId)
- computePathTrace(input, state, depTaskId, depDeviceId);
-
- auto& depTrace = state.pathTrace.at(depTaskId).at(depDeviceId);
-
- float totalCost = computationCost + commCost + depTrace.cost;
-
- if (bestCaseDeviceId == -1 || totalCost < bestCaseCost) {
- // Given depTaskId, choose the device assignment that minimizes the
- // total computation cost.
- bestCaseDeviceId = depDeviceId;
- bestCaseCost = totalCost;
- }
- }
-
- if (bestCaseCost > maxCost) {
- maxCost = bestCaseCost;
- trace.depTaskId = depTaskId;
- trace.depDeviceId = bestCaseDeviceId;
- }
- }
-
- trace.computed = true;
- trace.cost = maxCost;
-}
-
-FullPathTrace
-constructFullPathTrace(const State& state, int taskId, int deviceId) {
- FullPathTrace output;
- int t = taskId;
- int d = deviceId;
- while (t != -1) {
- output.taskIds.emplace_back(t);
- output.deviceIds.emplace_back(d);
- auto& trace = state.pathTrace.at(t).at(d);
- t = trace.depTaskId;
- d = trace.depDeviceId;
- }
- return output;
-}
-
-} // namespace
-
-namespace nomscheduler {
-
-CriticalPathOutput CriticalPathAnalyzer::analyze(const SchedulerInput& input) {
- CriticalPathOutput output;
-
- auto state = State(input);
-
- float maxCost = 0;
- int nTasks = input.getNumberOfTasks();
- int nDevices = input.getNumberOfDevices();
-
- int criticalPathTaskId = -1;
- int criticalPathDeviceId = -1;
-
- for (int taskId = 0; taskId < nTasks; taskId++) {
- float bestCaseDeviceId = -1;
- float bestCaseCost = 0;
- for (int deviceId = 0; deviceId < nDevices; deviceId++) {
- computePathTrace(input, state, taskId, deviceId);
- auto& trace = state.pathTrace.at(taskId).at(deviceId);
- if (bestCaseDeviceId == -1 || trace.cost < bestCaseCost) {
- bestCaseCost = trace.cost;
- bestCaseDeviceId = deviceId;
- }
- }
- if (bestCaseCost > maxCost) {
- maxCost = bestCaseCost;
- criticalPathTaskId = taskId;
- criticalPathDeviceId = bestCaseDeviceId;
- }
- }
-
- auto fullPathTrace =
- constructFullPathTrace(state, criticalPathTaskId, criticalPathDeviceId);
- output.setOutput(maxCost, fullPathTrace.taskIds, fullPathTrace.deviceIds);
-
- return output;
-}
-
-} // namespace nomscheduler
+++ /dev/null
-//===----------------------------------------------------------------------===//
-//
-// Tool to analyze (theoretical) critical path of a task scheduling problems.
-//
-//===----------------------------------------------------------------------===//
-
-#ifndef NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H
-#define NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H
-
-#include <vector>
-
-#include "Scheduler.h"
-
-namespace nomscheduler {
-
-class CriticalPathOutput {
- public:
- float getTotalCost() const {
- return totalCost_;
- }
-
- std::vector<int> getTaskIds() const {
- return taskIds_;
- }
-
- std::vector<int> getDeviceIds() const {
- return deviceIds_;
- }
-
- void setOutput(
- float totalCost,
- const std::vector<int>& taskIds,
- const std::vector<int>& deviceIds) {
- totalCost_ = totalCost;
- taskIds_ = taskIds;
- deviceIds_ = deviceIds;
- }
-
- private:
- float totalCost_;
-
- // Task along the critical path.
- std::vector<int> taskIds_;
- // Device assignment of the tasks along the critical path.
- std::vector<int> deviceIds_;
-};
-
-class CriticalPathAnalyzer {
- public:
- // Analyze the theoretical critical path(s) of an input scheduling problem.
- // Communication cost should be taken into account.
- // Formal definition
- // The best-scenario computation cost of a path
- // Task1 -> ... -> TaskK
- // in the task dependency graph
- // is defined by chossing a device assignment
- // (Device1, ... , DeviceK)
- // that minimizes the total computation cost (communication cost is taken
- // into account).
- //
- // The critical path is defined as the path that has the maximum
- // best-scenario computation cost.
- CriticalPathOutput analyze(const SchedulerInput& input);
-};
-
-} // namespace nomscheduler
-
-#endif // NOM_SCHEDULER_CRITICAL_PATH_ANALYZER_H
+++ /dev/null
-#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H
-#define NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H
-
-#include "Scheduler.h"
-
-namespace heftscheduler {
-
-// Internal state associated with a task while the algorithm is running.
-struct TaskState {
- // Average computation cost across all devices.
- float avgComputationCost;
-
- // The upward rank of a task T is defined recursively as:
- // upwardRank(T) =
- // avgComputationCost(T) + max(avgCommCost(T, T') + upwardRank(T'))
- // Basically, upwardRank(T) is the length of the critical path from
- // task T to the exit task, including the computation cost of task T.
- float upwardRank;
- bool upwardRankComputed = false;
-};
-
-// Represents a slot of time to schedule tasks on a device.
-// Additionally store the number of used cores, for intra-op parallelism.
-struct CoreSlot {
- float startTime, endTime;
- int usedCores;
-};
-
-// Internal state associated with a device while the algorithm is running.
-struct DeviceState {
- // Maintain a list of slots per device.
- // The slots are guaranteed to be continuous.
- std::vector<CoreSlot> slots;
-};
-
-// Internal state of the HEFT scheduling algorithm.
-struct AlgorithmState {
- explicit AlgorithmState(const nomscheduler::SchedulerInput& input) {
- int nTasks = input.getNumberOfTasks();
-
- tasksState.resize(nTasks);
-
- // Initial, unsorted values.
- taskIdsByUpwardRank.resize(nTasks);
- for (int taskId = 0; taskId < nTasks; taskId++) {
- taskIdsByUpwardRank[taskId] = taskId;
- }
-
- int nDevices = input.getNumberOfDevices();
- devicesState.resize(nDevices);
- for (int deviceId = 0; deviceId < nDevices; deviceId++) {
- CoreSlot all;
- all.startTime = 0;
- all.endTime = std::numeric_limits<float>::infinity();
- all.usedCores = 0;
- // Initially, there is no task scheduled on each device, so we just make
- // one slot that covers the entire time horizon.
- devicesState.at(deviceId).slots.emplace_back(all);
- }
- }
-
- std::vector<TaskState> tasksState;
-
- std::vector<DeviceState> devicesState;
-
- // Task ids sorted by upward ranks in decreasing order.
- // It can be shown that this is also a topological order.
- std::vector<int> taskIdsByUpwardRank;
-
- // Average data transfer rate between two devices.
- float avgDataTransferRate;
-};
-
-} // namespace heftscheduler
-
-#endif // NOM_SCHEDULER_HEFT_SCHEDULER_INTERNAL_H
+++ /dev/null
-#include "HEFTScheduler.h"
-#include <vector>
-
-namespace {
-
-void computeAverageComputationCost(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state,
- int taskId) {
- float sum = 0;
- for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) {
- sum += input.getTaskDeviceCostModel(taskId, deviceId).getComputationCost();
- }
- state.tasksState.at(taskId).avgComputationCost =
- sum / input.getNumberOfDevices();
-}
-
-void computeAverageComputationCost(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state) {
- for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) {
- computeAverageComputationCost(input, state, taskId);
- }
-}
-
-void computeAverageDataTransferRate(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state) {
- state.avgDataTransferRate = 0;
-
- int nDevices = input.getNumberOfDevices();
- for (int deviceId1 = 0; deviceId1 < nDevices; deviceId1++) {
- for (int deviceId2 = 0; deviceId2 < nDevices; deviceId2++) {
- if (deviceId1 != deviceId2) {
- state.avgDataTransferRate +=
- input.getDeviceEdge(deviceId1, deviceId2).getDataTransferRate();
- }
- }
- }
-
- if (nDevices > 1) {
- state.avgDataTransferRate /= nDevices * (nDevices - 1);
- }
-}
-
-void computeUpwardRank(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state,
- int taskId) {
- auto& taskState = state.tasksState.at(taskId);
- if (taskState.upwardRankComputed) {
- return;
- }
-
- float maxDependentCost = 0;
- for (auto outEdge : input.getTaskNode(taskId)->getOutEdges()) {
- auto dependentTask = outEdge->head();
- auto dependentTaskId = dependentTask->data().getId();
- computeUpwardRank(input, state, dependentTaskId);
-
- float avgCommCost =
- input.getTaskEdge(taskId, dependentTaskId).getDataSize() *
- state.avgDataTransferRate;
-
- maxDependentCost = std::max(
- maxDependentCost,
- avgCommCost + state.tasksState.at(dependentTaskId).upwardRank);
- }
-
- taskState.upwardRankComputed = true;
- taskState.upwardRank = taskState.avgComputationCost + maxDependentCost;
-}
-
-void computeUpwardRank(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state) {
- for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) {
- computeUpwardRank(input, state, taskId);
- }
-}
-
-void sortTasksByUpwardRank(heftscheduler::AlgorithmState& state) {
- std::sort(
- state.taskIdsByUpwardRank.begin(),
- state.taskIdsByUpwardRank.end(),
- [&state](int taskId1, int taskId2) -> bool {
- return state.tasksState[taskId1].upwardRank >
- state.tasksState[taskId2].upwardRank;
- });
-}
-
-// Task assignment information on a specific device.
-struct TaskAssignment {
- bool possible;
- float start, end;
-};
-
-// Compute the earliest possible start time of a task on a device based
-// on dependency graph information and current schedule.
-float computeEarliestPossibleStartTimeFromDAG(
- const nomscheduler::SchedulerInput& input,
- const heftscheduler::AlgorithmState& state,
- const nomscheduler::SchedulerOutput& output,
- int taskId,
- int deviceId) {
- float result = 0.0f;
- for (auto& inEdge : input.getTaskNode(taskId)->getInEdges()) {
- auto prereqTaskId = inEdge->tail()->data().getId();
- auto& prereqScheduleItem = output.getTaskScheduleItem(prereqTaskId);
- // Since the algorithm schedule tasks in topological order, at this point
- // all the prerequisites should have been scheduled.
- assert(prereqScheduleItem.isAssigned());
-
- // Communication time to send output from the prerequisite task to the
- // current task.
- float commTime = inEdge->data().getDataSize() *
- input.getDeviceEdge(prereqScheduleItem.getAssignedDeviceId(), deviceId)
- .getDataTransferRate();
- result = std::max(result, prereqScheduleItem.getEndTime() + commTime);
- }
- return result;
-}
-
-void computeEarliestTaskAssignment(
- const nomscheduler::SchedulerInput& input,
- const heftscheduler::AlgorithmState& state,
- const nomscheduler::SchedulerOutput& output,
- int taskId,
- int deviceId,
- TaskAssignment& assignment) {
- assignment.possible = false;
- auto& costModel = input.getTaskDeviceCostModel(taskId, deviceId);
- if (!costModel.isPossible()) {
- // If the task cannot be scheduled on the device.
- return;
- }
- float earliestPossibleStartTime = computeEarliestPossibleStartTimeFromDAG(
- input, state, output, taskId, deviceId);
-
- int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism();
- int coresInDevice = input.getDevice(deviceId).getNumberOfCores();
-
- auto& slots = state.devicesState.at(deviceId).slots;
- for (int slotId = 0; slotId < slots.size(); slotId++) {
- auto& slot = slots.at(slotId);
- if (earliestPossibleStartTime > slot.endTime) {
- // Ignore slots that end before the earliest possible start time.
- continue;
- }
- float requiredStart = std::max(earliestPossibleStartTime, slot.startTime);
- float requiredEnd = requiredStart + costModel.getComputationCost();
-
- // Find a range of slots that can accommodate the task.
- bool found = false;
- for (int endSlotId = slotId; endSlotId < slots.size(); endSlotId++) {
- auto& endSlot = slots[endSlotId];
- if (endSlot.usedCores + coresUsedByTask > coresInDevice) {
- // Not enough cores to execute the task.
- break;
- }
- if (requiredEnd <= endSlot.endTime) {
- // We found a range of slots that covers the time window to execute
- // the task.
- found = true;
- break;
- }
- }
- if (found) {
- assignment.possible = true;
- assignment.start = requiredStart;
- assignment.end = requiredEnd;
- break;
- }
- }
-}
-
-// Update the list of slots for a device, given the schedule of a new task
-// on that device.
-void updateSlots(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state,
- int taskId,
- int deviceId,
- const TaskAssignment& assignment) {
- auto& slots = state.devicesState.at(deviceId).slots;
-
- // Find start slot of the slot ranges that cover the task assignment time
- // window.
- int startSlotId = 0;
- while (startSlotId < slots.size() &&
- assignment.start >= slots.at(startSlotId).endTime) {
- startSlotId++;
- }
- // Start slot must exist.
- assert(
- startSlotId < slots.size() &&
- assignment.start < slots.at(startSlotId).endTime);
-
- if (assignment.start > slots.at(startSlotId).startTime) {
- auto& startSlot = slots.at(startSlotId);
- // Split the start slot into two.
- heftscheduler::CoreSlot newSlot;
- newSlot.startTime = startSlot.startTime;
- newSlot.endTime = assignment.start;
- newSlot.usedCores = startSlot.usedCores;
-
- startSlot.startTime = assignment.start;
- slots.insert(slots.begin() + startSlotId, newSlot);
- startSlotId++;
- }
-
- // Find end slot of the slot ranges that cover the task assignment time
- // window.
- int endSlotId = startSlotId;
- while (endSlotId < slots.size() &&
- assignment.end > slots.at(endSlotId).endTime) {
- endSlotId++;
- }
- // End slot must exist.
- assert(
- endSlotId < slots.size() &&
- assignment.end <= slots.at(endSlotId).endTime);
-
- if (assignment.end < slots.at(endSlotId).endTime) {
- // Split the end slot into two.
- auto& endSlot = slots.at(endSlotId);
- heftscheduler::CoreSlot newSlot;
- newSlot.startTime = endSlot.startTime;
- newSlot.endTime = assignment.end;
- newSlot.usedCores = endSlot.usedCores;
-
- endSlot.startTime = assignment.end;
- slots.insert(slots.begin() + endSlotId, newSlot);
- }
-
- // Now we just update the usedCores count of the slots.
- int coresUsedByTask = input.getTask(taskId).getIntraDeviceParallelism();
- for (int slotId = startSlotId; slotId <= endSlotId; slotId++) {
- slots.at(slotId).usedCores += coresUsedByTask;
- }
-}
-
-void scheduleTask(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state,
- nomscheduler::SchedulerOutput& output,
- int taskId) {
- // For each device, calculate the earliest possible assignment of the task
- // to the device (or if the task can even be assigned to the device at all).
- std::vector<TaskAssignment> assignments(input.getNumberOfDevices());
- for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) {
- computeEarliestTaskAssignment(
- input, state, output, taskId, deviceId, assignments.at(deviceId));
- }
-
- // Select the device that minimize the earlist finish time of the task.
- int scheduledDeviceId = -1;
- for (int deviceId = 0; deviceId < input.getNumberOfDevices(); deviceId++) {
- auto& assignment = assignments.at(deviceId);
- if (assignment.possible &&
- ((scheduledDeviceId == -1 ||
- assignment.end < assignments.at(scheduledDeviceId).end))) {
- scheduledDeviceId = deviceId;
- }
- }
-
- if (scheduledDeviceId == -1) {
- // No device can execute the task.
- output.setFailure(true);
- } else {
- auto& assignment = assignments.at(scheduledDeviceId);
- auto& taskScheduleItem = output.getMutableTaskScheduleItem(taskId);
- taskScheduleItem.setAssignedDeviceId(scheduledDeviceId);
- taskScheduleItem.setStartTime(assignment.start);
- taskScheduleItem.setEndTime(assignment.end);
- updateSlots(input, state, taskId, scheduledDeviceId, assignment);
- }
-}
-
-void scheduleTasks(
- const nomscheduler::SchedulerInput& input,
- heftscheduler::AlgorithmState& state,
- nomscheduler::SchedulerOutput& output) {
- // Loop over tasks in decreasing order of upward ranks (which is also
- // a topological order) and schedule each one.
- for (int taskId : state.taskIdsByUpwardRank) {
- scheduleTask(input, state, output, taskId);
- if (output.isFailure()) {
- break;
- }
- }
-}
-
-} // namespace
-
-namespace nomscheduler {
-
-std::pair<SchedulerOutput, heftscheduler::AlgorithmState>
-HEFTScheduler::scheduleInternal(const SchedulerInput& input) {
- heftscheduler::AlgorithmState state(input);
-
- computeAverageComputationCost(input, state);
- computeAverageDataTransferRate(input, state);
- computeUpwardRank(input, state);
- sortTasksByUpwardRank(state);
-
- SchedulerOutput output(input.getNumberOfTasks());
- output.setFailure(false);
- scheduleTasks(input, state, output);
- return std::make_pair(output, state);
-}
-
-SchedulerOutput HEFTScheduler::schedule(const SchedulerInput& input) {
- return scheduleInternal(input).first;
-}
-
-} // namespace nomscheduler
+++ /dev/null
-//===----------------------------------------------------------------------===//
-//
-// nomnigraph supports for task scheduling problems.
-// HEFT-based implementation of scheduler.
-//
-// (Heterogeneous Earliest Finish Time)
-// Original description:
-// Performance-effective and low-complexity task scheduling
-// for heterogeneous computing
-// H. Topcuoglu; S. Hariri; Min-You Wu
-// IEEE Transactions on Parallel and Distributed Systems 2002
-//
-//===----------------------------------------------------------------------===//
-
-#ifndef NOM_SCHEDULER_HEFT_SCHEDULER_H
-#define NOM_SCHEDULER_HEFT_SCHEDULER_H
-
-#include "HEFTScheduler-Internal.h"
-#include "Scheduler.h"
-
-namespace nomscheduler {
-
-class HEFTScheduler : Scheduler {
- public:
- virtual SchedulerOutput schedule(const SchedulerInput& input) override;
-
- // Expose a scheduling method that also returns an internal algorithm state
- // for unit testing purpose.
- std::pair<SchedulerOutput, heftscheduler::AlgorithmState> scheduleInternal(
- const SchedulerInput& input);
-};
-
-} // namespace nomscheduler
-
-#endif // NOM_SCHEDULER_HEFT_SCHEDULER_H
+++ /dev/null
-//===----------------------------------------------------------------------===//
-//
-// nomnigraph supports for task scheduling problems.
-//
-//===----------------------------------------------------------------------===//
-
-#ifndef NOM_SCHEDULER_SCHEDULER_H
-#define NOM_SCHEDULER_SCHEDULER_H
-
-#include "caffe2/core/common.h"
-#include "nomnigraph/Graph/Graph.h"
-
-#include <algorithm>
-#include <vector>
-
-namespace nomscheduler {
-
-// Models a processing unit (such as CPU/GPU/accelerator/...) that can execute a
-// task.
-class Device {
- public:
- Device() {}
-
- int getNumberOfCores() const {
- return numberOfCores_;
- }
-
- void setNumberOfCores(int numberOfCores) {
- numberOfCores_ = numberOfCores;
- }
-
- // unit: GB
- float getMaxMemory() const {
- return maxMemory_;
- }
-
- void setMaxMemory(float maxMemory) {
- maxMemory_ = maxMemory;
- }
-
- private:
- int numberOfCores_;
- float maxMemory_;
-};
-
-// Models a link between two devices.
-class DeviceEdge {
- public:
- DeviceEdge() {}
-
- // data transfer rate between two devices (unit: s / bytes)
- float getDataTransferRate() const {
- return dataTransferRate_;
- }
-
- void setDataTransferRate(float dataTransferRate) {
- dataTransferRate_ = dataTransferRate;
- }
-
- private:
- float dataTransferRate_;
-};
-
-// Models a unit of work that can be scheduled to run on a device.
-class Task {
- public:
- Task(int taskId) : taskId_(taskId) {}
-
- // number of cores that will be used by the task
- int getIntraDeviceParallelism() const {
- return intraDeviceParallelism_;
- }
-
- void setIntraDeviceParallelism(int intraDeviceParallelism) {
- intraDeviceParallelism_ = intraDeviceParallelism;
- }
-
- // static memory consumed by the task, unit: GB
- float getStaticMemoryConsumed() const {
- return staticMemoryConsumed_;
- }
-
- void setStaticMemoryConsumed(float staticMemoryConsumed) {
- staticMemoryConsumed_ = staticMemoryConsumed;
- }
-
- int getId() const {
- return taskId_;
- }
-
- private:
- int intraDeviceParallelism_;
- float staticMemoryConsumed_;
- int taskId_;
-};
-
-// Model a dependency between two tasks. An edge between Task A -> Task B
-// means that Task B depends on the output of Task A, and so Task B must start
-// after task A finishes.
-// The edge A->B also holds the size of the data that needs to be transferred
-// from task A to task B, i.e. the total size of the blobs produced by A
-// and consumed by B.
-class TaskEdge {
- public:
- // size of data transfered between two tasks (unit : bytes)
- float getDataSize() const {
- return dataSize_;
- }
-
- void setDataSize(float dataSize) {
- dataSize_ = dataSize;
- }
-
- private:
- float dataSize_;
-};
-
-// Represents the cost model of a task executed on a specific device.
-class TaskDeviceEdge {
- public:
- // estimated computation cost for a task executed by a device
- // (runtime, unit: ms)
- float getComputationCost() const {
- return computationCost_;
- }
-
- void setComputationCost(float computationCost) {
- computationCost_ = computationCost;
- }
-
- // Return true if the task can be executed by the device.
- bool isPossible() const {
- return possible_;
- }
-
- void setPossible(bool possible) {
- possible_ = possible;
- }
-
- private:
- float computationCost_;
- bool possible_ = true;
-};
-
-// dependency DAG between tasks
-using TaskGraph = nom::Graph<Task, TaskEdge>;
-
-// (undirected) graph between devices, to represent communication links
-// between devices
-using DeviceGraph = nom::Graph<Device, DeviceEdge>;
-
-// (bipartite) task - device graph, represents estimated cost model for
-// task execution on each device
-// We don't currently store data on this graph's node, so int type is just a
-// placeholder.
-using TaskDeviceCostModelGraph = nom::Graph<int /*unused*/, TaskDeviceEdge>;
-
-// Input to the scheduler. Underneath, the input is represented by one
-// TaskGraph, one DeviceGraph and one TaskDeviceCostModelGraph.
-class SchedulerInput {
- public:
- SchedulerInput(int numTasks, int numDevices) {
- for (int taskId = 0; taskId < numTasks; taskId++) {
- tasks_.emplace_back(taskGraph_.createNode(taskId));
- taskNodes_.emplace_back(costModelGraph_.createNode());
- }
-
- for (int deviceId = 0; deviceId < numDevices; deviceId++) {
- devices_.emplace_back(deviceGraph_.createNode());
- deviceNodes_.emplace_back(costModelGraph_.createNode());
- }
-
- for (int taskId = 0; taskId < numTasks; taskId++) {
- for (int deviceId = 0; deviceId < numDevices; deviceId++) {
- costModelGraph_.createEdge(
- taskNodes_[taskId], deviceNodes_[deviceId], TaskDeviceEdge());
- }
- }
-
- for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) {
- for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) {
- deviceGraph_.createEdge(
- devices_[deviceId1], devices_[deviceId2], DeviceEdge());
- }
- }
- }
-
- int getNumberOfDevices() const {
- return deviceGraph_.getNodesCount();
- }
-
- int getNumberOfTasks() const {
- return taskGraph_.getNodesCount();
- }
-
- Device* getMutableDevice(int deviceId) {
- return devices_[deviceId]->mutableData();
- }
-
- Task* getMutableTask(int taskId) {
- return tasks_[taskId]->mutableData();
- }
-
- const Device& getDevice(int deviceId) const {
- return devices_[deviceId]->data();
- }
-
- const Task& getTask(int taskId) const {
- return tasks_[taskId]->data();
- }
-
- void createTaskDependency(int taskId1, int taskId2) {
- taskGraph_.createEdge(tasks_[taskId1], tasks_[taskId2], TaskEdge());
- }
-
- TaskDeviceEdge* getMutableTaskDeviceCostModel(int taskId, int deviceId) {
- return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId])
- ->mutableData();
- }
-
- const TaskDeviceEdge& getTaskDeviceCostModel(int taskId, int deviceId) const {
- return costModelGraph_.getEdge(taskNodes_[taskId], deviceNodes_[deviceId])
- ->data();
- }
-
- DeviceEdge* getMutableDeviceEdge(int deviceId1, int deviceId2) {
- return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2])
- ->mutableData();
- }
-
- const DeviceEdge& getDeviceEdge(int deviceId1, int deviceId2) const {
- return deviceGraph_.getEdge(devices_[deviceId1], devices_[deviceId2])
- ->data();
- }
-
- TaskEdge* getMutableTaskEdge(int taskId1, int taskId2) {
- return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->mutableData();
- }
-
- const TaskEdge& getTaskEdge(int taskId1, int taskId2) const {
- return taskGraph_.getEdge(tasks_[taskId1], tasks_[taskId2])->data();
- }
-
- TaskGraph::NodeRef getTaskNode(int taskId) const {
- return tasks_[taskId];
- }
-
- private:
- TaskGraph taskGraph_;
- DeviceGraph deviceGraph_;
- TaskDeviceCostModelGraph costModelGraph_;
-
- std::vector<TaskGraph::NodeRef> tasks_;
- std::vector<DeviceGraph::NodeRef> devices_;
-
- std::vector<TaskDeviceCostModelGraph::NodeRef> taskNodes_;
- std::vector<TaskDeviceCostModelGraph::NodeRef> deviceNodes_;
-};
-
-// Represents a schedule item for a task. Consists of the device that the task
-// should be assigned to, and the (estimated) start and end time of the task
-// execution based on the cost model given to the scheduler.
-class TaskScheduleItem {
- public:
- int getAssignedDeviceId() const {
- return assignedDeviceId_;
- }
-
- bool isAssigned() const {
- return assignedDeviceId_ != -1;
- }
-
- void setAssignedDeviceId(int assignedDeviceId) {
- assignedDeviceId_ = assignedDeviceId;
- }
-
- float getStartTime() const {
- return startTime_;
- }
-
- void setStartTime(float startTime) {
- startTime_ = startTime;
- }
-
- float getEndTime() const {
- return endTime_;
- }
-
- void setEndTime(float endTime) {
- endTime_ = endTime;
- }
-
- private:
- int assignedDeviceId_ = -1;
- float startTime_;
- float endTime_;
-};
-
-// Represents an output of the static scheduler - a map from each task
-// to a TaskScheduleItem for that task.
-class SchedulerOutput {
- public:
- SchedulerOutput(int numTasks) {
- for (int i = 0; i < numTasks; i++) {
- taskScheduleItems_.emplace_back();
- }
- }
-
- TaskScheduleItem& getMutableTaskScheduleItem(int taskId) {
- return taskScheduleItems_[taskId];
- }
-
- const TaskScheduleItem& getTaskScheduleItem(int taskId) const {
- return taskScheduleItems_[taskId];
- }
-
- // The finish time of the schedule, which is just the maximum end time
- // of all the schedule items.
- float getFinishTime() const {
- float result = 0;
- for (auto& scheduleItem : taskScheduleItems_) {
- result = std::max(result, scheduleItem.getEndTime());
- }
- return result;
- }
-
- // Fails to compute a schedule.
- bool isFailure() const {
- return failure_;
- }
-
- void setFailure(bool failure) {
- failure_ = failure;
- }
-
- private:
- std::vector<TaskScheduleItem> taskScheduleItems_;
- bool failure_;
-};
-
-// Interface for static schedulers.
-class Scheduler {
- public:
- virtual ~Scheduler() {}
- virtual SchedulerOutput schedule(const SchedulerInput&) = 0;
-};
-
-} // namespace nomscheduler
-
-#endif // NOM_SCHEDULER_SCHEDULER_H
+++ /dev/null
-#include <algorithm>
-#include <cstdio>
-#include <limits>
-#include <sstream>
-#include <unordered_set>
-
-#include "nomscheduler/Scheduler/CriticalPathAnalyzer.h"
-#include "nomscheduler/Scheduler/HEFTScheduler.h"
-#include "nomscheduler/Scheduler/Scheduler.h"
-
-#include <gtest/gtest.h>
-
-namespace nomscheduler {
-
-SchedulerInput loadSchedulerInputFromString(const std::string& fileInput) {
- std::stringstream ss;
- ss << fileInput;
-
- int numTasks, numDevices;
- ss >> numTasks >> numDevices;
-
- SchedulerInput result(numTasks, numDevices);
-
- // Cores per devices
- for (int id = 0; id < numDevices; id++) {
- int numCores;
- ss >> numCores;
- result.getMutableDevice(id)->setNumberOfCores(numCores);
- }
-
- // Parallelism per task
- for (int id = 0; id < numTasks; id++) {
- int parallelismLevel;
- ss >> parallelismLevel;
- result.getMutableTask(id)->setIntraDeviceParallelism(parallelismLevel);
- }
-
- // The computation costs of each task
- for (int taskId = 0; taskId < numTasks; taskId++) {
- for (int deviceId = 0; deviceId < numDevices; deviceId++) {
- float cost;
- ss >> cost;
- if (cost < 0) {
- result.getMutableTaskDeviceCostModel(taskId, deviceId)
- ->setPossible(false);
- } else {
- result.getMutableTaskDeviceCostModel(taskId, deviceId)
- ->setComputationCost(cost);
- }
- }
- }
-
- for (int deviceId1 = 0; deviceId1 < numDevices; deviceId1++) {
- for (int deviceId2 = 0; deviceId2 < numDevices; deviceId2++) {
- float rate;
- ss >> rate;
- result.getMutableDeviceEdge(deviceId1, deviceId2)
- ->setDataTransferRate(rate);
- }
- }
-
- for (int taskId1 = 0; taskId1 < numTasks; taskId1++) {
- for (int taskId2 = 0; taskId2 < numTasks; taskId2++) {
- float dataSize;
- ss >> dataSize;
- if (dataSize > 0) {
- result.createTaskDependency(taskId1, taskId2);
- result.getMutableTaskEdge(taskId1, taskId2)->setDataSize(dataSize);
- }
- }
- }
-
- for (int deviceId = 0; deviceId < numDevices; deviceId++) {
- float maxMemory;
- ss >> maxMemory;
- result.getMutableDevice(deviceId)->setMaxMemory(maxMemory);
- }
-
- for (int id = 0; id < numTasks; id++) {
- float staticMemoryConsumed;
- ss >> staticMemoryConsumed;
- result.getMutableTask(id)->setStaticMemoryConsumed(staticMemoryConsumed);
- }
-
- return result;
-}
-
-// A simple scheduling algorithm, just for testing and comparison purpose.
-// For each iteration:
-// - Pick any task that is ready to schedule (no dependency)
-// - Then pick a device that has the earliest next available time to
-// schedule that task.
-// For simplicity, this algorithm does not take into account any resource
-// constraints.
-class SimpleScheduler : Scheduler {
- public:
- SchedulerOutput schedule(const SchedulerInput& input) override {
- int numTasks = input.getNumberOfTasks();
- SchedulerOutput result(numTasks);
-
- std::unordered_set<TaskGraph::NodeRef> scheduledTasks;
-
- // Next available time per device.
- std::vector<float> nextFreeTime;
- for (int i = 0; i < input.getNumberOfDevices(); i++) {
- nextFreeTime.emplace_back(0);
- }
-
- while (scheduledTasks.size() < numTasks) {
- for (int taskId = 0; taskId < numTasks; taskId++) {
- auto taskNode = input.getTaskNode(taskId);
- if (scheduledTasks.count(taskNode)) {
- continue;
- }
-
- bool hasDependency = false;
- for (auto& inEdge : taskNode->getInEdges()) {
- auto tail = inEdge->tail();
- if (!scheduledTasks.count(tail)) {
- hasDependency = true;
- break;
- }
- }
-
- if (!hasDependency) {
- scheduledTasks.insert(taskNode);
-
- // Find the device with earliest next available time.
- int earliestDeviceId = 0;
- for (int deviceId = 1; deviceId < input.getNumberOfDevices();
- deviceId++) {
- if (nextFreeTime[deviceId] < nextFreeTime[earliestDeviceId]) {
- earliestDeviceId = deviceId;
- }
- }
-
- // Schedule the task on the device.
- auto& taskScheduleItem = result.getMutableTaskScheduleItem(taskId);
- taskScheduleItem.setAssignedDeviceId(earliestDeviceId);
- taskScheduleItem.setStartTime(nextFreeTime[earliestDeviceId]);
- auto computationCost =
- input.getTaskDeviceCostModel(taskId, earliestDeviceId)
- .getComputationCost();
- taskScheduleItem.setEndTime(
- taskScheduleItem.getStartTime() + computationCost);
-
- // Update next available time for the device.
- nextFreeTime[earliestDeviceId] = taskScheduleItem.getEndTime();
- break;
- }
- }
- }
-
- return result;
- }
-};
-
-} // namespace nomscheduler
-
-nomscheduler::SchedulerInput getTestInput() {
- return nomscheduler::loadSchedulerInputFromString(R"(
- 10 3
-
- 1 1 1
- 1 1 1 1 1 1 1 1 1 1
-
- 14 16 9
- 13 19 18
- 11 13 19
- 13 8 17
- 12 13 10
- 13 16 9
- 7 15 11
- 5 11 14
- 18 12 20
- 21 7 16
-
- 0 1 1
- 1 0 1
- 1 1 0
-
- -1 18 12 9 11 14 -1 -1 -1 -1
- -1 -1 -1 -1 -1 -1 -1 19 16 -1
- -1 -1 -1 -1 -1 -1 23 -1 -1 -1
- -1 -1 -1 -1 -1 -1 -1 27 23 -1
- -1 -1 -1 -1 -1 -1 -1 -1 13 -1
- -1 -1 -1 -1 -1 -1 -1 15 -1 -1
- -1 -1 -1 -1 -1 -1 -1 -1 -1 17
- -1 -1 -1 -1 -1 -1 -1 -1 -1 11
- -1 -1 -1 -1 -1 -1 -1 -1 -1 13
- -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
-
- 256 256 256
-
- 12 1 1 18 12 1 12 1 10 6
-)");
-}
-
-TEST(Scheduler, SchedulerTest) {
- auto input = getTestInput();
-
- EXPECT_EQ(input.getNumberOfTasks(), 10);
- EXPECT_EQ(input.getNumberOfDevices(), 3);
-
- EXPECT_EQ(input.getDevice(1).getNumberOfCores(), 1);
- EXPECT_EQ(input.getTask(6).getIntraDeviceParallelism(), 1);
-
- EXPECT_EQ(input.getTaskDeviceCostModel(0, 0).getComputationCost(), 14);
- EXPECT_EQ(input.getTaskDeviceCostModel(8, 1).getComputationCost(), 12);
- EXPECT_EQ(input.getDeviceEdge(0, 2).getDataTransferRate(), 1.0f);
-
- EXPECT_EQ(input.getTaskEdge(0, 3).getDataSize(), 9);
-
- EXPECT_EQ(input.getDevice(2).getMaxMemory(), 256);
- EXPECT_EQ(input.getTask(3).getStaticMemoryConsumed(), 18);
-
- auto scheduler = nomscheduler::SimpleScheduler();
- auto output = scheduler.schedule(input);
- EXPECT_EQ(output.getFinishTime(), 55);
-}
-
-TEST(Scheduler, HEFTSchedulerTest) {
- auto error = 1E-3;
-
- auto input = getTestInput();
- auto scheduler = nomscheduler::HEFTScheduler();
- auto outputAndState = scheduler.scheduleInternal(input);
- auto state = outputAndState.second;
- EXPECT_NEAR(state.avgDataTransferRate, 1.0f, error);
-
- auto task9 = state.tasksState.at(9);
- EXPECT_NEAR(task9.avgComputationCost, 14.6666f, error);
- // This task has no dependency
- EXPECT_NEAR(task9.upwardRank, task9.avgComputationCost, error);
-
- auto task8 = state.tasksState.at(8);
- EXPECT_NEAR(task8.avgComputationCost, 16.6666f, error);
- EXPECT_NEAR(
- task8.upwardRank,
- task8.avgComputationCost +
- input.getTaskEdge(8, 9).getDataSize() / state.avgDataTransferRate +
- state.tasksState.at(9).upwardRank,
- error);
- EXPECT_NEAR(task8.upwardRank, 44.333f, error);
-
- auto task0 = state.tasksState.at(0);
- EXPECT_NEAR(task0.avgComputationCost, 13.0f, error);
- EXPECT_NEAR(
- task0.upwardRank,
- task0.avgComputationCost +
- input.getTaskEdge(0, 1).getDataSize() / state.avgDataTransferRate +
- state.tasksState.at(1).upwardRank,
- error);
- EXPECT_NEAR(task0.upwardRank, 108.0f, error);
-
- auto sortedTaskIds = std::vector<int>{0, 2, 3, 1, 4, 5, 8, 6, 7, 9};
- EXPECT_EQ(state.taskIdsByUpwardRank, sortedTaskIds);
-
- // Verify the output of the HEFT scheduler.
- // The input and output in this unit test matches the example in the
- // original HEFT paper.
- auto output = outputAndState.first;
- EXPECT_FALSE(output.isFailure());
- EXPECT_NEAR(output.getFinishTime(), 80, error);
-
- auto expectedAssignedDeviceId =
- std::vector<int>{2, 0, 2, 1, 2, 1, 2, 0, 1, 1};
- auto expectedStartTime =
- std::vector<float>{0, 27, 9, 18, 28, 26, 38, 57, 56, 73};
- auto assignedDeviceId = std::vector<int>();
- auto scheduledStartTime = std::vector<float>();
- for (int taskId = 0; taskId < input.getNumberOfTasks(); taskId++) {
- auto& taskScheduleItem = output.getTaskScheduleItem(taskId);
- assignedDeviceId.emplace_back(taskScheduleItem.getAssignedDeviceId());
- scheduledStartTime.emplace_back(taskScheduleItem.getStartTime());
- }
- EXPECT_EQ(assignedDeviceId, expectedAssignedDeviceId);
- EXPECT_EQ(scheduledStartTime, expectedStartTime);
-}
-
-TEST(Scheduler, CriticalPathAnalyzer) {
- auto input = getTestInput();
- auto analyzer = nomscheduler::CriticalPathAnalyzer();
- auto output = analyzer.analyze(input);
- EXPECT_EQ(output.getTotalCost(), 54.0f);
- auto expectedTaskIds = std::vector<int>{0, 1, 8, 9};
- auto expectedDeviceIds = std::vector<int>{1, 1, 1, 1};
- EXPECT_EQ(output.getTaskIds(), expectedTaskIds);
- EXPECT_EQ(output.getDeviceIds(), expectedDeviceIds);
-}