from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from io import StringIO
-from os import getenv
+from os import environ, getenv, path
from typing import Any, Optional
import fire
def execute_job_with_retries(
proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
+ last_failed_job = None
for attempt_no in range(1, retry_count + 2):
# Need to get the logger value from its object to enable autosave
# features, if AutoSaveDict is enabled from StructuredLogging module
job = LAVAJob(proxy, job_definition, job_log)
STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
try:
+ job_log["submitter_start_time"] = datetime.now().isoformat()
submit_job(job)
wait_for_job_get_started(job)
log_follower: LogFollower = bootstrap_log_follower()
follow_job_execution(job, log_follower)
return job
+
except (MesaCIException, KeyboardInterrupt) as exception:
job.handle_exception(exception)
+
+ finally:
+ print_job_final_status(job)
+ # If LAVA takes too long to post process the job, the submitter
+ # gives up and proceeds.
+ job_log["submitter_end_time"] = datetime.now().isoformat()
+ last_failed_job = job
print_log(
f"{CONSOLE_LOG['BOLD']}"
f"Finished executing LAVA job in the attempt #{attempt_no}"
f"{CONSOLE_LOG['RESET']}"
)
- finally:
- job_log["finished_time"] = datetime.now().isoformat()
- print_job_final_status(job)
+
+ return last_failed_job
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
- if finished_job := execute_job_with_retries(
+ last_attempted_job = execute_job_with_retries(
proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
- ):
- return finished_job
-
- # Job failed in all attempts
- raise MesaCIRetryError(
- f"{CONSOLE_LOG['BOLD']}"
- f"{CONSOLE_LOG['FG_RED']}"
- "Job failed after it exceeded the number of "
- f"{number_of_retries} retries."
- f"{CONSOLE_LOG['RESET']}",
- retry_count=number_of_retries,
)
+ if last_attempted_job.exception is not None:
+ # Infra failed in all attempts
+ raise MesaCIRetryError(
+ f"{CONSOLE_LOG['BOLD']}"
+ f"{CONSOLE_LOG['FG_RED']}"
+ "Job failed after it exceeded the number of "
+ f"{number_of_retries} retries."
+ f"{CONSOLE_LOG['RESET']}",
+ retry_count=number_of_retries,
+ last_job=last_attempted_job,
+ )
+
+ return last_attempted_job
+
@dataclass
class PathResolver:
return
self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()
+ self.proxy = setup_lava_proxy()
- def dump(self, job_definition):
- if self.dump_yaml:
- with GitlabSection(
- "yaml_dump",
- "LAVA job definition (YAML)",
- type=LogSectionType.LAVA_BOOT,
- start_collapsed=True,
- ):
- print(hide_sensitive_data(job_definition))
-
- def submit(self):
- proxy = setup_lava_proxy()
-
+ def __prepare_submission(self) -> str:
# Overwrite the timeout for the testcases with the value offered by the
# user. The testcase running time should be at least 4 times greater than
# the other sections (boot and setup), so we can safely ignore them.
lava_yaml.dump(generate_lava_yaml_payload(self), job_definition_stream)
job_definition = job_definition_stream.getvalue()
- self.dump(job_definition)
+ if self.dump_yaml:
+ self.dump_job_definition(job_definition)
- job = LAVAJob(proxy, job_definition)
- if errors := job.validate():
+ validation_job = LAVAJob(self.proxy, job_definition)
+ if errors := validation_job.validate():
fatal_err(f"Error in LAVA job definition: {errors}")
print_log("LAVA job definition validated successfully")
+ return job_definition
+
+ @classmethod
+ def is_under_ci(cls):
+ ci_envvar: str = getenv("CI", "false")
+ return ci_envvar.lower() == "true"
+
+ def dump_job_definition(self, job_definition) -> None:
+ with GitlabSection(
+ "yaml_dump",
+ "LAVA job definition (YAML)",
+ type=LogSectionType.LAVA_BOOT,
+ start_collapsed=True,
+ ):
+ print(hide_sensitive_data(job_definition))
+
+ def submit(self) -> None:
+ """
+ Prepares and submits the LAVA job.
+ If `validate_only` is True, it validates the job without submitting it.
+ If the job finishes with a non-pass status or encounters an exception,
+ the program exits with a non-zero return code.
+ """
+ job_definition: str = self.__prepare_submission()
+
if self.validate_only:
return
with self.__structured_log_context:
+ last_attempt_job = None
try:
- finished_job = retriable_follow_job(proxy, job_definition)
- exit_code = 0 if finished_job.status == "pass" else 1
- STRUCTURAL_LOG["job_combined_status"] = job.status
- sys.exit(exit_code)
+ last_attempt_job = retriable_follow_job(self.proxy, job_definition)
+
+ except MesaCIRetryError as retry_exception:
+ last_attempt_job = retry_exception.last_job
except Exception as exception:
STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
raise exception
+ finally:
+ self.finish_script(last_attempt_job)
+
+ def print_log_artifact_url(self):
+ base_url = "https://$CI_PROJECT_ROOT_NAMESPACE.pages.freedesktop.org/"
+ artifacts_path = "-/$CI_PROJECT_NAME/-/jobs/$CI_JOB_ID/artifacts/"
+ relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
+ full_path = f"{base_url}{artifacts_path}{relative_log_path}"
+ artifact_url = path.expandvars(full_path)
+
+ print_log(f"Structural Logging data available at: {artifact_url}")
+
+ def finish_script(self, last_attempt_job):
+ if self.is_under_ci() and self.structured_log_file:
+ self.print_log_artifact_url()
+
+ if not last_attempt_job:
+ # No job was run, something bad happened
+ STRUCTURAL_LOG["job_combined_status"] = "script_crash"
+ current_exception = str(sys.exc_info()[0])
+ STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
+ raise SystemExit(1)
+
+ STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status
+
+ if last_attempt_job.status != "pass":
+ raise SystemExit(1)
class StructuredLoggerWrapper:
def __init__(self, submitter: LAVAJobSubmitter) -> None:
self._is_finished = False
self.log: dict[str, Any] = log
self.status = "not_submitted"
+ self.__exception: Optional[str] = None
def heartbeat(self) -> None:
self.last_log_time: datetime = datetime.now()
def is_finished(self) -> bool:
return self._is_finished
+ @property
+ def exception(self) -> str:
+ return self.__exception
+
+ @exception.setter
+ def exception(self, exception: Exception) -> None:
+ self.__exception = repr(exception)
+ self.log["dut_job_fail_reason"] = self.__exception
+
def validate(self) -> Optional[dict]:
"""Returns a dict with errors, if the validation fails.
def handle_exception(self, exception: Exception):
print_log(exception)
+ self.cancel()
+ self.exception = exception
+
+ # Give more accurate status depending on exception
if isinstance(exception, MesaCIKnownIssueException):
self.status = "canceled"
elif isinstance(exception, MesaCITimeoutError):
elif isinstance(exception, MesaCIException):
self.status = "failed"
elif isinstance(exception, KeyboardInterrupt):
- self.status = "canceled_by_user"
+ self.status = "interrupted"
print_log("LAVA job submitter was interrupted. Cancelling the job.")
- raise exception
+ raise
else:
self.status = "job_submitter_error"
-
- self.cancel()
- self.log["dut_job_fail_reason"] = str(exception)
#
# SPDX-License-Identifier: MIT
+import os
import xmlrpc.client
from contextlib import nullcontext as does_not_raise
from datetime import datetime
from itertools import chain, repeat
from pathlib import Path
+from unittest.mock import MagicMock, patch
import pytest
from lava.exceptions import MesaCIException, MesaCIRetryError
return update_mock_proxy
+@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
+def ci_environment(request):
+ with patch.dict(os.environ, request.param):
+ yield
+
+
+@pytest.fixture
+def lava_job_submitter(
+ ci_environment,
+ tmp_path,
+ mock_proxy,
+):
+ os.chdir(tmp_path)
+ tmp_file = Path(tmp_path) / "log.json"
+
+ with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
+ mock_setup_lava_proxy.return_value = mock_proxy()
+ yield LAVAJobSubmitter(
+ boot_method="test_boot",
+ ci_project_dir="test_dir",
+ device_type="test_device",
+ job_timeout_min=1,
+ structured_log_file=tmp_file,
+ )
+
+
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
with pytest.raises(MesaCIException):
@pytest.mark.slow(
reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
-def test_full_yaml_log(mock_proxy, frozen_time, tmp_path):
+def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
import random
from lavacli.utils import flow_yaml as lava_yaml
def reset_logs(*args):
proxy.scheduler.jobs.logs.side_effect = load_lines()
- tmp_file = Path(tmp_path) / "log.json"
- LAVAJobSubmitter(structured_log_file=tmp_file)
proxy.scheduler.jobs.submit = reset_logs
with pytest.raises(MesaCIRetryError):
time_travel_to_test_time()
+ lava_job_submitter.submit()
retriable_follow_job(proxy, "")
- print(tmp_file.read_text())
+ print(lava_job_submitter.structured_log_file.read_text())
+
+
+@pytest.mark.parametrize(
+ "validate_only,finished_job_status,expected_combined_status,expected_exit_code",
+ [
+ (True, "pass", None, None),
+ (False, "pass", "pass", 0),
+ (False, "fail", "fail", 1),
+ ],
+ ids=[
+ "validate_only_no_job_submission",
+ "successful_job_submission",
+ "failed_job_submission",
+ ],
+)
+def test_job_combined_status(
+ lava_job_submitter,
+ validate_only,
+ finished_job_status,
+ expected_combined_status,
+ expected_exit_code,
+):
+ lava_job_submitter.validate_only = validate_only
+
+ with patch(
+ "lava.lava_job_submitter.retriable_follow_job"
+ ) as mock_retriable_follow_job, patch(
+ "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
+ ) as mock_prepare_submission, patch(
+ "sys.exit"
+ ):
+ from lava.lava_job_submitter import STRUCTURAL_LOG
+
+ mock_retriable_follow_job.return_value = MagicMock(status=finished_job_status)
+
+ mock_job_definition = MagicMock(spec=str)
+ mock_prepare_submission.return_value = mock_job_definition
+ original_status: str = STRUCTURAL_LOG.get("job_combined_status")
+
+ if validate_only:
+ lava_job_submitter.submit()
+ mock_retriable_follow_job.assert_not_called()
+ assert STRUCTURAL_LOG.get("job_combined_status") == original_status
+ return
+
+ try:
+ lava_job_submitter.submit()
+
+ except SystemExit as e:
+ assert e.code == expected_exit_code
+
+ assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status