3 # Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
20 from onelib.OptionBuilder import OptionBuilder
21 from onelib.TopologicalSortHelper import TopologicalSortHelper
22 from onelib.CfgRunner import CfgRunner
23 import utils as oneutils
# Keys recognised in a workflow JSON file.
# Top-level key: the list of workflow names defined in the file.
27 WORKFLOWS_K = 'workflows'
# Per-workflow key (optional): workflows that must be ordered before this one.
28 DEPENDENCIES_K = 'run-after'
# Per-workflow key: a mapping whose 'path' entry is handed to CfgRunner.
29 CFG_REFERENCE_K = 'cfg-reference'
# Per-workflow key: the list of step names; exclusive with CFG_REFERENCE_K.
30 WORKFLOW_STEPS_K = 'steps'
# Per-step key: tool name, joined with the working directory to form the driver path.
31 ONE_CMD_TOOL_K = 'one-cmd'
# Per-step key: value passed to OptionBuilder.build to produce the tool options.
32 COMMANDS_K = 'commands'
def __init__(self, path):
    """Load a workflow file, validate it and compute the workflow order.

    Args:
        path: location of the workflow JSON file.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ImportError: if the file content is not valid JSON.
        ValueError: if ``_verify_workflow`` rejects the file structure.
    """
    try:
        with open(path) as f:
            self.json_contents = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError("Not found given workflow file") from err
    except json.decoder.JSONDecodeError as err:
        raise ImportError("Invalid workflow file") from err

    self._verify_workflow(self.json_contents)

    workflows = self.json_contents[self.WORKFLOWS_K]
    # Each workflow needs its OWN successor list. dict.fromkeys(workflows, [])
    # would bind one shared list to every key, so an edge appended for one
    # workflow would show up as an edge of all of them.
    self.adj = {workflow_k: [] for workflow_k in workflows}
    # decide the order according to the dependencies of each workflow.
    helper = TopologicalSortHelper(workflows)
    for workflow_k in workflows:
        workflow = self.json_contents[workflow_k]
        if self.DEPENDENCIES_K in workflow:
            for previous_workflow in workflow[self.DEPENDENCIES_K]:
                # previous_workflow must come before workflow_k
                helper.add_edge(previous_workflow, workflow_k)
                self.adj[previous_workflow].append(workflow_k)
    self.workflow_sequence = helper.sort()
