ci/lava: Extract LAVA proxy and LAVAJob abstractions
authorGuilherme Gallo <guilherme.gallo@collabora.com>
Tue, 4 Apr 2023 10:47:09 +0000 (07:47 -0300)
committerMarge Bot <emma+marge@anholt.net>
Wed, 19 Apr 2023 14:36:37 +0000 (14:36 +0000)
Let's make lava_job_submitter.py cleaner with only parsing and retry
mechanism capabilities.

Moved out from the submitter script:

1. proxy functions
  - moved to lava.utils.lava_proxy.py
2. LAVAJob class definition
  - moved to lava.utils.lava_job.py
  - added structural logging capabilities into LAVAJob
  - Implemented properties for job_id, is_finished, and status, with
    corresponding setter methods that update the log dictionary.
  - Added new methods show, get_lava_time, and refresh_log for improved
    log handling and data retrieval.

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/22500>

.gitlab-ci/lava/lava_job_submitter.py
.gitlab-ci/lava/utils/__init__.py
.gitlab-ci/lava/utils/lava_job.py [new file with mode: 0644]
.gitlab-ci/lava/utils/lava_proxy.py [new file with mode: 0644]
.gitlab-ci/lava/utils/log_follower.py
.gitlab-ci/tests/test_lava_job_submitter.py

index a8b27d4..5aa9ec9 100755 (executable)
 import argparse
 import contextlib
 import pathlib
-import re
 import sys
 import time
-import traceback
-import urllib.parse
-import xmlrpc.client
 from datetime import datetime, timedelta
 from io import StringIO
 from os import getenv
-from typing import Optional
+from typing import Any, Optional
 
-import lavacli
 from lava.exceptions import (
     MesaCIException,
-    MesaCIKnownIssueException,
     MesaCIParseException,
     MesaCIRetryError,
     MesaCITimeoutError,
@@ -36,12 +30,15 @@ from lava.utils import CONSOLE_LOG
 from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
 from lava.utils import (
     GitlabSection,
+    LAVAJob,
     LogFollower,
     LogSectionType,
+    call_proxy,
     fatal_err,
     generate_lava_yaml_payload,
     hide_sensitive_data,
     print_log,
+    setup_lava_proxy,
 )
 from lavacli.utils import flow_yaml as lava_yaml
 
@@ -51,139 +48,24 @@ DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC",  5*60
 
 # How many seconds the script should wait before try a new polling iteration to
 # check if the dispatched LAVA job is running or waiting in the job queue.
-WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))
+WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
+    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
+)
+
+# How many seconds the script will wait to let LAVA finalize the job and give
+# the final details.
+WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
+WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
+    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 3)
+)
 
 # How many seconds to wait between log output LAVA RPC calls.
 LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))
 
 # How many retries should be made when a timeout happen.
-NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))
-
-def setup_lava_proxy():
-    config = lavacli.load_config("default")
-    uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
-    uri_obj = urllib.parse.urlparse(uri)
-    uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path)
-    transport = lavacli.RequestsTransport(
-        uri_obj.scheme,
-        config.get("proxy"),
-        config.get("timeout", 120.0),
-        config.get("verify_ssl_cert", True),
-    )
-    proxy = xmlrpc.client.ServerProxy(
-        uri_str, allow_none=True, transport=transport)
-
-    print_log("Proxy for {} created.".format(config['uri']))
-
-    return proxy
-
-
-def _call_proxy(fn, *args):
-    retries = 60
-    for n in range(1, retries + 1):
-        try:
-            return fn(*args)
-        except xmlrpc.client.ProtocolError as err:
-            if n == retries:
-                traceback.print_exc()
-                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
-            else:
-                time.sleep(15)
-        except xmlrpc.client.Fault as err:
-            traceback.print_exc()
-            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
-
-
-class LAVAJob:
-    COLOR_STATUS_MAP = {
-        "pass": CONSOLE_LOG["FG_GREEN"],
-        "hung": CONSOLE_LOG["FG_YELLOW"],
-        "fail": CONSOLE_LOG["FG_RED"],
-        "canceled": CONSOLE_LOG["FG_MAGENTA"],
-    }
-
-    def __init__(self, proxy, definition):
-        self.job_id = None
-        self.proxy = proxy
-        self.definition = definition
-        self.last_log_line = 0
-        self.last_log_time = None
-        self.is_finished = False
-        self.status = "created"
-
-    def heartbeat(self):
-        self.last_log_time = datetime.now()
-        self.status = "running"
-
-    def validate(self) -> Optional[dict]:
-        """Returns a dict with errors, if the validation fails.
-
-        Returns:
-            Optional[dict]: a dict with the validation errors, if any
-        """
-        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)
-
-    def submit(self):
-        try:
-            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
-        except MesaCIException:
-            return False
-        return True
-
-    def cancel(self):
-        if self.job_id:
-            self.proxy.scheduler.jobs.cancel(self.job_id)
-
-    def is_started(self) -> bool:
-        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
-        job_state: dict[str, str] = _call_proxy(
-            self.proxy.scheduler.job_state, self.job_id
-        )
-        return job_state["job_state"] not in waiting_states
-
-    def _load_log_from_data(self, data) -> list[str]:
-        lines = []
-        if isinstance(data, xmlrpc.client.Binary):
-            # We are dealing with xmlrpc.client.Binary
-            # Let's extract the data
-            data = data.data
-        # When there is no new log data, the YAML is empty
-        if loaded_lines := lava_yaml.load(data):
-            lines = loaded_lines
-            self.last_log_line += len(lines)
-        return lines
-
-    def get_logs(self) -> list[str]:
-        try:
-            (finished, data) = _call_proxy(
-                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
-            )
-            self.is_finished = finished
-            return self._load_log_from_data(data)
-
-        except Exception as mesa_ci_err:
-            raise MesaCIParseException(
-                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
-            ) from mesa_ci_err
-
-    def parse_job_result_from_log(
-        self, lava_lines: list[dict[str, str]]
-    ) -> list[dict[str, str]]:
-        """Use the console log to catch if the job has completed successfully or
-        not. Returns the list of log lines until the result line."""
-
-        last_line = None  # Print all lines. lines[:None] == lines[:]
-
-        for idx, line in enumerate(lava_lines):
-            if result := re.search(r"hwci: mesa: (pass|fail)", line):
-                self.is_finished = True
-                self.status = result.group(1)
-
-                last_line = idx + 1
-                # We reached the log end here. hwci script has finished.
-                break
-        return lava_lines[:last_line]
-
+NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
+    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
+)
 
 def find_exception_from_metadata(metadata, job_id):
     if "result" not in metadata or metadata["result"] != "fail":
@@ -212,7 +94,7 @@ def find_exception_from_metadata(metadata, job_id):
 
 def find_lava_error(job) -> None:
     # Look for infrastructure errors and retry if we see them.
-    results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
+    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
     results = lava_yaml.load(results_yaml)
     for res in results:
         metadata = res["metadata"]
@@ -231,12 +113,40 @@ def show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}")
         start_collapsed=True,
         colour=colour,
     ):
-        show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
-        for field, value in show.items():
+        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
+        while not job.is_post_processed() and wait_post_processing_retries > 0:
+            # Wait a little until LAVA finishes processing metadata
+            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
+            wait_post_processing_retries -= 1
+
+        if not job.is_post_processed():
+            waited_for_sec: int = (
+                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES * WAIT_FOR_DEVICE_POLLING_TIME_SEC
+            )
+            print_log(
+                f"Waited for {waited_for_sec} seconds"
+                "for LAVA to post-process the job, it haven't finished yet. "
+                "Dumping it's info anyway"
+            )
+
+        details: dict[str, str] = job.show()
+        for field, value in details.items():
             print(f"{field:<15}: {value}")
+        job.refresh_log()
 
 
 def fetch_logs(job, max_idle_time, log_follower) -> None:
+    is_job_hanging(job, max_idle_time)
+
+    time.sleep(LOG_POLLING_TIME_SEC)
+    new_log_lines = fetch_new_log_lines(job)
+    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)
+
+    for line in parsed_lines:
+        print_log(line)
+
+
+def is_job_hanging(job, max_idle_time):
     # Poll to check for new logs, assuming that a prolonged period of
     # silence means that the device has died and we should try it again
     if datetime.now() - job.last_log_time > max_idle_time:
@@ -251,17 +161,8 @@ def fetch_logs(job, max_idle_time, log_follower) -> None:
             timeout_duration=max_idle_time,
         )
 
-    time.sleep(LOG_POLLING_TIME_SEC)
-
-    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
-    # Retry the log fetching several times before exposing the error.
-    for _ in range(5):
-        with contextlib.suppress(MesaCIParseException):
-            new_log_lines = job.get_logs()
-            break
-    else:
-        raise MesaCIParseException
 
