* `--niter` - Number of times to run executable.
"""
+import hashlib
+import json
+import logging
# pylint:disable=import-error
import os
+import shutil
import sys
-import pytest
+import tempfile
from pathlib import Path
+
+import pytest
import yaml
-import hashlib
-import shutil
-import logging
-import tempfile
from jsonschema import validate, ValidationError
-from test_runner.utils import upload_timetest_data, \
- DATABASE, DB_COLLECTIONS
from scripts.run_timetest import check_positive_int
+from test_runner.utils import upload_timetest_data, metadata_from_manifest, get_os_name, get_os_version, \
+ DATABASE, DB_COLLECTIONS
# -------------------- CLI options --------------------
help='collection name in "{}" database'.format(DATABASE),
choices=DB_COLLECTIONS
)
+ db_args_parser.addoption(
+ '--db_metadata',
+ type=str,
+ default="{}",
+ help='add extra commit information, json formatted')
+ db_args_parser.addoption(
+ '--manifest',
+ type=Path,
+ required=is_db_used,
+ help='extract commit information from build manifest')
@pytest.fixture(scope="session")
"""Fixture function for command-line option."""
return request.config.getoption('niter')
+
# -------------------- CLI options --------------------
internal purposes.
"""
setattr(request.node._request, "test_info", {"orig_instance": request.node.funcargs["instance"],
- "results": {}})
+ "results": {},
+ "db_info": {}})
if not hasattr(pytestconfig, "session_info"):
setattr(pytestconfig, "session_info", [])
"""Fixture for validating test case on correctness.
Fixture checks current test case contains all fields required for
- a correct work. To submit results to a database test case have
- contain several additional properties.
+ a correct work.
"""
- schema = {
+ schema = """
+ {
"type": "object",
"properties": {
"device": {
"type": "object",
"properties": {
"name": {"type": "string"}
- }},
+ },
+ "required": ["name"]
+ },
"model": {
"type": "object",
"properties": {
"path": {"type": "string"}
- }},
+ },
+ "required": ["path"]
+ }
},
+ "required": ["device", "model"],
+ "additionalProperties": false
}
- if request.config.getoption("db_submit"):
- # For submission data to a database some additional fields are required
- schema["properties"]["model"]["properties"].update({
- "name": {"type": "string"},
- "precision": {"type": "string"},
- "framework": {"type": "string"}
- })
- test_info["submit_to_db"] = True
+ """
+ schema = json.loads(schema)
+
try:
validate(instance=request.node.funcargs["instance"], schema=schema)
except ValidationError:
- test_info["submit_to_db"] = False
+ request.config.option.db_submit = False
raise
yield
+@pytest.fixture(scope="function")
+def prepare_db_info(request, test_info, executable, niter, manifest_metadata):
+ """Fixture for preparing and validating data to submit to a database.
+
+ Fixture prepares data and metadata to submit to a database. One of the steps
+ is parsing of build information from build manifest. After preparation,
+ it checks if data contains required properties.
+ """
+ FIELDS_FOR_ID = ['run_id', 'timetest', 'model', 'device', 'niter']
+
+ run_id = request.config.getoption("db_submit")
+ if not run_id:
+ yield
+ return
+
+ # add db_metadata
+ test_info["db_info"].update(json.loads(request.config.getoption("db_metadata")))
+
+ # add test info
+ info = {
+ "run_id": run_id,
+ "timetest": str(executable.stem),
+ "model": request.node.funcargs["instance"]["model"],
+ "device": request.node.funcargs["instance"]["device"],
+ "niter": niter,
+ "test_name": request.node.name,
+ "results": test_info["results"],
+ "os": "_".join([str(item) for item in [get_os_name(), *get_os_version()]])
+ }
+ info['_id'] = hashlib.sha256(
+ ''.join([str(info[key]) for key in FIELDS_FOR_ID]).encode()).hexdigest()
+ test_info["db_info"].update(info)
+
+ # add manifest metadata
+ test_info["db_info"].update(manifest_metadata)
+
+ # validate db_info
+ schema = """
+ {
+ "type": "object",
+ "properties": {
+ "device": {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string"}
+ },
+ "required": ["name"]
+ },
+ "model": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "name": {"type": "string"},
+ "precision": {"type": "string"},
+ "framework": {"type": "string"}
+ },
+ "required": ["path", "name", "precision", "framework"]
+ },
+ "run_id": {"type": "string"},
+ "timetest": {"type": "string"},
+ "niter": {"type": "integer"},
+ "test_name": {"type": "string"},
+ "results": {"type": "object"},
+ "os": {"type": "string"},
+ "_id": {"type": "string"}
+ },
+ "required": ["device", "model", "run_id", "timetest", "niter", "test_name", "results", "os", "_id"],
+ "additionalProperties": true
+ }
+ """
+ schema = json.loads(schema)
+
+ try:
+ validate(instance=test_info["db_info"], schema=schema)
+ except ValidationError:
+ request.config.option.db_submit = False
+ raise
+ yield
+
+
+@pytest.fixture(scope="session", autouse=True)
+def manifest_metadata(request):
+ """Fixture function for command-line option."""
+
+ run_id = request.config.getoption("db_submit")
+ if not run_id:
+ yield
+ return
+
+ manifest_meta = metadata_from_manifest(request.config.getoption("manifest"))
+
+ schema = """
+ {
+ "type": "object",
+ "properties": {
+ "product_type": {"enum": ["private_linux_ubuntu_18_04", "private_windows_vs2019"]},
+ "repo_url": {"type": "string"},
+ "commit_sha": {"type": "string"},
+ "commit_date": {"type": "string"},
+ "target_branch": {"type": "string"},
+ "version": {"type": "string"}
+ },
+ "required": ["product_type", "repo_url", "commit_sha", "commit_date", "target_branch", "version"],
+ "additionalProperties": false
+ }
+ """
+ schema = json.loads(schema)
+
+ try:
+ validate(instance=manifest_meta, schema=schema)
+ except ValidationError:
+ request.config.option.db_submit = False
+ raise
+ yield manifest_meta
+
+
@pytest.fixture(scope="session", autouse=True)
def prepare_tconf_with_refs(pytestconfig):
"""Fixture for preparing test config based on original test config
Submit tests' data to a database.
"""
- FIELDS_FOR_ID = ['timetest', 'model', 'device', 'niter', 'run_id']
- FIELDS_FOR_SUBMIT = FIELDS_FOR_ID + ['_id', 'test_name',
- 'results', 'status', 'error_msg']
-
run_id = item.config.getoption("db_submit")
- db_url = item.config.getoption("db_url")
- db_collection = item.config.getoption("db_collection")
- if not (run_id and db_url and db_collection):
- yield
- return
- if not item._request.test_info["submit_to_db"]:
- logging.error("Data won't be uploaded to a database on '{}' step".format(call.when))
+ if not run_id:
yield
return
- data = item.funcargs.copy()
- data["timetest"] = data.pop("executable").stem
- data.update(data["instance"])
-
- data['run_id'] = run_id
- data['_id'] = hashlib.sha256(
- ''.join([str(data[key]) for key in FIELDS_FOR_ID]).encode()).hexdigest()
-
- data["test_name"] = item.name
- data["results"] = item._request.test_info["results"]
+ data = item._request.test_info["db_info"].copy()
data["status"] = "not_finished"
data["error_msg"] = ""
- data = {field: data[field] for field in FIELDS_FOR_SUBMIT}
-
report = (yield).get_result()
if call.when in ["setup", "call"]:
if call.when == "call":
data["error_msg"] = report.longrepr.reprcrash.message
else:
data["status"] = "passed"
+
+ db_url = item.config.getoption("db_url")
+ db_collection = item.config.getoption("db_collection")
logging.info("Upload data to {}/{}.{}. Data: {}".format(db_url, DATABASE, db_collection, data))
upload_timetest_data(data, db_url, db_collection)
"""Utility module."""
import os
+import platform
+import sys
+from enum import Enum
+from pathlib import Path
+
+import distro
+import yaml
from pymongo import MongoClient
# constants
DATABASE = 'timetests' # database name for timetests results
DB_COLLECTIONS = ["commit", "nightly", "weekly"]
+PRODUCT_NAME = 'dldt' # product name from build manifest
def expand_env_vars(obj):
client = MongoClient(db_url)
collection = client[DATABASE][db_collection]
collection.replace_one({'_id': data['_id']}, data, upsert=True)
+
+
+def metadata_from_manifest(manifest: Path):
+ """ Extract commit metadata from manifest
+ """
+ with open(manifest, 'r') as manifest_file:
+ manifest = yaml.safe_load(manifest_file)
+ repo_trigger = next(
+ repo for repo in manifest['components'][PRODUCT_NAME]['repository'] if repo['trigger'])
+ return {
+ 'product_type': manifest['components'][PRODUCT_NAME]['product_type'],
+ 'commit_sha': repo_trigger['revision'],
+ 'commit_date': repo_trigger['commit_time'],
+ 'repo_url': repo_trigger['url'],
+ 'target_branch': repo_trigger['target_branch'],
+ 'version': manifest['components'][PRODUCT_NAME]['version']
+ }
+
+
+class UnsupportedOsError(Exception):
+ """
+ Exception for unsupported OS type
+ """
+
+ def __init__(self, *args, **kwargs):
+ error_message = f'OS type "{get_os_type()}" is not currently supported'
+ if args or kwargs:
+ super().__init__(*args, **kwargs)
+ else:
+ super().__init__(error_message)
+
+
+class OsType(Enum):
+ """
+ Container for supported os types
+ """
+ WINDOWS = 'Windows'
+ LINUX = 'Linux'
+ DARWIN = 'Darwin'
+
+
+def get_os_type():
+ """
+ Get OS type
+
+ :return: OS type
+ :rtype: String | Exception if it is not supported
+ """
+ return platform.system()
+
+
+def os_type_is_windows():
+ """Returns True if OS type is Windows. Otherwise returns False"""
+ return get_os_type() == OsType.WINDOWS.value
+
+
+def os_type_is_linux():
+ """Returns True if OS type is Linux. Otherwise returns False"""
+ return get_os_type() == OsType.LINUX.value
+
+
+def os_type_is_darwin():
+ """Returns True if OS type is Darwin. Otherwise returns False"""
+ return get_os_type() == OsType.DARWIN.value
+
+
+def get_os_name():
+ """
+ Check OS type and return OS name
+
+ :return: OS name
+ :rtype: String | Exception if it is not supported
+ """
+ if os_type_is_linux():
+ return distro.id().lower()
+ if os_type_is_windows() or os_type_is_darwin():
+ return get_os_type().lower()
+ raise UnsupportedOsError()
+
+
+def get_os_version():
+ """
+ Check OS version and return it
+
+ :return: OS version
+ :rtype: tuple | Exception if it is not supported
+ """
+ if os_type_is_linux():
+ return distro.major_version(), distro.minor_version()
+ if os_type_is_windows():
+ return sys.getwindowsversion().major, sys.getwindowsversion().minor
+ if os_type_is_darwin():
+ return tuple(platform.mac_ver()[0].split(".")[:2])
+ raise UnsupportedOsError()