From c03f7233ca9eea24505d085c774c279af29880e4 Mon Sep 17 00:00:00 2001 From: Guilherme Gallo Date: Tue, 4 Apr 2023 07:47:09 -0300 Subject: [PATCH] ci/lava: Extract LAVA proxy and LAVAJob abstractions Let's make lava_job_submitter.py cleaner with only parsing and retry mechanism capabilities. Moved out from the submitter script: 1. proxy functions - moved to lava.utils.lava_proxy.py 2. LAVAJob class definition - moved to lava.utils.lava_job.py - added structural logging capabilities into LAVAJob - Implemented properties for job_id, is_finished, and status, with corresponding setter methods that update the log dictionary. - Added new methods show, get_lava_time, and refresh_log for improved log handling and data retrieval. Signed-off-by: Guilherme Gallo Part-of: --- .gitlab-ci/lava/lava_job_submitter.py | 269 ++++++++++------------------ .gitlab-ci/lava/utils/__init__.py | 2 + .gitlab-ci/lava/utils/lava_job.py | 175 ++++++++++++++++++ .gitlab-ci/lava/utils/lava_proxy.py | 44 +++++ .gitlab-ci/lava/utils/log_follower.py | 4 +- .gitlab-ci/tests/test_lava_job_submitter.py | 6 +- 6 files changed, 327 insertions(+), 173 deletions(-) create mode 100644 .gitlab-ci/lava/utils/lava_job.py create mode 100644 .gitlab-ci/lava/utils/lava_proxy.py diff --git a/.gitlab-ci/lava/lava_job_submitter.py b/.gitlab-ci/lava/lava_job_submitter.py index a8b27d4..5aa9ec9 100755 --- a/.gitlab-ci/lava/lava_job_submitter.py +++ b/.gitlab-ci/lava/lava_job_submitter.py @@ -13,21 +13,15 @@ import argparse import contextlib import pathlib -import re import sys import time -import traceback -import urllib.parse -import xmlrpc.client from datetime import datetime, timedelta from io import StringIO from os import getenv -from typing import Optional +from typing import Any, Optional -import lavacli from lava.exceptions import ( MesaCIException, - MesaCIKnownIssueException, MesaCIParseException, MesaCIRetryError, MesaCITimeoutError, @@ -36,12 +30,15 @@ from lava.utils import CONSOLE_LOG from lava.utils 
import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS from lava.utils import ( GitlabSection, + LAVAJob, LogFollower, LogSectionType, + call_proxy, fatal_err, generate_lava_yaml_payload, hide_sensitive_data, print_log, + setup_lava_proxy, ) from lavacli.utils import flow_yaml as lava_yaml @@ -51,139 +48,24 @@ DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60 # How many seconds the script should wait before try a new polling iteration to # check if the dispatched LAVA job is running or waiting in the job queue. -WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10)) +WAIT_FOR_DEVICE_POLLING_TIME_SEC = int( + getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1) +) + +# How many seconds the script will wait to let LAVA finalize the job and give +# the final details. +WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5)) +WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int( + getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 3) +) # How many seconds to wait between log output LAVA RPC calls. LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5)) # How many retries should be made when a timeout happen. 
-NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)) - -def setup_lava_proxy(): - config = lavacli.load_config("default") - uri, usr, tok = (config.get(key) for key in ("uri", "username", "token")) - uri_obj = urllib.parse.urlparse(uri) - uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path) - transport = lavacli.RequestsTransport( - uri_obj.scheme, - config.get("proxy"), - config.get("timeout", 120.0), - config.get("verify_ssl_cert", True), - ) - proxy = xmlrpc.client.ServerProxy( - uri_str, allow_none=True, transport=transport) - - print_log("Proxy for {} created.".format(config['uri'])) - - return proxy - - -def _call_proxy(fn, *args): - retries = 60 - for n in range(1, retries + 1): - try: - return fn(*args) - except xmlrpc.client.ProtocolError as err: - if n == retries: - traceback.print_exc() - fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg)) - else: - time.sleep(15) - except xmlrpc.client.Fault as err: - traceback.print_exc() - fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode)) - - -class LAVAJob: - COLOR_STATUS_MAP = { - "pass": CONSOLE_LOG["FG_GREEN"], - "hung": CONSOLE_LOG["FG_YELLOW"], - "fail": CONSOLE_LOG["FG_RED"], - "canceled": CONSOLE_LOG["FG_MAGENTA"], - } - - def __init__(self, proxy, definition): - self.job_id = None - self.proxy = proxy - self.definition = definition - self.last_log_line = 0 - self.last_log_time = None - self.is_finished = False - self.status = "created" - - def heartbeat(self): - self.last_log_time = datetime.now() - self.status = "running" - - def validate(self) -> Optional[dict]: - """Returns a dict with errors, if the validation fails. 
- - Returns: - Optional[dict]: a dict with the validation errors, if any - """ - return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True) - - def submit(self): - try: - self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition) - except MesaCIException: - return False - return True - - def cancel(self): - if self.job_id: - self.proxy.scheduler.jobs.cancel(self.job_id) - - def is_started(self) -> bool: - waiting_states = ["Submitted", "Scheduling", "Scheduled"] - job_state: dict[str, str] = _call_proxy( - self.proxy.scheduler.job_state, self.job_id - ) - return job_state["job_state"] not in waiting_states - - def _load_log_from_data(self, data) -> list[str]: - lines = [] - if isinstance(data, xmlrpc.client.Binary): - # We are dealing with xmlrpc.client.Binary - # Let's extract the data - data = data.data - # When there is no new log data, the YAML is empty - if loaded_lines := lava_yaml.load(data): - lines = loaded_lines - self.last_log_line += len(lines) - return lines - - def get_logs(self) -> list[str]: - try: - (finished, data) = _call_proxy( - self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line - ) - self.is_finished = finished - return self._load_log_from_data(data) - - except Exception as mesa_ci_err: - raise MesaCIParseException( - f"Could not get LAVA job logs. Reason: {mesa_ci_err}" - ) from mesa_ci_err - - def parse_job_result_from_log( - self, lava_lines: list[dict[str, str]] - ) -> list[dict[str, str]]: - """Use the console log to catch if the job has completed successfully or - not. Returns the list of log lines until the result line.""" - - last_line = None # Print all lines. lines[:None] == lines[:] - - for idx, line in enumerate(lava_lines): - if result := re.search(r"hwci: mesa: (pass|fail)", line): - self.is_finished = True - self.status = result.group(1) - - last_line = idx + 1 - # We reached the log end here. hwci script has finished. 
- break - return lava_lines[:last_line] - +NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int( + getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2) +) def find_exception_from_metadata(metadata, job_id): if "result" not in metadata or metadata["result"] != "fail": @@ -212,7 +94,7 @@ def find_exception_from_metadata(metadata, job_id): def find_lava_error(job) -> None: # Look for infrastructure errors and retry if we see them. - results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id) + results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id) results = lava_yaml.load(results_yaml) for res in results: metadata = res["metadata"] @@ -231,12 +113,40 @@ def show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}") start_collapsed=True, colour=colour, ): - show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id) - for field, value in show.items(): + wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES + while not job.is_post_processed() and wait_post_processing_retries > 0: + # Wait a little until LAVA finishes processing metadata + time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC) + wait_post_processing_retries -= 1 + + if not job.is_post_processed(): + waited_for_sec: int = ( + WAIT_FOR_LAVA_POST_PROCESSING_RETRIES * WAIT_FOR_LAVA_POST_PROCESSING_SEC + ) + print_log( + f"Waited for {waited_for_sec} seconds " + "for LAVA to post-process the job, it haven't finished yet. 
" + "Dumping it's info anyway" + ) + + details: dict[str, str] = job.show() + for field, value in details.items(): print(f"{field:<15}: {value}") + job.refresh_log() def fetch_logs(job, max_idle_time, log_follower) -> None: + is_job_hanging(job, max_idle_time) + + time.sleep(LOG_POLLING_TIME_SEC) + new_log_lines = fetch_new_log_lines(job) + parsed_lines = parse_log_lines(job, log_follower, new_log_lines) + + for line in parsed_lines: + print_log(line) + + +def is_job_hanging(job, max_idle_time): # Poll to check for new logs, assuming that a prolonged period of # silence means that the device has died and we should try it again if datetime.now() - job.last_log_time > max_idle_time: @@ -251,17 +161,8 @@ def fetch_logs(job, max_idle_time, log_follower) -> None: timeout_duration=max_idle_time, ) - time.sleep(LOG_POLLING_TIME_SEC) - - # The XMLRPC binary packet may be corrupted, causing a YAML scanner error. - # Retry the log fetching several times before exposing the error. - for _ in range(5): - with contextlib.suppress(MesaCIParseException): - new_log_lines = job.get_logs() - break - else: - raise MesaCIParseException +def parse_log_lines(job, log_follower, new_log_lines): if log_follower.feed(new_log_lines): # If we had non-empty log data, we can assure that the device is alive. job.heartbeat() @@ -275,12 +176,22 @@ def fetch_logs(job, max_idle_time, log_follower) -> None: LogSectionType.LAVA_POST_PROCESSING, ): parsed_lines = job.parse_job_result_from_log(parsed_lines) + return parsed_lines - for line in parsed_lines: - print_log(line) + +def fetch_new_log_lines(job): + # The XMLRPC binary packet may be corrupted, causing a YAML scanner error. + # Retry the log fetching several times before exposing the error. 
+ for _ in range(5): + with contextlib.suppress(MesaCIParseException): + new_log_lines = job.get_logs() + break + else: + raise MesaCIParseException + return new_log_lines -def follow_job_execution(job): +def submit_job(job): try: job.submit() except Exception as mesa_ci_err: @@ -288,11 +199,16 @@ def follow_job_execution(job): f"Could not submit LAVA job. Reason: {mesa_ci_err}" ) from mesa_ci_err + +def wait_for_job_get_started(job): print_log(f"Waiting for job {job.job_id} to start.") while not job.is_started(): time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC) + job.refresh_log() print_log(f"Job {job.job_id} started.") + +def bootstrap_log_follower() -> LogFollower: gl = GitlabSection( id="lava_boot", header="LAVA boot", @@ -300,14 +216,16 @@ def follow_job_execution(job): start_collapsed=True, ) print(gl.start()) - max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC) - with LogFollower(current_section=gl) as lf: + return LogFollower(current_section=gl) + +def follow_job_execution(job, log_follower): + with log_follower: max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC) # Start to check job's health job.heartbeat() while not job.is_finished: - fetch_logs(job, max_idle_time, lf) + fetch_logs(job, max_idle_time, log_follower) # Mesa Developers expect to have a simple pass/fail job result. 
# If this does not happen, it probably means a LAVA infrastructure error @@ -327,41 +245,52 @@ def print_job_final_status(job): f"{CONSOLE_LOG['RESET']}" ) + job.refresh_log() + job.log["status"] = job.status show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}") -def retriable_follow_job(proxy, job_definition) -> LAVAJob: - retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION +def execute_job_with_retries(proxy, job_definition, retry_count) -> Optional[LAVAJob]: for attempt_no in range(1, retry_count + 2): + # Need to get the logger value from its object to enable autosave + # features, if AutoSaveDict is enabled from StructuredLogging module job = LAVAJob(proxy, job_definition) + try: - follow_job_execution(job) + submit_job(job) + wait_for_job_get_started(job) + log_follower: LogFollower = bootstrap_log_follower() + follow_job_execution(job, log_follower) return job - except MesaCIKnownIssueException as found_issue: - print_log(found_issue) - job.status = "canceled" - except MesaCIException as mesa_exception: - print_log(mesa_exception) - job.cancel() - except KeyboardInterrupt as e: - print_log("LAVA job submitter was interrupted. Cancelling the job.") - job.cancel() - raise e - finally: + + except (MesaCIException, KeyboardInterrupt) as exception: + job.handle_exception(exception) print_log( f"{CONSOLE_LOG['BOLD']}" f"Finished executing LAVA job in the attempt #{attempt_no}" f"{CONSOLE_LOG['RESET']}" ) + + finally: print_job_final_status(job) + +def retriable_follow_job(proxy, job_definition) -> LAVAJob: + number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + + if finished_job := execute_job_with_retries( + proxy, job_definition, number_of_retries + ): + return finished_job + + # Job failed in all attempts raise MesaCIRetryError( f"{CONSOLE_LOG['BOLD']}" f"{CONSOLE_LOG['FG_RED']}" "Job failed after it exceeded the number of " - f"{retry_count} retries." + f"{number_of_retries} retries." 
f"{CONSOLE_LOG['RESET']}", - retry_count=retry_count, + retry_count=number_of_retries, ) diff --git a/.gitlab-ci/lava/utils/__init__.py b/.gitlab-ci/lava/utils/__init__.py index ccb1b78..a02767c 100644 --- a/.gitlab-ci/lava/utils/__init__.py +++ b/.gitlab-ci/lava/utils/__init__.py @@ -1,6 +1,8 @@ from .console_format import CONSOLE_LOG from .gitlab_section import GitlabSection +from .lava_job import LAVAJob from .lava_job_definition import generate_lava_yaml_payload +from .lava_proxy import call_proxy, setup_lava_proxy from .log_follower import ( LogFollower, fatal_err, diff --git a/.gitlab-ci/lava/utils/lava_job.py b/.gitlab-ci/lava/utils/lava_job.py new file mode 100644 index 0000000..04d5300 --- /dev/null +++ b/.gitlab-ci/lava/utils/lava_job.py @@ -0,0 +1,175 @@ +import re +import xmlrpc.client +from collections import defaultdict +from datetime import datetime +from typing import Any, Optional + +from lava.exceptions import ( + MesaCIException, + MesaCIKnownIssueException, + MesaCIParseException, + MesaCITimeoutError, +) +from lava.utils import CONSOLE_LOG +from lava.utils.log_follower import print_log +from lavacli.utils import flow_yaml as lava_yaml + +from .lava_proxy import call_proxy + + +class LAVAJob: + COLOR_STATUS_MAP: dict[str, str] = { + "pass": CONSOLE_LOG["FG_GREEN"], + "hung": CONSOLE_LOG["FG_YELLOW"], + "fail": CONSOLE_LOG["FG_RED"], + "canceled": CONSOLE_LOG["FG_MAGENTA"], + } + + def __init__(self, proxy, definition, log: Optional[dict[str, Any]] = None) -> None: + self._job_id = None + self.proxy = proxy + self.definition = definition + self.last_log_line = 0 + self.last_log_time = None + self._is_finished = False + self.log: dict[str, Any] = defaultdict(str) if log is None else log + self.status = "not_submitted" + + def heartbeat(self) -> None: + self.last_log_time: datetime = datetime.now() + self.status = "running" + + @property + def status(self) -> str: + return self._status + + @status.setter + def status(self, new_status: str) -> None: + self._status = new_status + self.log["status"] = 
self._status + + @property + def job_id(self) -> int: + return self._job_id + + @job_id.setter + def job_id(self, new_id: int) -> None: + self._job_id = new_id + self.log["lava_job_id"] = self._job_id + + @property + def is_finished(self) -> bool: + return self._is_finished + + def validate(self) -> Optional[dict]: + """Returns a dict with errors, if the validation fails. + + Returns: + Optional[dict]: a dict with the validation errors, if any + """ + return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True) + + def show(self) -> dict[str, str]: + return call_proxy(self.proxy.scheduler.jobs.show, self._job_id) + + def get_lava_time(self, key, data) -> Optional[str]: + return data[key].value if data[key] else None + + def refresh_log(self) -> None: + details = self.show() + self.log["dut_start_time"] = self.get_lava_time("start_time", details) + self.log["dut_submit_time"] = self.get_lava_time("submit_time", details) + self.log["dut_end_time"] = self.get_lava_time("end_time", details) + self.log["dut_name"] = details.get("device") + self.log["dut_state"] = details.get("state") + + def submit(self) -> bool: + try: + self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition) + self.status = "submitted" + self.refresh_log() + except MesaCIException: + return False + return True + + def lava_state(self) -> str: + job_state: dict[str, str] = call_proxy( + self.proxy.scheduler.job_state, self._job_id + ) + return job_state["job_state"] + + def cancel(self): + if self._job_id: + self.proxy.scheduler.jobs.cancel(self._job_id) + # If we don't have yet set another job's status, let's update it + # with canceled one + if self.status == "running": + self.status = "canceled" + + def is_started(self) -> bool: + waiting_states = ("Submitted", "Scheduling", "Scheduled") + return self.lava_state() not in waiting_states + + def is_post_processed(self) -> bool: + return self.lava_state() != "Running" + + def _load_log_from_data(self, data) -> 
list[str]: + lines = [] + if isinstance(data, xmlrpc.client.Binary): + # We are dealing with xmlrpc.client.Binary + # Let's extract the data + data = data.data + # When there is no new log data, the YAML is empty + if loaded_lines := lava_yaml.load(data): + lines: list[str] = loaded_lines + self.last_log_line += len(lines) + return lines + + def get_logs(self) -> list[str]: + try: + (finished, data) = call_proxy( + self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line + ) + self._is_finished = finished + return self._load_log_from_data(data) + + except Exception as mesa_ci_err: + raise MesaCIParseException( + f"Could not get LAVA job logs. Reason: {mesa_ci_err}" + ) from mesa_ci_err + + def parse_job_result_from_log( + self, lava_lines: list[dict[str, str]] + ) -> list[dict[str, str]]: + """Use the console log to catch if the job has completed successfully or + not. Returns the list of log lines until the result line.""" + + last_line = None # Print all lines. lines[:None] == lines[:] + + for idx, line in enumerate(lava_lines): + if result := re.search(r"hwci: mesa: (pass|fail)", line): + self._is_finished = True + self.status = result[1] + + last_line = idx + 1 + # We reached the log end here. hwci script has finished. + break + return lava_lines[:last_line] + + def handle_exception(self, exception: Exception): + print_log(exception) + if isinstance(exception, MesaCIKnownIssueException): + self.status = "canceled" + elif isinstance(exception, MesaCITimeoutError): + self.status = "hung" + elif isinstance(exception, MesaCIException): + self.status = "failed" + elif isinstance(exception, KeyboardInterrupt): + self.status = "canceled_by_user" + print_log("LAVA job submitter was interrupted. 
Cancelling the job.") + self.cancel() + raise exception + else: + self.status = "job_submitter_error" + self.cancel() + self.log["dut_job_fail_reason"] = str(exception) diff --git a/.gitlab-ci/lava/utils/lava_proxy.py b/.gitlab-ci/lava/utils/lava_proxy.py new file mode 100644 index 0000000..581ec46 --- /dev/null +++ b/.gitlab-ci/lava/utils/lava_proxy.py @@ -0,0 +1,44 @@ +import time +import traceback +import urllib +import urllib.parse +import xmlrpc +import xmlrpc.client + +import lavacli + +from .log_follower import fatal_err, print_log + + +def setup_lava_proxy(): + config = lavacli.load_config("default") + uri, usr, tok = (config.get(key) for key in ("uri", "username", "token")) + uri_obj = urllib.parse.urlparse(uri) + uri_str = f"{uri_obj.scheme}://{usr}:{tok}@{uri_obj.netloc}{uri_obj.path}" + transport = lavacli.RequestsTransport( + uri_obj.scheme, + config.get("proxy"), + config.get("timeout", 120.0), + config.get("verify_ssl_cert", True), + ) + proxy = xmlrpc.client.ServerProxy(uri_str, allow_none=True, transport=transport) + + print_log(f'Proxy for {config["uri"]} created.') + + return proxy + + +def call_proxy(fn, *args): + retries = 60 + for n in range(1, retries + 1): + try: + return fn(*args) + except xmlrpc.client.ProtocolError as err: + if n == retries: + traceback.print_exc() + fatal_err(f"A protocol error occurred (Err {err.errcode} {err.errmsg})") + else: + time.sleep(15) + except xmlrpc.client.Fault as err: + traceback.print_exc() + fatal_err(f"FATAL: Fault: {err.faultString} (code: {err.faultCode})", err) diff --git a/.gitlab-ci/lava/utils/log_follower.py b/.gitlab-ci/lava/utils/log_follower.py index b2bfcf3..4b18efc 100644 --- a/.gitlab-ci/lava/utils/log_follower.py +++ b/.gitlab-ci/lava/utils/log_follower.py @@ -270,11 +270,13 @@ def print_log(msg: str) -> None: print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}") -def fatal_err(msg): +def fatal_err(msg, exception=None): colored_msg = f"{CONSOLE_LOG['FG_RED']}" f"{msg}" f"{CONSOLE_LOG['RESET']}" 
print_log(colored_msg) + if exception: + raise exception sys.exit(1) diff --git a/.gitlab-ci/tests/test_lava_job_submitter.py b/.gitlab-ci/tests/test_lava_job_submitter.py index 88b253f..d481aac 100644 --- a/.gitlab-ci/tests/test_lava_job_submitter.py +++ b/.gitlab-ci/tests/test_lava_job_submitter.py @@ -16,6 +16,7 @@ from lava.lava_job_submitter import ( DEVICE_HANGING_TIMEOUT_SEC, NUMBER_OF_RETRIES_TIMEOUT_DETECTION, LAVAJob, + bootstrap_log_follower, follow_job_execution, retriable_follow_job, ) @@ -52,7 +53,8 @@ def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception with pytest.raises(MesaCIException): proxy = mock_proxy(side_effect=exception) job = LAVAJob(proxy, '') - follow_job_execution(job) + log_follower = bootstrap_log_follower() + follow_job_execution(job, log_follower) NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {}) @@ -179,7 +181,7 @@ PROXY_SCENARIOS = { "fail", {}, ), - "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(SystemExit, match="1"), False, {}), + "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, {}), } -- 2.7.4