+def parse_log_lines(job, log_follower, new_log_lines):
     if log_follower.feed(new_log_lines):
         # If we had non-empty log data, we can assure that the device is alive.
         job.heartbeat()
@@ -275,12 +176,22 @@ def fetch_logs(job, max_idle_time, log_follower) -> None:
         LogSectionType.LAVA_POST_PROCESSING,
     ):
         parsed_lines = job.parse_job_result_from_log(parsed_lines)
+    return parsed_lines
 
-    for line in parsed_lines:
-        print_log(line)
+
+def fetch_new_log_lines(job):
+    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
+    # Retry the log fetching several times before exposing the error.
+    for _ in range(5):
+        with contextlib.suppress(MesaCIParseException):
+            new_log_lines = job.get_logs()
+            break
+    else:
+        raise MesaCIParseException
+    return new_log_lines
 
 
-def follow_job_execution(job):
+def submit_job(job):
     try:
         job.submit()
     except Exception as mesa_ci_err:
@@ -288,11 +199,16 @@ def follow_job_execution(job):
             f"Could not submit LAVA job. Reason: {mesa_ci_err}"
         ) from mesa_ci_err
 
+
+def wait_for_job_get_started(job):
     print_log(f"Waiting for job {job.job_id} to start.")
     while not job.is_started():
         time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
+    job.refresh_log()
     print_log(f"Job {job.job_id} started.")
 
+
+def bootstrap_log_follower() -> LogFollower:
     gl = GitlabSection(
         id="lava_boot",
         header="LAVA boot",
@@ -300,14 +216,16 @@ def follow_job_execution(job):
         start_collapsed=True,
     )
     print(gl.start())
-    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
-    with LogFollower(current_section=gl) as lf:
+    return LogFollower(current_section=gl)
 
+
+def follow_job_execution(job, log_follower):
+    with log_follower:
         max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
         # Start to check job's health
         job.heartbeat()
         while not job.is_finished:
-            fetch_logs(job, max_idle_time, lf)
+            fetch_logs(job, max_idle_time, log_follower)
 
     # Mesa Developers expect to have a simple pass/fail job result.
     # If this does not happen, it probably means a LAVA infrastructure error
@@ -327,41 +245,52 @@ def print_job_final_status(job):
         f"{CONSOLE_LOG['RESET']}"
     )
 
+    job.refresh_log()
+    job.log["status"] = job.status
     show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
 
-def retriable_follow_job(proxy, job_definition) -> LAVAJob:
-    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
 
+def execute_job_with_retries(proxy, job_definition, retry_count) -> Optional[LAVAJob]:
     for attempt_no in range(1, retry_count + 2):
+        # Need to get the logger value from its object to enable autosave
+        # features, if AutoSaveDict is enabled from StructuredLogging module
         job = LAVAJob(proxy, job_definition)
+
         try:
-            follow_job_execution(job)
+            submit_job(job)
+            wait_for_job_get_started(job)
+            log_follower: LogFollower = bootstrap_log_follower()
+            follow_job_execution(job, log_follower)
             return job
-        except MesaCIKnownIssueException as found_issue:
-            print_log(found_issue)
-            job.status = "canceled"
-        except MesaCIException as mesa_exception:
-            print_log(mesa_exception)
-            job.cancel()
-        except KeyboardInterrupt as e:
-            print_log("LAVA job submitter was interrupted. Cancelling the job.")
-            job.cancel()
-            raise e
-        finally:
+
+        except (MesaCIException, KeyboardInterrupt) as exception:
+            job.handle_exception(exception)
             print_log(
                 f"{CONSOLE_LOG['BOLD']}"
                 f"Finished executing LAVA job in the attempt #{attempt_no}"
                 f"{CONSOLE_LOG['RESET']}"
             )
+
+        finally:
             print_job_final_status(job)
 
+
+def retriable_follow_job(proxy, job_definition) -> LAVAJob:
+    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
+
+    if finished_job := execute_job_with_retries(
+        proxy, job_definition, number_of_retries
+    ):
+        return finished_job
+
+    # Job failed in all attempts
     raise MesaCIRetryError(
         f"{CONSOLE_LOG['BOLD']}"
         f"{CONSOLE_LOG['FG_RED']}"
         "Job failed after it exceeded the number of "
-        f"{retry_count} retries."
+        f"{number_of_retries} retries."
         f"{CONSOLE_LOG['RESET']}",
-        retry_count=retry_count,
+        retry_count=number_of_retries,
     )
 
 
index ccb1b78..a02767c 100644 (file)
@@ -1,6 +1,8 @@
 from .console_format import CONSOLE_LOG
 from .gitlab_section import GitlabSection
