3 # Copyright (C) 2020, 2021 Collabora Limited
4 # Author: Gustavo Padovan <gustavo.padovan@collabora.com>
6 # Permission is hereby granted, free of charge, to any person obtaining a
7 # copy of this software and associated documentation files (the "Software"),
8 # to deal in the Software without restriction, including without limitation
9 # the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 # and/or sell copies of the Software, and to permit persons to whom the
11 # Software is furnished to do so, subject to the following conditions:
13 # The above copyright notice and this permission notice (including the next
14 # paragraph) shall be included in all copies or substantial portions of the
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 """Send a job to LAVA, track it and collect log back"""
37 from datetime import datetime, timedelta
39 from typing import Any, Optional
43 from lava.exceptions import (
49 from lavacli.utils import loader
51 # Timeout in seconds to decide if the device from the dispatched LAVA job has
52 # hung or not due to the lack of new log output.
53 DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))
55 # How many seconds the script should wait before trying a new polling iteration to
56 # check if the dispatched LAVA job is running or waiting in the job queue.
57 WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))
59 # How many seconds to wait between log output LAVA RPC calls.
60 LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))
62 # How many retries should be made when a timeout happens.
63 NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))
65 # How many attempts should be made when a timeout happens during LAVA device boot.
66 NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))
70 print("{}: {}".format(datetime.now(), msg))
def hide_sensitive_data(yaml_data, hide_tag="HIDEME"):
    """Return *yaml_data* without the lines that contain *hide_tag*.

    The text is processed line by line; every line in which the tag occurs
    anywhere is dropped entirely, and the remaining lines are returned
    joined back together with their original line endings.
    """
    visible_lines = []
    for raw_line in yaml_data.splitlines(keepends=True):
        if hide_tag in raw_line:
            # The whole line is discarded, not only the tagged fragment.
            continue
        visible_lines.append(raw_line)
    return "".join(visible_lines)
