from jenkinsapi import exceptions
from jenkinsapi.mutable_jenkins_thing import MutableJenkinsThing
-from exceptions import NoBuildData, NotFound, NotInQueue
+from jenkinsapi.exceptions import NoBuildData, NotFound, NotInQueue
log = logging.getLogger(__name__)
return self._element_tree
def get_build_triggerurl(self, token=None, params=None):
- if token is None and params is None:
- extra = "build"
- elif params:
- if token:
- assert isinstance(token, str ), "token if provided should be a string."
- params['token'] = token
- extra = "buildWithParameters"
- else:
- assert isinstance(token, str ), "token if provided should be a string."
- params = dict()
- params['token'] = token
- extra = "build"
- buildurl = urlparse.urljoin( self.baseurl, extra )
- return buildurl, params
+ return "%s/build" % self.baseurl
def invoke(self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15, params=None, cause=None):
assert isinstance( invoke_pre_check_delay, (int, float) )
assert isinstance( invoke_block_delay, (int, float) )
assert isinstance( block, bool )
assert isinstance( skip_if_running, bool )
+
+ # Either copy the params dict or make a new one.
+ params = params and dict(params.items()) or {}
+
if self.is_queued():
- log.warn( "Will not request new build because %s is already queued" % self.id() )
- pass
+ log.warn( "Will not request new build because %s is already queued", self.name )
elif self.is_running():
if skip_if_running:
- log.warn( "Will not request new build because %s is already running" % self.id() )
- pass
+ log.warn( "Will not request new build because %s is already running", self.name )
else:
- log.warn("Will re-schedule %s even though it is already running" % self.id() )
+ log.warn("Will re-schedule %s even though it is already running", self.name )
original_build_no = self.get_last_buildnumber()
- log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) )
- url, params = self.get_build_triggerurl( securitytoken, params)
+ log.info( "Attempting to start %s on %s", self.name, repr(self.get_jenkins_obj()) )
+ url = self.get_build_triggerurl()
+
if cause:
params['cause'] = cause
- html_result = self.hit_url(url, params)
- assert len( html_result ) > 0
+
+ if securitytoken:
+ params['securitytoken'] = securitytoken
+
+ response = self.jenkins.requester.post_url(url, params, data='')
+
+ assert len( response.text ) > 0
if invoke_pre_check_delay > 0:
log.info("Waiting for %is to allow Jenkins to catch up" % invoke_pre_check_delay )
sleep( invoke_pre_check_delay )
if block:
total_wait = 0
while self.is_queued():
- log.info( "Waited %is for %s to begin..." % ( total_wait, self.id() ) )
+ log.info( "Waited %is for %s to begin..." % ( total_wait, self.name ))
sleep( invoke_block_delay )
total_wait += invoke_block_delay
if self.is_running():
assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run."
else:
if self.is_queued():
- log.info( "%s has been queued." % self.id() )
+ log.info( "%s has been queued." % self.name )
elif self.is_running():
- log.info( "%s is running." % self.id() )
+ log.info( "%s is running." % self.name )
elif original_build_no < self.get_last_buildnumber():
- log.info( "%s has completed." % self.id() )
+ log.info( "%s has completed." % self.name )
else:
raise AssertionError("The job did not schedule.")
def get_config(self):
'''Returns the config.xml from the job'''
- return self.hit_url("%(baseurl)s/config.xml" % self.__dict__)
+ return self.jenkins.requester.get_and_confirm_status("%(baseurl)s/config.xml" % self.__dict__)
def load_config(self):
self._config = self.get_config()