# How many attempts should be made when a timeout happens during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))
# Helper constants to colorize the job output.
# NOTE(review): GREEN mixes a 16-color bold-green code (1;32) with 256-color
# parameters (5;197) while RED is a plain 256-color foreground (38;5;197);
# GREEN's trailing parameters look like copy/paste residue — confirm intent.
CONSOLE_LOG_COLOR_GREEN = "\x1b[1;32;5;197m"
CONSOLE_LOG_COLOR_RED = "\x1b[1;38;5;197m"
CONSOLE_LOG_COLOR_RESET = "\x1b[0m"
+
def print_log(msg):
    """Print *msg* to stdout prefixed with the current timestamp.

    Uses an f-string instead of str.format for readability; output format
    is unchanged: "<datetime.now()>: <msg>".
    """
    print(f"{datetime.now()}: {msg}")
+
def fatal_err(msg):
    """Report a fatal error message and terminate the script with status 1."""
    print_log(msg)
    raise SystemExit(1)
class LAVAJob:
+ color_status_map = {"pass": CONSOLE_LOG_COLOR_GREEN}
+
    def __init__(self, proxy, definition):
        """Track a LAVA job created from *definition*, talked to via *proxy*."""
        # NOTE(review): `definition` is not referenced in the visible span;
        # presumably consumed when the job is submitted — confirm.
        self.job_id = None          # assigned once the job is submitted
        self.proxy = proxy          # XML-RPC proxy to the LAVA server
        self.last_log_line = 0      # cursor into the fetched log stream
        self.last_log_time = None   # last time any log activity was seen
        self.is_finished = False
        self.status = "created"     # later: "running", then hwci result or "fail"
