+import time
from jenkinsapi.exceptions import UnknownQueueItem
class Invocation(object):
def __init__(self, job):
self.job = job
self.initial_builds = None
- self.initial_queue_item = None
+ self.queue_item = None
+ self.build_number = None
def __enter__(self):
self.job.poll()
self.initial_builds = set(self.job.get_build_dict().keys())
- try:
- self.initial_queue_item = self.job.get_queue_item()
- except UnknownQueueItem:
- pass
def __exit__(self, type, value, traceback):
"""
"""
self.job.poll()
newly_created_builds = set(self.job.get_build_dict().keys())
-
- queueItem = self.job.get_queue_item()
-
+ if newly_created_builds:
+ self.build_number = newly_created_builds.pop()
+ else:
+ try:
+ self.queue_item = self.job.get_queue_item()
+ self.build_number = self.job.get_next_build_number()
+ except UnknownQueueItem:
+ pass
def get_build_number(self):
"""
If this job is building or complete then provide it's build-number
"""
- return 1
+ return self.build_number
+
+ def get_build(self):
+ return self.job[self.get_build_number()]
+
+ def block_until_not_queued(self):
+ while True:
+ if not self.is_queued():
+ break
+ time.sleep(0.5)
+
def block(self, until='completed'):
"""
Block this item until a condition is met.
Setting until to 'running' blocks the item until it is running (i.e. it's no longer queued)
"""
+ assert until in ['completed', 'not_queued'], 'Unknown block condition: %s' % until
+ self.block_until_not_queued()
+ if until=='completed':
+ self.block_until_completed()
def stop(self):
"""
"""
Returns True if this item is on the queue
"""
+
+ import ipdb
+ ipdb.set_trace
+
return True
def is_running(self):
"""
Return the job associated with this queue item
"""
- return self.jenkins[self.task['name']]
\ No newline at end of file
+ return self.jenkins[self.task['name']]
+
+ def __repr__(self):
+ return "<%s.%s %s>" % (
+ self.__class__.__module__,
+ self.__class__.__name__,
+ str(self))
+
+ def __str__(self):
+ return "%s #%i" % (self.task['name'], self.id)
System tests for `jenkinsapi.jenkins` module.
'''
import unittest
+from jenkinsapi.build import Build
from jenkinsapi.invocation import Invocation
+from jenkinsapi_tests.systests.base import BaseSystemTest
from jenkinsapi_tests.systests.job_configs import LONG_RUNNING_JOB
from jenkinsapi_tests.test_utils.random_strings import random_string
-from jenkinsapi_tests.systests.base import BaseSystemTest
class TestInvocation(BaseSystemTest):
self.assertEquals(ii.get_build_number(), 1)
- def test_multiple_inocations(self):
- pass
-
+ def test_get_build_from_invocation(self):
+ job_name = 'create_%s' % random_string()
+ job = self.jenkins.create_job(job_name, LONG_RUNNING_JOB)
+ ii = job.invoke()
+ bn = ii.get_build_number()
+ self.assertIsInstance(bn, int)
+ ii.block(until='not_queued')
+ b = ii.get_build()
+ self.assertIsInstance(b, Build)
if __name__ == '__main__':