"""
# pylint:disable=import-error
+import sys
import pytest
from pathlib import Path
import yaml
+import hashlib
-from test_runner.utils import expand_env_vars
+from test_runner.utils import expand_env_vars, upload_timetest_data, \
+ DATABASE, DB_COLLECTIONS
# -------------------- CLI options --------------------
+
def pytest_addoption(parser):
"""Specify command-line options for all plugins"""
- parser.addoption(
+ test_args_parser = parser.getgroup("timetest test run")
+ test_args_parser.addoption(
"--test_conf",
type=Path,
- help="Path to test config",
+ help="path to a test config",
default=Path(__file__).parent / "test_config.yml"
)
- parser.addoption(
+ test_args_parser.addoption(
"--exe",
required=True,
dest="executable",
type=Path,
- help="Path to a timetest binary to execute",
+ help="path to a timetest binary to execute"
)
- parser.addoption(
+ test_args_parser.addoption(
"--niter",
type=int,
- help="Number of iterations to run executable and aggregate results",
+ help="number of iterations to run executable and aggregate results",
default=3
)
# TODO: add support of --mo, --omz etc. required for OMZ support
+ db_args_parser = parser.getgroup("timetest database use")
+ db_args_parser.addoption(
+ '--db_submit',
+ metavar="RUN_ID",
+ type=str,
+ help='submit results to the database. ' \
+ '`RUN_ID` should be a string uniquely identifying the run' \
+ ' (like Jenkins URL or time)'
+ )
+ is_db_used = db_args_parser.parser.parse_known_args(sys.argv).db_submit
+ db_args_parser.addoption(
+ '--db_url',
+ type=str,
+ required=is_db_used,
+ help='MongoDB URL in a form "mongodb://server:port"'
+ )
+ db_args_parser.addoption(
+ '--db_collection',
+ type=str,
+ required=is_db_used,
+ help='collection name in "{}" database'.format(DATABASE),
+ choices=DB_COLLECTIONS
+ )
@pytest.fixture(scope="session")
values = list(get_dict_values(val))
return "-".join(["_".join([key, val]) for key, val in zip(keys, values)])
+
+
+@pytest.mark.hookwrapper
+def pytest_runtest_makereport(item, call):
+ """Pytest hook for report preparation.
+
+ Submit tests' data to a database.
+ """
+
+ FIELDS_FOR_ID = ['timetest', 'model', 'device', 'niter', 'run_id']
+ FIELDS_FOR_SUBMIT = FIELDS_FOR_ID + ['_id', 'test_name',
+ 'results', 'status', 'error_msg']
+
+ run_id = item.config.getoption("db_submit")
+ db_url = item.config.getoption("db_url")
+ db_collection = item.config.getoption("db_collection")
+ if not (run_id and db_url and db_collection):
+ yield
+ return
+
+ data = item.funcargs.copy()
+ data["timetest"] = data.pop("executable").stem
+ data.update({key: val for key, val in data["instance"].items()})
+
+ data['run_id'] = run_id
+ data['_id'] = hashlib.sha256(
+ ''.join([str(data[key]) for key in FIELDS_FOR_ID]).encode()).hexdigest()
+
+ data["test_name"] = item.name
+ data["results"] = {}
+ data["status"] = "not_finished"
+ data["error_msg"] = ""
+
+ data = {field: data[field] for field in FIELDS_FOR_SUBMIT}
+
+ report = (yield).get_result()
+ if call.when in ["setup", "call"]:
+ if call.when == "call":
+ if not report.passed:
+ data["status"] = "failed"
+ data["error_msg"] = report.longrepr.reprcrash.message
+ else:
+ data["status"] = "passed"
+ upload_timetest_data(data, db_url, db_collection)