start_collapsed=True,
)
print(gl.start())
- return LogFollower(current_section=gl)
+ return LogFollower(starting_section=gl)
def follow_job_execution(job, log_follower):
job.heartbeat()
while not job.is_finished:
fetch_logs(job, max_idle_time, log_follower)
+ structural_log_phases(job, log_follower)
# Mesa Developers expect to have a simple pass/fail job result.
# If this does not happen, it probably means a LAVA infrastructure error
if job.status not in ["pass", "fail"]:
find_lava_error(job)
+def structural_log_phases(job, log_follower):
+ phases: dict[str, Any] = {
+ s.header.split(" - ")[0]: {
+ k: str(getattr(s, k)) for k in ("start_time", "end_time")
+ }
+ for s in log_follower.section_history
+ }
+ job.log["dut_job_phases"] = phases
def print_job_final_status(job):
if job.status == "running":
from lava.utils.log_section import LogSectionType
+# TODO: Add section final status to assist with monitoring
@dataclass
class GitlabSection:
id: str
def has_finished(self) -> bool:
return self.__end_time is not None
+ @property
+ def start_time(self) -> datetime:
+ return self.__start_time
+
+ @property
+ def end_time(self) -> Optional[datetime]:
+ return self.__end_time
+
def get_timestamp(self, time: datetime) -> str:
unix_ts = datetime.timestamp(time)
return str(int(unix_ts))
return f"{before_header}{header_wrapper}"
+ def __str__(self) -> str:
+ status = "NS" if not self.has_started else "F" if self.has_finished else "IP"
+ delta = self.delta_time()
+ elapsed_time = "N/A" if delta is None else str(delta)
+ return (
+ f"GitlabSection({self.id}, {self.header}, {self.type}, "
+ f"SC={self.start_collapsed}, S={status}, ST={self.start_time}, "
+ f"ET={self.end_time}, ET={elapsed_time})"
+ )
+
def __enter__(self):
print(self.start())
return self
@dataclass
class LogFollower:
- current_section: Optional[GitlabSection] = None
+ starting_section: Optional[GitlabSection] = None
+ _current_section: Optional[GitlabSection] = None
+ section_history: list[GitlabSection] = field(default_factory=list, init=False)
timeout_durations: dict[LogSectionType, timedelta] = field(
default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
)
_merge_next_line: str = field(default_factory=str, init=False)
def __post_init__(self):
- section_is_created = bool(self.current_section)
+ # Make it trigger current_section setter to populate section history
+ self.current_section = self.starting_section
+ section_is_created = bool(self._current_section)
section_has_started = bool(
- self.current_section and self.current_section.has_started
+ self._current_section and self._current_section.has_started
)
self.log_hints = LAVALogHints(self)
assert (
next(self.gl_section_fix_gen)
@property
+ def current_section(self):
+ return self._current_section
+
+ @current_section.setter
+ def current_section(self, new_section: GitlabSection) -> None:
+ if old_section := self._current_section:
+ self.section_history.append(old_section)
+ self._current_section = new_section
+
+ @property
def phase(self) -> LogSectionType:
return (
- self.current_section.type
- if self.current_section
+ self._current_section.type
+ if self._current_section
else LogSectionType.UNKNOWN
)
print(line)
def watchdog(self):
- if not self.current_section:
+ if not self._current_section:
return
timeout_duration = self.timeout_durations.get(
- self.current_section.type, self.fallback_timeout
+ self._current_section.type, self.fallback_timeout
)
- if self.current_section.delta_time() > timeout_duration:
+ if self._current_section.delta_time() > timeout_duration:
raise MesaCITimeoutError(
- f"Gitlab Section {self.current_section} has timed out",
+ f"Gitlab Section {self._current_section} has timed out",
timeout_duration=timeout_duration,
)
def clear_current_section(self):
- if self.current_section and not self.current_section.has_finished:
- self._buffer.append(self.current_section.end())
+ if self._current_section and not self._current_section.has_finished:
+ self._buffer.append(self._current_section.end())
self.current_section = None
def update_section(self, new_section: GitlabSection):