import traceback
import urllib.parse
import xmlrpc
-
from datetime import datetime, timedelta
from os import getenv
fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
+class MesaCIException(Exception):
+ pass
+
+
+class LAVAJob:
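+    """Track a single LAVA job through the scheduler XML-RPC proxy:
+    submission, cancellation, state queries and incremental log retrieval."""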
+ def __init__(self, proxy, definition):
+ self.job_id = None
+ self.proxy = proxy
+ self.definition = definition
+ self.last_log_line = 0
+ self.last_log_time = None
+ self.is_finished = False
+
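+    # The heartbeat is refreshed whenever the job produces new log lines;
+    # follow_job_execution() uses it to detect a hung device.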
+ def heartbeat(self):
+ self.last_log_time = datetime.now()
+
+ def validate(self):
+ try:
+ return _call_proxy(
+ self.proxy.scheduler.jobs.validate, self.definition, True
+ )
+ except MesaCIException:
+ return False
+
+ def submit(self):
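+        # MesaCIException from the proxy is swallowed here; callers must
+        # check the boolean return value. self.job_id is only set on success.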
+ try:
+ self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
+ except MesaCIException:
+ return False
+ return True
+
+ def cancel(self):
+ if self.job_id:
+ self.proxy.scheduler.jobs.cancel(self.job_id)
+
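+    # The job counts as started once it leaves the waiting states, i.e. a
+    # device has picked it up (or it has already finished).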
+ def is_started(self):
+ waiting_states = ["Submitted", "Scheduling", "Scheduled"]
+ job_state = _call_proxy(self.proxy.scheduler.job_state, self.job_id)
+ return job_state["job_state"] not in waiting_states
+
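+    # Fetch only the log lines produced since the previous call, advancing
+    # the cursor and refreshing the heartbeat whenever the job made progress.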
+ def get_logs(self):
+ try:
+ (finished, data) = _call_proxy(
+ self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
+ )
+ lines = yaml.load(str(data), Loader=loader(False))
+ self.is_finished = finished
+ if not lines:
+ return []
+ self.heartbeat()
+ self.last_log_line += len(lines)
+ return lines
+ except MesaCIException as mesa_exception:
+ fatal_err(f"Could not get LAVA job logs. Reason: {mesa_exception}")
+
+
def get_job_results(proxy, job_id, test_suite, test_case):
# Look for infrastructure errors and retry if we see them.
results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
if "result" not in metadata or metadata["result"] != "fail":
continue
if 'error_type' in metadata and metadata['error_type'] == "Infrastructure":
- print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
- return False
+ raise MesaCIException("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
if 'case' in metadata and metadata['case'] == "validate":
- print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
- return False
+ raise MesaCIException("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case)
results = yaml.load(results_yaml, Loader=loader(False))
if not results:
- fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
+ raise MesaCIException("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result']))
if results[0]['result'] != 'pass':
- fatal_err("FAIL")
+ return False
return True
-def wait_until_job_is_started(proxy, job_id):
- print_log(f"Waiting for job {job_id} to start.")
- current_state = "Submitted"
- waiting_states = ["Submitted", "Scheduling", "Scheduled"]
- while current_state in waiting_states:
- time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
- job_state = _call_proxy(proxy.scheduler.job_state, job_id)
- current_state = job_state["job_state"]
-
- print_log(f"Job {job_id} started.")
-
-def follow_job_execution(proxy, job_id):
- line_count = 0
- finished = False
- last_time_logs = datetime.now()
- while not finished:
- # `proxy.scheduler.jobs.logs` does not block, even when there is no
- # new log to be fetched. To avoid dosing the LAVA dispatcher
- # machine, let's add a sleep to save them some stamina.
- time.sleep(LOG_POLLING_TIME_SEC)
-
- (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count)
- if logs := yaml.load(str(data), Loader=loader(False)):
- # Reset the timeout
- last_time_logs = datetime.now()
- for line in logs:
- print("{} {}".format(line["dt"], line["msg"]))
-
- line_count += len(logs)
- else:
- time_limit = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
- if datetime.now() - last_time_logs > time_limit:
- print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id))
- return False
-
- return True
-def show_job_data(proxy, job_id):
- show = _call_proxy(proxy.scheduler.jobs.show, job_id)
+def show_job_data(job):
+ show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
for field, value in show.items():
print("{}\t: {}".format(field, value))
-def validate_job(proxy, job_file):
-    try:
+def follow_job_execution(job):
- return _call_proxy(proxy.scheduler.jobs.validate, job_file, True)
- except:
- return False
-
-def submit_job(proxy, job_file):
- return _call_proxy(proxy.scheduler.jobs.submit, job_file)
+    # LAVAJob.submit() swallows MesaCIException and signals failure through
+    # its return value, so check that instead of catching an exception that
+    # can never reach this frame.
+    if not job.submit():
+        fatal_err("Could not submit LAVA job.")
+ print_log(f"Waiting for job {job.job_id} to start.")
+ while not job.is_started():
+ time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
+ print_log(f"Job {job.job_id} started.")
+
+ max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
+    # Prime the idle clock: the job counts as alive the moment it starts.
+ job.heartbeat()
+ while not job.is_finished:
+        # Poll for new logs; prolonged silence is taken to mean the device
+        # has hung and the job should be retried.
+        if datetime.now() - job.last_log_time > max_idle_time:
+            # retriable_follow_job() logs the exception and retries, so no
+            # separate print_log is needed here.
+            raise MesaCIException(
+                f"LAVA job {job.job_id} produced no log output for "
+                f"{max_idle_time.total_seconds()} seconds; assuming the device hung. Retry."
+            )
-def retriable_follow_job(proxy, yaml_file):
- retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
-
- while retry_count >= 0:
- job_id = submit_job(proxy, yaml_file)
+ time.sleep(LOG_POLLING_TIME_SEC)
- print_log("LAVA job id: {}".format(job_id))
+ new_lines = job.get_logs()
- wait_until_job_is_started(proxy, job_id)
+ for line in new_lines:
+            print("{} {}".format(line["dt"], line["msg"]))
- if not follow_job_execution(proxy, job_id):
- print_log(f"Job {job_id} has timed out. Cancelling it.")
- # Cancel the job as it is considered unreachable by Mesa CI.
- proxy.scheduler.jobs.cancel(job_id)
+ show_job_data(job)
+ return get_job_results(job.proxy, job.job_id, "0_mesa", "mesa")
- retry_count -= 1
- continue
- show_job_data(proxy, job_id)
-
- if get_job_results(proxy, job_id, "0_mesa", "mesa") == True:
- break
- else:
- # The script attempted all the retries. The job seemed to fail.
- return False
+def retriable_follow_job(proxy, job_definition):
+ retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
- return True
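+    # range(1, retry_count + 2): the initial attempt plus retry_count
+    # retries, numbered from 1 for the log message below.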
+ for attempt_no in range(1, retry_count + 2):
+ job = LAVAJob(proxy, job_definition)
+ try:
+ return follow_job_execution(job)
+ except MesaCIException as mesa_exception:
+ print_log(mesa_exception)
+ job.cancel()
+ finally:
+ print_log(f"Finished executing LAVA job in the attempt #{attempt_no}")
+
+    fatal_err(
+        "Job failed after exhausting all "
+        f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
+    )
def main(args):
proxy = setup_lava_proxy()
- yaml_file = generate_lava_yaml(args)
+ job_definition = generate_lava_yaml(args)
if args.dump_yaml:
- print(hide_sensitive_data(generate_lava_yaml(args)))
+ print("LAVA job definition (YAML):")
+ print(hide_sensitive_data(job_definition))
if args.validate_only:
- ret = validate_job(proxy, yaml_file)
+ job = LAVAJob(proxy, job_definition)
+ ret = job.validate()
if not ret:
fatal_err("Error in LAVA job definition")
print("LAVA job definition validated successfully")
return
- if not retriable_follow_job(proxy, yaml_file):
- fatal_err(
- "Job failed after it exceeded the number of"
- f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
- )
+ ret = retriable_follow_job(proxy, job_definition)
+    # retriable_follow_job() returns True on success, but sys.exit(True)
+    # would exit with status 1, so map the result to a proper exit code.
+    sys.exit(0 if ret else 1)
def create_parser():
import xmlrpc.client
from contextlib import nullcontext as does_not_raise
from datetime import datetime
-from itertools import repeat
-from typing import Tuple
+from itertools import cycle, repeat
+from typing import Iterable, Union, Generator, Tuple
from unittest.mock import MagicMock, patch
import pytest
import yaml
from freezegun import freeze_time
from lava.lava_job_submitter import (
+ NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
DEVICE_HANGING_TIMEOUT_SEC,
follow_job_execution,
hide_sensitive_data,
retriable_follow_job,
+    LAVAJob,
)
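+
+# Attempts the submitter makes in total: the first run plus every
+# timeout-detection retry.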
+NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
-def jobs_logs_response(finished=False, msg=None) -> Tuple[bool, str]:
- timed_msg = {"dt": str(datetime.now()), "msg": "New message"}
+
+def jobs_logs_response(finished=False, msg=None, lvl="target") -> Tuple[bool, str]:
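+    """Fake the (finished, yaml_logs) pair returned by scheduler.jobs.logs."""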
+ timed_msg = {"dt": str(datetime.now()), "msg": "New message", "lvl": lvl}
logs = [timed_msg] if msg is None else msg
return finished, yaml.safe_dump(logs)
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
with pytest.raises(exception):
- follow_job_execution(mock_proxy(side_effect=exception), "")
+ proxy = mock_proxy(side_effect=exception)
+ job = LAVAJob(proxy, '')
+ follow_job_execution(job)
+
+def level_generator():
+ # Tests all known levels by default
+    yield from cycle(("results", "feedback", "warning", "error", "debug", "target"))
-def generate_n_logs(n=1, tick_sec=1):
+def generate_n_logs(
+    n=1, tick_fn: Union[Generator, Iterable[int], int] = 1, level_fn=level_generator
+):
-    """Simulate a log partitionated in n components"""
+    """Simulate a log partitioned into n chunks.
+
+    tick_fn controls how many seconds the frozen clock advances before each
+    chunk: a plain int, an iterable that is cycled, or a generator.
+    """
+ level_gen = level_fn()
+
+ if isinstance(tick_fn, Generator):
+ tick_gen = tick_fn
+ elif isinstance(tick_fn, Iterable):
+ tick_gen = cycle(tick_fn)
+ else:
+ tick_gen = cycle((tick_fn,))
+
with freeze_time(datetime.now()) as time_travel:
while True:
# Simulate a scenario where the target job is waiting for being started
for _ in range(n - 1):
+ level: str = next(level_gen)
+
-            time_travel.tick(tick_sec)
+            time_travel.tick(next(tick_gen))
- yield jobs_logs_response(finished=False, msg=[])
+ yield jobs_logs_response(finished=False, msg=[], lvl=level)
-        time_travel.tick(tick_sec)
+        time_travel.tick(next(tick_gen))
yield jobs_logs_response(finished=True)
PROXY_SCENARIOS = {
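+    # Each scenario: (logs side_effect for the mocked proxy, raise
+    # expectation, expected retriable_follow_job() result).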
"finish case": (generate_n_logs(1), does_not_raise(), True),
"works at last retry": (
- generate_n_logs(n=3, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
+        generate_n_logs(
+            n=NUMBER_OF_MAX_ATTEMPTS,
+            tick_fn=[DEVICE_HANGING_TIMEOUT_SEC + 1] * NUMBER_OF_RETRIES_TIMEOUT_DETECTION + [1],
+        ),
does_not_raise(),
True,
),
"timed out more times than retry attempts": (
- generate_n_logs(n=4, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
- does_not_raise(),
+        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1),
+ pytest.raises(SystemExit),
False,
),
"long log case, no silence": (
- generate_n_logs(n=1000, tick_sec=0),
+ generate_n_logs(n=1000, tick_fn=0),
does_not_raise(),
True,
),
"very long silence": (
- generate_n_logs(n=4, tick_sec=100000),
- does_not_raise(),
+ generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
+ pytest.raises(SystemExit),
False,
),
# If a protocol error happens, _call_proxy will retry without affecting timeouts
mock_sleep, side_effect, expectation, has_finished, mock_proxy
):
with expectation:
- result = retriable_follow_job(mock_proxy(side_effect=side_effect), "")
+ proxy = mock_proxy(side_effect=side_effect)
+ result = retriable_follow_job(proxy, "")
assert has_finished == result