59 def _check_cycle(self):
# Check the computed workflow order against the dependency edges in self.adj.
# Raises RuntimeError when a workflow is numbered after one of its successors.
# NOTE(review): the statements that create `pos` and set/advance `index` are
# not visible in this excerpt - confirm against the full file.
62 workflow_num = len(self.workflow_sequence)
# store the current `index` value as the position of each sequenced workflow
64 for seq_idx in range(workflow_num):
65 pos[self.workflow_sequence[seq_idx]] = index
# compare the position of every workflow with that of each of its successors
68 for seq_idx in range(workflow_num):
69 first_wf = self.workflow_sequence[seq_idx]
70 for adj_wf in self.adj[first_wf]:
# a workflow without a recorded position is treated as position 0
71 first_pos = 0 if first_wf not in pos else pos[first_wf]
72 second_pos = 0 if adj_wf not in pos else pos[adj_wf]
# a predecessor placed after its successor is reported as a cycle
73 if (first_pos > second_pos):
74 raise RuntimeError("Workflows should not have a cycle")
def _verify_workflow(self, json_contents):
    """Validate the structure of a parsed workflow file.

    The checks run as separate passes over all workflows, in this order, and
    the first violation found is reported:

    1. the top-level WORKFLOWS_K key exists;
    2. every workflow listed under WORKFLOWS_K is itself a top-level key;
    3. every workflow has WORKFLOW_STEPS_K or CFG_REFERENCE_K;
    4. no workflow has both of them;
    5. every step listed under WORKFLOW_STEPS_K is defined in its workflow
       and has both ONE_CMD_TOOL_K and COMMANDS_K.

    Args:
        json_contents: mapping obtained by parsing the workflow file.

    Raises:
        ValueError: if any of the checks above fails.
    """
    # workflow file should have WORKFLOWS_K
    if self.WORKFLOWS_K not in json_contents:
        raise ValueError("Not found \"" + self.WORKFLOWS_K +
                         "\" key in workflow file")

    workflows = json_contents[self.WORKFLOWS_K]
    # workflow file should have keys listed in WORKFLOWS_K
    for workflow_k in workflows:
        if workflow_k not in json_contents:
            raise ValueError("Not found " + workflow_k + " key listed in \"" +
                             self.WORKFLOWS_K + "\"")

    # each workflow should have either WORKFLOW_STEPS_K or CFG_REFERENCE_K
    for workflow_k in workflows:
        workflow = json_contents[workflow_k]
        if (self.WORKFLOW_STEPS_K not in workflow
                and self.CFG_REFERENCE_K not in workflow):
            raise ValueError("Each workflow should have either \"" +
                             self.WORKFLOW_STEPS_K + "\" or \"" +
                             self.CFG_REFERENCE_K + "\"")

    # ... but never both; kept as its own pass so that a missing key in any
    # workflow is reported before an exclusive-key conflict in another one.
    for workflow_k in workflows:
        workflow = json_contents[workflow_k]
        if (self.WORKFLOW_STEPS_K in workflow
                and self.CFG_REFERENCE_K in workflow):
            raise ValueError("\"" + self.WORKFLOW_STEPS_K + "\" and \"" +
                             self.CFG_REFERENCE_K + "\" are exclusive key")

    # each step should be defined and have ONE_CMD_TOOL_K and COMMANDS_K
    for workflow_k in workflows:
        workflow = json_contents[workflow_k]
        if self.WORKFLOW_STEPS_K not in workflow:
            continue
        for step_k in workflow[self.WORKFLOW_STEPS_K]:
            # a step that is listed but not defined is a validation error,
            # not a KeyError leaking out of the lookup below
            if step_k not in workflow:
                raise ValueError("Not found " + step_k + " key listed in \"" +
                                 self.WORKFLOW_STEPS_K + "\"")
            step = workflow[step_k]
            if self.ONE_CMD_TOOL_K not in step or self.COMMANDS_K not in step:
                raise ValueError("Each step should have \"" +
                                 self.ONE_CMD_TOOL_K + "\"" + " and \"" +
                                 self.COMMANDS_K + "\"")
112 def run(self, working_dir, verbose=False):
# Process every workflow in self.workflow_sequence, in that order.
#   working_dir: joined with each step's tool name to form the driver path,
#                and forwarded to CfgRunner.run for cfg-reference workflows.
#   verbose:     forwarded to CfgRunner.run for cfg-reference workflows.
# NOTE(review): the loop header that binds `step_k` and the statement that
# consumes `cmd` are not visible in this excerpt - confirm against the full file.
113 # run workflows in sequence
114 for workflow_k in self.workflow_sequence:
115 workflow = self.json_contents[workflow_k]
# step-based workflow: build one command line per step
116 if self.WORKFLOW_STEPS_K in workflow:
117 steps = workflow[self.WORKFLOW_STEPS_K]
119 step = workflow[step_k]
120 commands = step[self.COMMANDS_K]
121 driver_name = step[self.ONE_CMD_TOOL_K]
# turn the step's commands entry into the option list for the tool
122 option_builder = OptionBuilder(driver_name)
123 options = option_builder.build(commands)
124 # get the absolute path of the caller
125 driver_path = os.path.join(working_dir, driver_name)
126 cmd = [driver_path] + options
# cfg-based workflow: hand the referenced cfg file over to CfgRunner
128 elif self.CFG_REFERENCE_K in workflow:
129 cfg_path = workflow[self.CFG_REFERENCE_K]['path']
130 runner = CfgRunner(cfg_path)
131 runner.run(working_dir, verbose)