81 def generate_lava_yaml(args):
82 # General metadata and permissions, plus also inexplicably kernel arguments
84 'job_name': 'mesa: {}'.format(args.pipeline_info),
85 'device_type': args.device_type,
86 'visibility': { 'group': [ args.visibility_group ] },
89 'extra_nfsroot_args': ' init=/init rootwait usbcore.quirks=0bda:8153:k'
92 "job": {"minutes": args.job_timeout},
93 "action": {"minutes": 3},
95 "depthcharge-action": {
96 "minutes": 3 * NUMBER_OF_ATTEMPTS_LAVA_BOOT,
103 values['tags'] = args.lava_tags.split(',')
105 # URLs to our kernel rootfs to boot from, both generated by the base
108 'timeout': { 'minutes': 10 },
112 'url': '{}/{}'.format(args.kernel_url_prefix, args.kernel_image_name),
115 'url': '{}/lava-rootfs.tgz'.format(args.rootfs_url_prefix),
119 if args.kernel_image_type:
120 deploy['kernel']['type'] = args.kernel_image_type
123 'url': '{}/{}.dtb'.format(args.kernel_url_prefix, args.dtb)
126 # always boot over NFS
128 "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
129 "method": args.boot_method,
131 "prompts": ["lava-shell:"],
134 # skeleton test definition: only declaring each job as a single 'test'
135 # since LAVA's test parsing is not useful to us
138 'timeout': { 'minutes': args.job_timeout },
143 'lava-signal': 'kmsg',
144 'path': 'inline/mesa.yaml',
148 'description': 'Mesa test plan',
150 'scope': [ 'functional' ],
151 'format': 'Lava-Test Test Definition 1.0',
160 # job execution script:
161 # - inline .gitlab-ci/common/init-stage1.sh
162 # - fetch and unpack per-pipeline build artifacts from build job
163 # - fetch and unpack per-job environment from lava-submit.sh
164 # - exec .gitlab-ci/common/init-stage2.sh
166 with open(args.first_stage_init, 'r') as init_sh:
167 run_steps += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]
170 with open(args.jwt_file) as jwt_file:
173 f'echo -n "{jwt_file.read()}" > "{args.jwt_file}" # HIDEME',
175 f'echo "export CI_JOB_JWT_FILE={args.jwt_file}" >> /set-job-env-vars.sh',
179 "echo Could not find jwt file, disabling MINIO requests...",
180 "unset MINIO_RESULTS_UPLOAD",
184 'mkdir -p {}'.format(args.ci_project_dir),
185 'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.build_url, args.ci_project_dir),
186 'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),
187 # Putting CI_JOB name as the testcase name, it may help LAVA farm
188 # maintainers with monitoring
189 f"lava-test-case 'mesa-ci_{args.mesa_job_name}' --shell /init-stage2.sh",
192 values['actions'] = [
193 { 'deploy': deploy },
198 return yaml.dump(values, width=10000000)
201 def setup_lava_proxy():
202 config = lavacli.load_config("default")
203 uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
204 uri_obj = urllib.parse.urlparse(uri)
205 uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path)
206 transport = lavacli.RequestsTransport(
209 config.get("timeout", 120.0),
210 config.get("verify_ssl_cert", True),
212 proxy = xmlrpc.client.ServerProxy(
213 uri_str, allow_none=True, transport=transport)
215 print_log("Proxy for {} created.".format(config['uri']))
220 def _call_proxy(fn, *args):
222 for n in range(1, retries + 1):
225 except xmlrpc.client.ProtocolError as err:
227 traceback.print_exc()
228 fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
231 except xmlrpc.client.Fault as err:
232 traceback.print_exc()
233 fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
237 def __init__(self, proxy, definition):
240 self.definition = definition
241 self.last_log_line = 0
242 self.last_log_time = None
243 self.is_finished = False
246 self.last_log_time = datetime.now()
def validate(self) -> Optional[dict]:
    """Submit this job's definition to the LAVA scheduler for validation.

    Returns:
        Optional[dict]: a dict with the validation errors, if any.
    """
    # NOTE(review): the trailing True is presumably LAVA's "strict"
    # validation flag -- confirm against the scheduler.jobs.validate API.
    validation_errors = _call_proxy(
        self.proxy.scheduler.jobs.validate, self.definition, True
    )
    return validation_errors
258 self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
259 except MesaCIException:
265 self.proxy.scheduler.jobs.cancel(self.job_id)
def is_started(self) -> bool:
    """Tell whether the job has left the scheduler's waiting states.

    Returns:
        bool: False while the ``job_state`` reported for this job is
        "Submitted", "Scheduling" or "Scheduled"; True for any other state.
    """
    state_info: dict[str, str] = _call_proxy(
        self.proxy.scheduler.job_state, self.job_id
    )
    still_queued = state_info["job_state"] in [
        "Submitted",
        "Scheduling",
        "Scheduled",
    ]
    return not still_queued
274 def _load_log_from_data(self, data) -> list[str]:
276 # When there is no new log data, the YAML is empty
277 if loaded_lines := yaml.load(str(data), Loader=loader(False)):
279 # If we had non-empty log data, we can assure that the device is alive.
281 self.last_log_line += len(lines)
284 def get_logs(self) -> list[str]:
286 (finished, data) = _call_proxy(
287 self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
289 self.is_finished = finished
290 return self._load_log_from_data(data)
292 except Exception as mesa_ci_err:
293 raise MesaCIParseException(
294 f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
298 def find_exception_from_metadata(metadata, job_id):
299 if "result" not in metadata or metadata["result"] != "fail":
301 if "error_type" in metadata:
302 error_type = metadata["error_type"]
303 if error_type == "Infrastructure":
304 raise MesaCIException(
305 f"LAVA job {job_id} failed with Infrastructure Error. Retry."
307 if error_type == "Job":
308 # This happens when LAVA assumes that the job cannot terminate or
309 # with mal-formed job definitions. As we are always validating the
310 # jobs, only the former is probable to happen. E.g.: When some LAVA
311 # action timed out more times than expected in job definition.
312 raise MesaCIException(
313 f"LAVA job {job_id} failed with JobError "
314 "(possible LAVA timeout misconfiguration/bug). Retry."
316 if "case" in metadata and metadata["case"] == "validate":
317 raise MesaCIException(
318 f"LAVA job {job_id} failed validation (possible download error). Retry."
323 def get_job_results(proxy, job_id, test_suite):
324 # Look for infrastructure errors and retry if we see them.
325 results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
326 results = yaml.load(results_yaml, Loader=loader(False))
328 metadata = res["metadata"]
329 find_exception_from_metadata(metadata, job_id)
331 results_yaml = _call_proxy(
332 proxy.results.get_testsuite_results_yaml, job_id, test_suite
334 results: list = yaml.load(results_yaml, Loader=loader(False))
336 raise MesaCIException(
337 f"LAVA: no result for test_suite '{test_suite}'"
340 for metadata in results:
341 test_case = metadata["name"]
342 result = metadata["metadata"]["result"]
344 f"LAVA: result for test_suite '{test_suite}', "
345 f"test_case '{test_case}': {result}"
def show_job_data(job):
    """Print the scheduler's description of *job*, one field per line.

    The description is fetched with ``scheduler.jobs.show`` through the
    job's own proxy; each entry is written to stdout as the field name,
    a tab, a colon, a space and the value.
    """
    job_details = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
    for key, content in job_details.items():
        print(f"{key}\t: {content}")
359 def parse_lava_lines(new_lines) -> list[str]:
360 parsed_lines: list[str] = []
361 for line in new_lines:
362 if line["lvl"] in ["results", "feedback"]:
364 elif line["lvl"] in ["warning", "error"]:
365 prefix = "\x1b[1;38;5;197m"
367 elif line["lvl"] == "input":
373 line = f'{prefix}{line["msg"]}{suffix}'
374 parsed_lines.append(line)
379 def fetch_logs(job, max_idle_time):
380 # Poll to check for new logs, assuming that a prolonged period of
381 # silence means that the device has died and we should try it again
382 if datetime.now() - job.last_log_time > max_idle_time:
383 max_idle_time_min = max_idle_time.total_seconds() / 60
385 f"No log output for {max_idle_time_min} minutes; "
386 "assuming device has died, retrying"
389 raise MesaCITimeoutError(
390 f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
392 timeout_duration=max_idle_time,
395 time.sleep(LOG_POLLING_TIME_SEC)
397 # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
398 # Retry the log fetching several times before exposing the error.
400 with contextlib.suppress(MesaCIParseException):
401 new_lines = job.get_logs()
404 raise MesaCIParseException
406 parsed_lines = parse_lava_lines(new_lines)
408 for line in parsed_lines:
412 def follow_job_execution(job):
415 except Exception as mesa_ci_err:
416 raise MesaCIException(
417 f"Could not submit LAVA job. Reason: {mesa_ci_err}"
420 print_log(f"Waiting for job {job.job_id} to start.")
421 while not job.is_started():
422 time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
423 print_log(f"Job {job.job_id} started.")
425 max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
426 # Start to check job's health
428 while not job.is_finished:
429 fetch_logs(job, max_idle_time)
432 return get_job_results(job.proxy, job.job_id, "0_mesa")
435 def retriable_follow_job(proxy, job_definition):
436 retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
438 for attempt_no in range(1, retry_count + 2):
439 job = LAVAJob(proxy, job_definition)
441 return follow_job_execution(job)
442 except MesaCIException as mesa_exception:
443 print_log(mesa_exception)
445 except KeyboardInterrupt as e:
446 print_log("LAVA job submitter was interrupted. Cancelling the job.")
450 print_log(f"Finished executing LAVA job in the attempt #{attempt_no}")
452 raise MesaCIRetryError(
453 "Job failed after it exceeded the number of " f"{retry_count} retries.",
454 retry_count=retry_count,
def treat_mesa_job_name(args):
    """Truncate ``args.mesa_job_name`` at its first space, in place.

    A job name containing spaces breaks the ``lava-test-case`` command the
    name is embedded in, so only the part before the first space is kept.

    Args:
        args: parsed command-line namespace whose ``mesa_job_name``
            attribute is rewritten. When the option was not given
            (``None``) or is empty, the attribute is left untouched
            instead of failing with an ``AttributeError``.
    """
    job_name = args.mesa_job_name
    if not job_name:
        # Nothing to sanitise: the option is optional and may be unset.
        return
    # Only the leading chunk is needed, so stop after the first cut.
    args.mesa_job_name = job_name.split(" ", 1)[0]
464 proxy = setup_lava_proxy()
466 job_definition = generate_lava_yaml(args)
469 print("LAVA job definition (YAML):")
470 print(hide_sensitive_data(job_definition))
471 job = LAVAJob(proxy, job_definition)
473 if errors := job.validate():
474 fatal_err(f"Error in LAVA job definition: {errors}")
475 print_log("LAVA job definition validated successfully")
477 if args.validate_only:
480 has_job_passed = retriable_follow_job(proxy, job_definition)
481 exit_code = 0 if has_job_passed else 1
486 parser = argparse.ArgumentParser("LAVA job submitter")
488 parser.add_argument("--pipeline-info")
489 parser.add_argument("--rootfs-url-prefix")
490 parser.add_argument("--kernel-url-prefix")
491 parser.add_argument("--build-url")
492 parser.add_argument("--job-rootfs-overlay-url")
493 parser.add_argument("--job-timeout", type=int)
494 parser.add_argument("--first-stage-init")
495 parser.add_argument("--ci-project-dir")
496 parser.add_argument("--device-type")
497 parser.add_argument("--dtb", nargs='?', default="")
498 parser.add_argument("--kernel-image-name")
499 parser.add_argument("--kernel-image-type", nargs='?', default="")
500 parser.add_argument("--boot-method")
501 parser.add_argument("--lava-tags", nargs='?', default="")
502 parser.add_argument("--jwt-file", type=pathlib.Path)
503 parser.add_argument("--validate-only", action='store_true')
504 parser.add_argument("--dump-yaml", action='store_true')
505 parser.add_argument("--visibility-group")
506 parser.add_argument("--mesa-job-name")
510 if __name__ == "__main__":
511 # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
512 # GitLab runner -> GitLab primary -> user, safe to say we don't need any
514 sys.stdout.reconfigure(line_buffering=True)
515 sys.stderr.reconfigure(line_buffering=True)
517 parser = create_parser()
519 parser.set_defaults(func=main)
520 args = parser.parse_args()
521 treat_mesa_job_name(args)