+from .lava_job import LAVAJob
 from .lava_job_definition import generate_lava_yaml_payload
+from .lava_proxy import call_proxy, setup_lava_proxy
 from .log_follower import (
     LogFollower,
     fatal_err,
diff --git a/.gitlab-ci/lava/utils/lava_job.py b/.gitlab-ci/lava/utils/lava_job.py
new file mode 100644 (file)
index 0000000..04d5300
--- /dev/null
@@ -0,0 +1,175 @@
+import re
+import xmlrpc
+from collections import defaultdict
+from datetime import datetime
+from typing import Any, Optional
+
+from lava.exceptions import (
+    MesaCIException,
+    MesaCIKnownIssueException,
+    MesaCIParseException,
+    MesaCITimeoutError,
+)
+from lava.utils import CONSOLE_LOG
+from lava.utils.log_follower import print_log
+from lavacli.utils import flow_yaml as lava_yaml
+
+from .lava_proxy import call_proxy
+
+
+class LAVAJob:
+    COLOR_STATUS_MAP: dict[str, str] = {
+        "pass": CONSOLE_LOG["FG_GREEN"],
+        "hung": CONSOLE_LOG["FG_YELLOW"],
+        "fail": CONSOLE_LOG["FG_RED"],
+        "canceled": CONSOLE_LOG["FG_MAGENTA"],
+    }
+
+    def __init__(self, proxy, definition, log=defaultdict(str)) -> None:
+        self._job_id = None
+        self.proxy = proxy
+        self.definition = definition
+        self.last_log_line = 0
+        self.last_log_time = None
+        self._is_finished = False
+        self.log: dict[str, Any] = log
+        self.status = "not_submitted"
+
+    def heartbeat(self) -> None:
+        self.last_log_time: datetime = datetime.now()
+        self.status = "running"
+
+    @property
+    def status(self) -> str:
+        return self._status
+
+    @status.setter
+    def status(self, new_status: str) -> None:
+        self._status = new_status
+        self.log["status"] = self._status
+
+    @property
+    def job_id(self) -> int:
+        return self._job_id
+
+    @job_id.setter
+    def job_id(self, new_id: int) -> None:
+        self._job_id = new_id
+        self.log["lava_job_id"] = self._job_id
+
+    @property
+    def is_finished(self) -> bool:
+        return self._is_finished
+
+    def validate(self) -> Optional[dict]:
+        """Returns a dict with errors, if the validation fails.
+
+        Returns:
+            Optional[dict]: a dict with the validation errors, if any
+        """
+        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)
+
+    def show(self) -> dict[str, str]:
+        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)
+
+    def get_lava_time(self, key, data) -> Optional[str]:
+        return data[key].value if data[key] else None
+
+    def refresh_log(self) -> None:
+        details = self.show()
+        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
+        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
+        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
+        self.log["dut_name"] = details.get("device")
+        self.log["dut_state"] = details.get("state")
+
+    def submit(self) -> bool:
+        try:
+            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
+            self.status = "submitted"
+            self.refresh_log()
+        except MesaCIException:
+            return False
+        return True
+
+    def lava_state(self) -> str:
+        job_state: dict[str, str] = call_proxy(
+            self.proxy.scheduler.job_state, self._job_id
+        )
+        return job_state["job_state"]
+
+    def cancel(self):
+        if self._job_id:
+            self.proxy.scheduler.jobs.cancel(self._job_id)
+            # If we don't have yet set another job's status, let's update it
+            # with canceled one
+            if self.status == "running":
+                self.status = "canceled"
+
+    def is_started(self) -> bool:
+        waiting_states = ("Submitted", "Scheduling", "Scheduled")
+        return self.lava_state() not in waiting_states
+
+    def is_post_processed(self) -> bool:
+        return self.lava_state() != "Running"
+
+    def _load_log_from_data(self, data) -> list[str]:
+        lines = []
+        if isinstance(data, xmlrpc.client.Binary):
+            # We are dealing with xmlrpc.client.Binary
+            # Let's extract the data
+            data = data.data
+        # When there is no new log data, the YAML is empty
+        if loaded_lines := lava_yaml.load(data):
+            lines: list[str] = loaded_lines
+            self.last_log_line += len(lines)
+        return lines
+
+    def get_logs(self) -> list[str]:
+        try:
+            (finished, data) = call_proxy(
+                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
+            )
+            self._is_finished = finished
+            return self._load_log_from_data(data)
+
+        except Exception as mesa_ci_err:
+            raise MesaCIParseException(
+                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
+            ) from mesa_ci_err
+
+    def parse_job_result_from_log(
+        self, lava_lines: list[dict[str, str]]
+    ) -> list[dict[str, str]]:
+        """Use the console log to catch if the job has completed successfully or
+        not. Returns the list of log lines until the result line."""
+
+        last_line = None  # Print all lines. lines[:None] == lines[:]
+
+        for idx, line in enumerate(lava_lines):
+            if result := re.search(r"hwci: mesa: (pass|fail)", line):
+                self._is_finished = True
+                self.status = result[1]
+
+                last_line = idx + 1
+                # We reached the log end here. hwci script has finished.
+                break
+        return lava_lines[:last_line]
+
+    def handle_exception(self, exception: Exception):
+        print_log(exception)
+        if isinstance(exception, MesaCIKnownIssueException):
+            self.status = "canceled"
+        elif isinstance(exception, MesaCITimeoutError):
+            self.status = "hung"
+        elif isinstance(exception, MesaCIException):
+            self.status = "failed"
+        elif isinstance(exception, KeyboardInterrupt):
+            self.status = "canceled_by_user"
+            print_log("LAVA job submitter was interrupted. Cancelling the job.")
+            raise exception
+        else:
+            self.status = "job_submitter_error"
+
+        self.cancel()
+        self.log["dut_job_fail_reason"] = str(exception)
diff --git a/.gitlab-ci/lava/utils/lava_proxy.py b/.gitlab-ci/lava/utils/lava_proxy.py
new file mode 100644 (file)
index 0000000..581ec46
--- /dev/null
@@ -0,0 +1,44 @@
+import time
+import traceback
+import urllib
+import urllib.parse
+import xmlrpc
+import xmlrpc.client
+
+import lavacli
+
+from .log_follower import fatal_err, print_log
+
+
+def setup_lava_proxy():
+    config = lavacli.load_config("default")
+    uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
+    uri_obj = urllib.parse.urlparse(uri)
+    uri_str = f"{uri_obj.scheme}://{usr}:{tok}@{uri_obj.netloc}{uri_obj.path}"
+    transport = lavacli.RequestsTransport(
+        uri_obj.scheme,
+        config.get("proxy"),
+        config.get("timeout", 120.0),
+        config.get("verify_ssl_cert", True),
+    )
+    proxy = xmlrpc.client.ServerProxy(uri_str, allow_none=True, transport=transport)
+
+    print_log(f'Proxy for {config["uri"]} created.')
+
+    return proxy
+
+
+def call_proxy(fn, *args):
+    retries = 60
+    for n in range(1, retries + 1):
+        try:
+            return fn(*args)
+        except xmlrpc.client.ProtocolError as err:
+            if n == retries:
+                traceback.print_exc()
+                fatal_err(f"A protocol error occurred (Err {err.errcode} {err.errmsg})")
+            else:
+                time.sleep(15)
+        except xmlrpc.client.Fault as err:
+            traceback.print_exc()
+            fatal_err(f"FATAL: Fault: {err.faultString} (code: {err.faultCode})", err)
index b2bfcf3..4b18efc 100644 (file)
@@ -270,11 +270,13 @@ def print_log(msg: str) -> None:
     print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}")
 
 
-def fatal_err(msg):
+def fatal_err(msg, exception=None):
     colored_msg = f"{CONSOLE_LOG['FG_RED']}"
     f"{msg}"
     f"{CONSOLE_LOG['RESET']}"
     print_log(colored_msg)
+    if exception:
+        raise exception
     sys.exit(1)
 
 
index 88b253f..d481aac 100644 (file)
@@ -16,6 +16,7 @@ from lava.lava_job_submitter import (
     DEVICE_HANGING_TIMEOUT_SEC,
     NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
     LAVAJob,
+    bootstrap_log_follower,
     follow_job_execution,
     retriable_follow_job,
 )
@@ -52,7 +53,8 @@ def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception
     with pytest.raises(MesaCIException):
         proxy = mock_proxy(side_effect=exception)
         job = LAVAJob(proxy, '')
-        follow_job_execution(job)
+        log_follower = bootstrap_log_follower()
+        follow_job_execution(job, log_follower)
 
 
 NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
@@ -179,7 +181,7 @@ PROXY_SCENARIOS = {
         "fail",
         {},
     ),
-    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(SystemExit, match="1"), False, {}),
+    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, {}),
 }