Imported Upstream version 1.21.0
[platform/core/ml/nnfw.git] / compiler / one-cmds / onelib / WorkflowRunner.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #    http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import json
18 import os
19
20 from onelib.OptionBuilder import OptionBuilder
21 from onelib.TopologicalSortHelper import TopologicalSortHelper
22 from onelib.CfgRunner import CfgRunner
23 import utils as oneutils
24
25
class WorkflowRunner:
    """Load a JSON workflow file, order its workflows and run them.

    The layout enforced by ``_verify_workflow`` and consumed by ``run`` is:

    * a top-level ``"workflows"`` list naming the workflows to run;
    * one top-level entry per listed name that holds either ``"steps"``
      (a list of step keys, each of which is an entry of the same workflow
      providing ``"one-cmd"`` and ``"commands"``) or ``"cfg-reference"``
      (an object with a ``"path"``), but never both;
    * optionally, per workflow, a ``"run-after"`` list naming the workflows
      that have to run before it.
    """

    WORKFLOWS_K = 'workflows'
    DEPENDENCIES_K = 'run-after'
    CFG_REFERENCE_K = 'cfg-reference'
    WORKFLOW_STEPS_K = 'steps'
    ONE_CMD_TOOL_K = 'one-cmd'
    COMMANDS_K = 'commands'

    def __init__(self, path):
        """Parse and validate the workflow file, then fix the run order.

        Args:
            path: location of the JSON workflow file.

        Raises:
            FileNotFoundError: the file does not exist.
            ImportError: the file is not valid JSON.
            ValueError: the file violates the workflow layout.
            RuntimeError: the "run-after" dependencies form a cycle.
        """
        try:
            with open(path) as f:
                self.json_contents = json.load(f)
        except FileNotFoundError as err:
            raise FileNotFoundError("Not found given workflow file") from err
        except json.decoder.JSONDecodeError as err:
            raise ImportError("Invalid workflow file") from err

        self._verify_workflow(self.json_contents)

        workflows = self.json_contents[self.WORKFLOWS_K]
        self.adj = self._build_adjacency(workflows)
        # decide the order according to the dependencies of each workflow.
        helper = TopologicalSortHelper(workflows)
        for workflow_k in workflows:
            workflow = self.json_contents[workflow_k]
            if self.DEPENDENCIES_K in workflow:
                for previous_workflow in workflow[self.DEPENDENCIES_K]:
                    helper.add_edge(previous_workflow, workflow_k)
        self.workflow_sequence = helper.sort()

        self._check_cycle()

    def _build_adjacency(self, workflows):
        """Return a mapping of each workflow to the workflows that run after it.

        Every workflow gets its own list. ``dict.fromkeys(workflows, [])``
        must not be used here: it would hand the same list object to every
        key, so each dependency would show up under all workflows and the
        cycle check would reject valid dependency chains.

        Args:
            workflows: the workflow names listed under "workflows".

        Raises:
            KeyError: a "run-after" entry names a workflow that is not listed.
        """
        adj = {workflow_k: [] for workflow_k in workflows}
        for workflow_k in workflows:
            workflow = self.json_contents[workflow_k]
            if self.DEPENDENCIES_K in workflow:
                for previous_workflow in workflow[self.DEPENDENCIES_K]:
                    adj[previous_workflow].append(workflow_k)
        return adj

    def _check_cycle(self):
        """Raise RuntimeError when a dependency points backwards in the order.

        Reads ``self.workflow_sequence`` and ``self.adj``. Every edge of
        ``self.adj`` has to lead to a workflow placed later in the sequence;
        an edge leading to an earlier one is reported as a cycle.
        """
        # number the order
        pos = {
            workflow_k: index
            for index, workflow_k in enumerate(self.workflow_sequence)
        }

        for first_wf in self.workflow_sequence:
            first_pos = pos[first_wf]
            for adj_wf in self.adj[first_wf]:
                # a workflow absent from the sequence is treated as position 0
                second_pos = pos.get(adj_wf, 0)
                if first_pos > second_pos:
                    raise RuntimeError("Workflows should not have a cycle")

    def _verify_workflow(self, json_contents):
        """Check that the parsed workflow file has the expected layout.

        Args:
            json_contents: the decoded JSON document.

        Raises:
            ValueError: a required key is missing, or a workflow defines
                both "steps" and "cfg-reference".
        """
        # workflow file should have WORKFLOWS_K
        if self.WORKFLOWS_K not in json_contents:
            raise ValueError('Not found "' + self.WORKFLOWS_K +
                             '" key in workflow file')

        workflows = json_contents[self.WORKFLOWS_K]
        # workflow file should have keys listed in WORKFLOWS_K
        for workflow_k in workflows:
            if workflow_k not in json_contents:
                raise ValueError("Not found " + workflow_k + ' key listed in "' +
                                 self.WORKFLOWS_K + '"')

        # each workflow should have either WORKFLOW_STEPS_K or CFG_REFERENCE_K
        for workflow_k in workflows:
            workflow = json_contents[workflow_k]
            if (self.WORKFLOW_STEPS_K not in workflow
                    and self.CFG_REFERENCE_K not in workflow):
                raise ValueError('Each workflow should have either "' +
                                 self.WORKFLOW_STEPS_K + '" or "' +
                                 self.CFG_REFERENCE_K + '"')
        # ... but not both of them (checked in a second pass so that the
        # "either" error keeps precedence over the "exclusive" one)
        for workflow_k in workflows:
            workflow = json_contents[workflow_k]
            if (self.WORKFLOW_STEPS_K in workflow
                    and self.CFG_REFERENCE_K in workflow):
                raise ValueError('"' + self.WORKFLOW_STEPS_K + '" and "' +
                                 self.CFG_REFERENCE_K + '" are exclusive key')

        # each step should have ONE_CMD_TOOL_K and COMMANDS_K
        for workflow_k in workflows:
            workflow = json_contents[workflow_k]
            if self.WORKFLOW_STEPS_K not in workflow:
                continue
            for step_k in workflow[self.WORKFLOW_STEPS_K]:
                # NOTE: a step key that is not an entry of the workflow
                # surfaces as KeyError from this lookup.
                step = workflow[step_k]
                if self.ONE_CMD_TOOL_K not in step or self.COMMANDS_K not in step:
                    raise ValueError('Each step should have "' +
                                     self.ONE_CMD_TOOL_K + '" and "' +
                                     self.COMMANDS_K + '"')

    def run(self, working_dir, verbose=False):
        """Run every workflow in the order computed by the constructor.

        A workflow with "steps" runs each step's driver with the options
        built from its "commands"; a workflow with "cfg-reference" is
        delegated to ``CfgRunner``.

        Args:
            working_dir: directory the driver names are joined onto; also
                passed to ``CfgRunner.run``.
            verbose: forwarded to ``CfgRunner.run`` only; step commands do
                not receive it.
        """
        # run workflows in sequence
        for workflow_k in self.workflow_sequence:
            workflow = self.json_contents[workflow_k]
            if self.WORKFLOW_STEPS_K in workflow:
                for step_k in workflow[self.WORKFLOW_STEPS_K]:
                    step = workflow[step_k]
                    driver_name = step[self.ONE_CMD_TOOL_K]
                    options = OptionBuilder(driver_name).build(
                        step[self.COMMANDS_K])
                    # locate the driver inside working_dir
                    driver_path = os.path.join(working_dir, driver_name)
                    oneutils._run([driver_path] + options)
            elif self.CFG_REFERENCE_K in workflow:
                cfg_path = workflow[self.CFG_REFERENCE_K]['path']
                runner = CfgRunner(cfg_path)
                runner.run(working_dir, verbose)