import contextlib
+import json
import pathlib
import sys
import time
+from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from io import StringIO
)
from lavacli.utils import flow_yaml as lava_yaml
+# Initialize structural logging with a defaultdict, it can be changed for more
+# sophisticated dict-like data abstractions.
+STRUCTURAL_LOG = defaultdict(list)
+
# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))
getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)
-def find_exception_from_metadata(metadata, job_id):
+def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
+ """
+ Investigate infrastructure errors from the job metadata.
+ If it finds an error, raise it as MesaCIException.
+ """
if "result" not in metadata or metadata["result"] != "fail":
return
if "error_type" in metadata:
raise MesaCIException(
f"LAVA job {job_id} failed validation (possible download error). Retry."
)
- return metadata
-def find_lava_error(job) -> None:
- # Look for infrastructure errors and retry if we see them.
+def raise_lava_error(job) -> None:
+ # Look for infrastructure errors, raise them, and retry if we see them.
results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
results = lava_yaml.load(results_yaml)
for res in results:
metadata = res["metadata"]
- find_exception_from_metadata(metadata, job.job_id)
+ raise_exception_from_metadata(metadata, job.job_id)
# If we reach this far, it means that the job ended without hwci script
# result and no LAVA infrastructure problem was found
job.status = "fail"
-def show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
+def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
with GitlabSection(
"job_data",
"LAVA job info",
# If this does not happen, it probably means a LAVA infrastructure error
# happened.
if job.status not in ["pass", "fail"]:
- find_lava_error(job)
+ raise_lava_error(job)
+
def structural_log_phases(job, log_follower):
phases: dict[str, Any] = {
}
job.log["dut_job_phases"] = phases
+
def print_job_final_status(job):
if job.status == "running":
job.status = "hung"
)
job.refresh_log()
- job.log["status"] = job.status
- show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
+ show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
-def execute_job_with_retries(proxy, job_definition, retry_count) -> Optional[LAVAJob]:
+def execute_job_with_retries(
+ proxy, job_definition, retry_count, jobs_log
+) -> Optional[LAVAJob]:
for attempt_no in range(1, retry_count + 2):
- job = LAVAJob(proxy, job_definition)
+ # Need to get the logger value from its object to enable autosave
+ # features
+ jobs_log.append({})
+ job_log = jobs_log[-1]
+ job = LAVAJob(proxy, job_definition, job_log)
+ STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
try:
submit_job(job)
)
finally:
+ job_log["finished_time"] = datetime.now().isoformat()
print_job_final_status(job)
number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
if finished_job := execute_job_with_retries(
- proxy, job_definition, number_of_retries
+ proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
):
return finished_job
validate_only: bool = False # Whether to only validate the job, not execute it
visibility_group: str = None # Only affects LAVA farm maintainers
job_rootfs_overlay_url: str = None
+ structured_log_file: pathlib.Path = None # Log file path with structured LAVA log
def __post_init__(self) -> None:
super().__post_init__()
# Remove mesa job names with spaces, which breaks the lava-test-case command
self.mesa_job_name = self.mesa_job_name.split(" ")[0]
+ if self.structured_log_file:
+ self.setup_structured_logger()
+
def dump(self, job_definition):
if self.dump_yaml:
with GitlabSection(
if self.validate_only:
return
- finished_job = retriable_follow_job(proxy, job_definition)
+ try:
+ finished_job = retriable_follow_job(proxy, job_definition)
+ except Exception as exception:
+ STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
+ raise exception
exit_code = 0 if finished_job.status == "pass" else 1
+ STRUCTURAL_LOG["job_combined_status"] = job.status
sys.exit(exit_code)
+ def setup_structured_logger(self):
+ try:
+ global STRUCTURAL_LOG
+ STRUCTURAL_LOG = StructuredLogger(
+ self.structured_log_file, truncate=True
+ ).data
+ except NameError as e:
+ print(
+ f"Could not import StructuredLogger library: {e}. "
+ "Falling back to DummyLogger"
+ )
+
+ STRUCTURAL_LOG["fixed_tags"] = self.lava_tags
+ STRUCTURAL_LOG["dut_job_type"] = self.device_type
+ STRUCTURAL_LOG["job_combined_fail_reason"] = None
+ STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
+
if __name__ == "__main__":
# given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
from contextlib import nullcontext as does_not_raise
from datetime import datetime
from itertools import chain, repeat
+from pathlib import Path
import pytest
from lava.exceptions import MesaCIException, MesaCIRetryError
DEVICE_HANGING_TIMEOUT_SEC,
NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
LAVAJob,
+ LAVAJobSubmitter,
bootstrap_log_follower,
follow_job_execution,
retriable_follow_job,
@pytest.mark.slow(
reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
-def test_full_yaml_log(mock_proxy, frozen_time):
+def test_full_yaml_log(mock_proxy, frozen_time, tmp_path):
import random
from lavacli.utils import flow_yaml as lava_yaml
def reset_logs(*args):
proxy.scheduler.jobs.logs.side_effect = load_lines()
+ tmp_file = Path(tmp_path) / "log.json"
+ LAVAJobSubmitter(structured_log_file=tmp_file)
proxy.scheduler.jobs.submit = reset_logs
with pytest.raises(MesaCIRetryError):
time_travel_to_test_time()
retriable_follow_job(proxy, "")
+ print(tmp_file.read_text())