from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
-from typing import Optional
+from typing import Literal, Optional
import gitlab
from colorama import Fore, Style
# target jobs
if target_jobs_regex.match(job.name):
if force_manual and job.status == "manual":
- enable_job(project, job, True)
+ enable_job(project, job, "target")
if stress and job.status in ["success", "failed"]:
stress_status_counter[job.name][job.status] += 1
- retry_job(project, job)
+ enable_job(project, job, "retry")
print_job_status(job, job.status not in target_statuses[job.id])
target_statuses[job.id] = job.status
# dependencies and cancelling the rest
if job.name in dependencies:
if job.status == "manual":
- enable_job(project, job, False)
+ enable_job(project, job, "dep")
elif job.status not in [
"canceled",
pretty_wait(REFRESH_WAIT_JOBS)
-def enable_job(project, job, target: bool) -> None:
- """enable manual job"""
+def enable_job(project, job, action_type: Literal["target", "dep", "retry"]) -> None:
+ """enable job"""
pjob = project.jobs.get(job.id, lazy=True)
- pjob.play()
- if target:
+
+ if job.status in ["success", "failed", "canceled"]:
+ pjob.retry()
+ else:
+ pjob.play()
+
+ if action_type == "target":
jtype = "🞋 "
+ elif action_type == "retry":
+ jtype = "↻"
else:
jtype = "(dependency)"
- print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)
-
-def retry_job(project, job) -> None:
- """retry job"""
- pjob = project.jobs.get(job.id, lazy=True)
- pjob.retry()
- jtype = "↻"
print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)