def read_stats(stats_path, stats: dict):
"""Read statistics from a file and extend provided statistics"""
with open(stats_path, "r") as file:
- parsed_data = yaml.load(file, Loader=yaml.FullLoader)
+ parsed_data = yaml.safe_load(file)
return dict((step_name, stats.get(step_name, []) + [duration])
for step_name, duration in parsed_data.items())
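+# Illustration with hypothetical step names: each call to read_stats() appends
+# one duration per step, so after aggregating three runs the stats look like
+#   {"load_plugin": [1.1, 0.9, 1.0], "create_exenetwork": [3.2, 3.1, 3.4]}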
def write_aggregated_stats(stats_path, stats: dict):
"""Write aggregated statistics to a file in YAML format"""
with open(stats_path, "w") as file:
- yaml.dump(stats, file)
+ yaml.safe_dump(stats, file)
def prepare_executable_cmd(args: dict):
from pathlib import Path
import yaml
import hashlib
+from copy import deepcopy
-from test_runner.utils import expand_env_vars, upload_timetest_data, \
+from test_runner.utils import upload_timetest_data, \
DATABASE, DB_COLLECTIONS
default=3
)
# TODO: add support of --mo, --omz etc. required for OMZ support
+ helpers_args_parser = parser.getgroup("test helpers")
+ helpers_args_parser.addoption(
+ "--dump_refs",
+ type=Path,
+ help="path to dump the test config with references updated from statistics collected during the run",
+ )
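+ # The config referenced by --dump_refs is written in pytest_sessionfinish()
+ # below, once all test cases have been patched with their results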
db_args_parser = parser.getgroup("timetest database use")
db_args_parser.addoption(
'--db_submit',
parameters.
"""
with open(metafunc.config.getoption('test_conf'), "r") as file:
- test_cases = expand_env_vars(yaml.safe_load(file))
+ test_cases = yaml.safe_load(file)
+ TestConfDumper.fill(test_cases)
if test_cases:
metafunc.parametrize("instance", test_cases)
else:
yield d
- keys = val.keys()
- values = list(get_dict_values(val))
+ keys = ["device", "model"]
+ values = {key: val[key] for key in keys}
+ values = list(get_dict_values(values))
- return "-".join(["_".join([key, val]) for key, val in zip(keys, values)])
+ return "-".join(["_".join([key, str(val)]) for key, val in zip(keys, values)])
@pytest.mark.hookwrapper
else:
data["status"] = "passed"
upload_timetest_data(data, db_url, db_collection)
+
+
+class TestConfDumper:
+ """Class for preparing and dumping new test config with
+ tests' results saved as references.
+
+ During the run, every test case is patched with its execution results.
+ To dump a new test config, these results are added to the original
+ records as references."""
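+ # Class-level state: filled via fill() during test parametrization and
+ # written out by pytest_sessionfinish() via dump()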
+ orig_cases = []
+ patched_cases = []
+
+ @classmethod
+ def fill(cls, test_cases: list):
+ """Fill internal fields"""
+ cls.orig_cases = deepcopy(test_cases)
+ cls.patched_cases = test_cases # don't deepcopy() to allow patching the cases during the test run
+
+ @classmethod
+ def dump(cls, path):
+ """Dump tests' cases with new references to a file"""
+ assert len(cls.orig_cases) == len(cls.patched_cases), \
+ "Number of patched cases ('{}') isn't equal to original number ('{}')"\
+ .format(len(cls.patched_cases), len(cls.orig_cases))
+ for orig_rec, patched_rec in zip(cls.orig_cases, cls.patched_cases):
+ assert all(orig_rec[key] == patched_rec[key] for key in orig_rec), \
+ "Can't map an original record to its patched record." \
+ " Dump of the test config with updated references is skipped"
+ orig_rec["references"] = patched_rec.get("results", {})
+ with open(path, "w") as tconf:
+ yaml.safe_dump(cls.orig_cases, tconf)
+
+
+def pytest_sessionfinish(session):
+ """Pytest hook for session finish."""
+ new_tconf_path = session.config.getoption('dump_refs')
+ if new_tconf_path:
+ TestConfDumper.dump(new_tconf_path)
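+# Hypothetical invocation (file and config names are assumptions, and the
+# executable-related options required by the tests are omitted for brevity):
+#   pytest test_timetest.py --test_conf local_conf.yml --dump_refs refs_conf.yml
+# After the session refs_conf.yml holds the original cases with their
+# "references" filled from the results collected during the run.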
import logging
from scripts.run_timetest import run_timetest
+from test_runner.utils import expand_env_vars
+
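+# A step's metric fails the comparison when its measured value exceeds
+# reference * REFS_FACTOR, i.e. degrades by more than 20%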
+REFS_FACTOR = 1.2 # 120%
def test_timetest(instance, executable, niter):
# Run executable
exe_args = {
"executable": Path(executable),
- "model": Path(model_path),
+ "model": Path(expand_env_vars(model_path)),
"device": instance["device"]["name"],
"niter": niter
}
retcode, aggr_stats = run_timetest(exe_args, log=logging)
assert retcode == 0, "Run of executable failed"
- instance["results"] = aggr_stats # append values to report to DB
+ # Add timetest results to submit to the database and to save as references in the new test config
+ instance["results"] = aggr_stats
+
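+ # Hypothetical "references" layout in the test config (step and metric
+ # names are illustrative only):
+ #   references:
+ #     load_plugin: {avg: 1.0}
+ #     create_exenetwork: {avg: 3.0}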
+ # Compare with references
+ comparison_status = 0
+ for step_name, references in instance.get("references", {}).items():  # new cases may have no references yet
+ for metric, reference_val in references.items():
+ if aggr_stats[step_name][metric] > reference_val * REFS_FACTOR:
+ logging.error("Comparison failed for '{}' step on '{}' metric. Reference: {}. Current value: {}"
+ .format(step_name, metric, reference_val, aggr_stats[step_name][metric]))
+ comparison_status = 1
+ else:
+ logging.info("Comparison passed for '{}' step on '{}' metric. Reference: {}. Current value: {}"
+ .format(step_name, metric, reference_val, aggr_stats[step_name][metric]))
+
+ assert comparison_status == 0, "Comparison with references failed"
+