def heartbeat(self):
self.last_log_time = datetime.now()
+ self.status = "running"
def validate(self) -> Optional[dict]:
"""Returns a dict with errors, if the validation fails.
f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
) from mesa_ci_err
+ def parse_job_result_from_log(self, lava_lines: list[dict[str, str]]) -> None:
+ """Use the console log to catch if the job has completed successfully or
+ not.
+ Returns true only the job finished by looking into the log result
+ parsing.
+ """
+ log_lines = [l["msg"] for l in lava_lines if l["lvl"] == "target"]
+ for line in log_lines:
+ if result := re.search(r"hwci: mesa: (\S*)", line):
+ self.is_finished = True
+ self.status = result.group(1)
+ color = LAVAJob.color_status_map.get(self.status, CONSOLE_LOG_COLOR_RED)
+ print_log(
+ f"{color}"
+ f"LAVA Job finished with result: {self.status}"
+ f"{CONSOLE_LOG_COLOR_RESET}"
+ )
+
+ # We reached the log end here. hwci script has finished.
+ break
+
def find_exception_from_metadata(metadata, job_id):
if "result" not in metadata or metadata["result"] != "fail":
return metadata
def find_lava_error(job) -> None:
    """Inspect the finished job's LAVA results for infrastructure errors.

    Delegates each result's metadata to find_exception_from_metadata, which
    raises a retryable exception for known infrastructure failures.  If
    nothing raises, the job ended without an hwci verdict and without a
    detected LAVA problem, so it is marked as failed.

    (Resolves the diff residue in this span: the superseded
    get_job_results implementation is removed in favor of this function.)
    """
    # Look for infrastructure errors and retry if we see them.
    results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = yaml.load(results_yaml, Loader=loader(False))
    for res in results:
        metadata = res["metadata"]
        find_exception_from_metadata(metadata, job.job_id)

    # If we reach this far, it means that the job ended without hwci script
    # result and no LAVA infrastructure problem was found
    job.status = "fail"
def show_job_data(job):
if line["lvl"] in ["results", "feedback"]:
continue
elif line["lvl"] in ["warning", "error"]:
- prefix = "\x1b[1;38;5;197m"
- suffix = "\x1b[0m"
+ prefix = CONSOLE_LOG_COLOR_RED
+ suffix = CONSOLE_LOG_COLOR_RESET
elif line["lvl"] == "input":
prefix = "$ "
suffix = ""
return parsed_lines
-def fetch_logs(job, max_idle_time):
+def fetch_logs(job, max_idle_time) -> None:
# Poll to check for new logs, assuming that a prolonged period of
# silence means that the device has died and we should try it again
if datetime.now() - job.last_log_time > max_idle_time:
# Retry the log fetching several times before exposing the error.
for _ in range(5):
with contextlib.suppress(MesaCIParseException):
- new_lines = job.get_logs()
+ new_log_lines = job.get_logs()
break
else:
raise MesaCIParseException
- parsed_lines = parse_lava_lines(new_lines)
+ parsed_lines = parse_lava_lines(new_log_lines)
for line in parsed_lines:
print_log(line)
+ job.parse_job_result_from_log(new_log_lines)
+
def follow_job_execution(job):
try:
fetch_logs(job, max_idle_time)
show_job_data(job)
- return get_job_results(job.proxy, job.job_id, "0_mesa")
+
+ # Mesa Developers expect to have a simple pass/fail job result.
+ # If this does not happen, it probably means a LAVA infrastructure error
+ # happened.
+ if job.status not in ["pass", "fail"]:
+ find_lava_error(job)
-def retriable_follow_job(proxy, job_definition):
+def retriable_follow_job(proxy, job_definition) -> LAVAJob:
retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
for attempt_no in range(1, retry_count + 2):
job = LAVAJob(proxy, job_definition)
try:
- return follow_job_execution(job)
+ follow_job_execution(job)
+ return job
except MesaCIException as mesa_exception:
print_log(mesa_exception)
job.cancel()
if args.validate_only:
return
- has_job_passed = retriable_follow_job(proxy, job_definition)
- exit_code = 0 if has_job_passed else 1
+ finished_job = retriable_follow_job(proxy, job_definition)
+ exit_code = 0 if finished_job.status == "pass" else 1
sys.exit(exit_code)
NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
LAVAJob,
follow_job_execution,
- get_job_results,
hide_sensitive_data,
retriable_follow_job,
)
# Total tries = the first attempt plus the retries allowed on timeout detection.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
def jobs_logs_response(finished=False, msg=None, lvl="target", result=None) -> Tuple[bool, str]:
    """Build a fake LAVA logs reply: a (finished flag, YAML-dumped lines) pair.

    When *result* is given, the single generated line becomes an hwci verdict
    on the "target" stream, regardless of *lvl*.
    """
    entry = {"dt": str(datetime.now()), "msg": "New message", "lvl": lvl}
    if result:
        entry["lvl"] = "target"
        entry["msg"] = f"hwci: mesa: {result}"

    log_entries = [entry] if msg is None else msg
    return finished, yaml.safe_dump(log_entries)
# Tests all known levels by default
yield from cycle(( "results", "feedback", "warning", "error", "debug", "target" ))
-def generate_n_logs(n=1, tick_fn: Union[Generator, Iterable[int], int]=1, level_fn=level_generator):
+def generate_n_logs(n=1, tick_fn: Union[Generator, Iterable[int], int]=1, level_fn=level_generator, result="pass"):
"""Simulate a log partitionated in n components"""
level_gen = level_fn()
yield jobs_logs_response(finished=False, msg=[], lvl=level)
time_travel.tick(tick_sec)
- yield jobs_logs_response(finished=True)
+ yield jobs_logs_response(finished=True, result=result)
NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
},
),
"no retries, but testsuite fails": (
- generate_n_logs(n=1, tick_fn=0),
+ generate_n_logs(n=1, tick_fn=0, result="fail"),
does_not_raise(),
False,
{
},
),
"no retries, one testsuite fails": (
- generate_n_logs(n=1, tick_fn=0),
+ generate_n_logs(n=1, tick_fn=0, result="fail"),
does_not_raise(),
False,
{
),
# If a protocol error happens, _call_proxy will retry without affecting timeouts
"unstable connection, ProtocolError followed by final message": (
- (NETWORK_EXCEPTION, jobs_logs_response(finished=True)),
+ (NETWORK_EXCEPTION, jobs_logs_response(finished=True, result="pass")),
does_not_raise(),
True,
{},
):
with expectation:
proxy = mock_proxy(side_effect=side_effect, **proxy_args)
- result = retriable_follow_job(proxy, "")
- assert job_result == result
+ job: LAVAJob = retriable_follow_job(proxy, "")
+ assert job_result == (job.status == "pass")
WAIT_FOR_JOB_SCENARIOS = {
mock_proxy_waiting_time,
):
start_time = datetime.now()
- result = retriable_follow_job(
+ job: LAVAJob = retriable_follow_job(
mock_proxy_waiting_time(
frozen_time, side_effect=side_effect, wait_time=wait_time
),
end_time = datetime.now()
delta_time = end_time - start_time
- assert has_finished == result
+ assert has_finished == (job.status == "pass")
assert delta_time.total_seconds() >= wait_time
assert result == expectation
-def test_get_job_results(mock_proxy):
- proxy = mock_proxy()
- get_job_results(proxy, 1, "0_mesa")
-
-
CORRUPTED_LOG_SCENARIOS = {
"too much subsequent corrupted data": (
[(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],