assert isinstance(block, bool)
assert isinstance(skip_if_running, bool)
-
# Either copy the params dict or make a new one.
build_params = build_params and dict(
build_params.items()) or {} # Via POSTed JSON
params=params,
files=files,
)
-
+
queue_url = response.headers['location']
qi = QueueItem(queue_url, self.jenkins)
return qi
class Queue(JenkinsBase):
+
"""
Class that represents the Jenkins queue
"""
for item in self._data["items"]:
if item['task']['name'] == job_name:
yield QueueItem(self.get_queue_item_url(item), jenkins_obj=self.jenkins)
-
+
def get_queue_items_for_job(self, job_name):
return list(self._get_queue_items_for_job(job_name))
-
+
def get_queue_item_url(self, item):
return "%s/item/%i" % (self.baseurl, item["id"])
-
+
def delete_item(self, queue_item):
self.delete_item_by_id(queue_item.id)
class QueueItem(JenkinsBase):
+
"""An individual item in the queue
"""
def __init__(self, baseurl, jenkins_obj):
self.jenkins = jenkins_obj
JenkinsBase.__init__(self, baseurl)
-
+
@property
def id(self):
return self._data['id']
-
def get_jenkins_obj(self):
return self.jenkins
def __str__(self):
return "%s Queue #%i" % (self._data['task']['name'], self._data['id'])
-
+
def get_build(self):
build_number = self.get_build_number()
job_name = self.get_job_name()
return self.jenkins[job_name][build_number]
-
-
+
def block_until_complete(self, delay=15):
build = self.block_until_building(delay)
return build.block_until_complete(delay=delay)
-
-
+
def block_until_building(self, delay=5):
while True:
try:
except NotBuiltYet:
time.sleep(delay)
continue
-
-
+
def is_running(self):
"""Return True if this queued item is running.
"""
return self.get_build().is_running()
except NotBuiltYet:
return False
-
+
def get_build_number(self):
try:
return self._data['executable']['number']
except KeyError:
raise NotBuiltYet()
-
+
def get_job_name(self):
try:
return self._data['task']['name']
except KeyError:
raise NotBuiltYet()
-
\ No newline at end of file
import unittest
from jenkinsapi.job import Job
from jenkinsapi.plugin import Plugin
-from jenkinsapi.invocation import Invocation
+from jenkinsapi.queue import QueueItem
from jenkinsapi_tests.systests.base import BaseSystemTest
from jenkinsapi_tests.systests.job_configs import EMPTY_JOB
from jenkinsapi_tests.test_utils.random_strings import random_string
job_name = 'create_%s' % random_string()
job = self.jenkins.create_job(job_name, EMPTY_JOB)
ii = job.invoke()
- self.assertIsInstance(ii, Invocation)
+ self.assertIsInstance(ii, QueueItem)
def test_get_jobs_list(self):
job1_name = 'first_%s' % random_string()