assert type(buildno) == int
self.buildno = buildno
self.job = job
- JenkinsBase.__init__( self, url )
+ JenkinsBase.__init__(self, url)
+ def _poll(self):
+ #For build's we need more information for downstream and upstream builds
+ #so we override the poll to get at the extra data for build objects
+ url = self.python_api_url(self.baseurl) + '?depth=2'
+ return self.get_data(url)
+
def __str__(self):
return self._data['fullDisplayName']
return [x['mercurialNodeName'] for x in self._data['actions'] if 'mercurialNodeName' in x][0]
def get_duration(self):
- return self._data["duration"]
+ return datetime.timedelta(milliseconds=self._data["duration"])
- def get_artifacts( self ):
+ def get_artifacts(self):
for afinfo in self._data["artifacts"]:
- url = "%s/artifact/%s" % ( self.baseurl, afinfo["relativePath"] )
- af = Artifact( afinfo["fileName"], url, self )
+ url = "%s/artifact/%s" % (self.baseurl, afinfo["relativePath"])
+ af = Artifact(afinfo["fileName"], url, self)
yield af
def get_artifact_dict(self):
Get the downstream job names for this build
:return List of string or None
"""
- downstream_jobs_names = self.job.get_downstream_job_names()
- fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
++# <<<<<<< HEAD
++# downstream_jobs_names = self.job.get_downstream_job_names()
++# fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
++# try:
++# fingerprints = fingerprint_data['fingerprint'][0]
++# return [
++# f['name']
++# for f in fingerprints['usage']
++# if f['name'] in downstream_jobs_names
++# ]
++# =======
+ downstream_job_names = self.job.get_downstream_job_names()
+ downstream_names = []
try:
- fingerprints = fingerprint_data['fingerprint'][0]
- return [
- f['name']
- for f in fingerprints['usage']
- if f['name'] in downstream_jobs_names
- ]
+ fingerprints = self._data["fingerprint"]
+ for fingerprint in fingerprints :
+ for job_usage in fingerprint['usage']:
+ if job_usage['name'] in downstream_job_names:
+ downstream_names.append(job_usage['name'])
+ return downstream_names
++# >>>>>>> unstable
except (IndexError, KeyError):
- return None
+ return []
def get_downstream_builds(self):
"""
Get the downstream builds for this build
:return List of Build or None
"""
- downstream_jobs_names = set(self.job.get_downstream_job_names())
- msg = "%s?depth=2&tree=fingerprint[usage[name,ranges[ranges[end,start]]]]"
- fingerprint_data = self.get_data(msg % self.python_api_url(self.baseurl))
++# <<<<<<< HEAD
++# downstream_jobs_names = set(self.job.get_downstream_job_names())
++# msg = "%s?depth=2&tree=fingerprint[usage[name,ranges[ranges[end,start]]]]"
++# fingerprint_data = self.get_data(msg % self.python_api_url(self.baseurl))
++# try:
++# fingerprints = fingerprint_data['fingerprint'][0]
++# return [
++# self.get_jenkins_obj().get_job(f['name']).get_build(f['ranges']['ranges'][0]['start'])
++# for f in fingerprints['usage']
++# if f['name'] in downstream_jobs_names
++# ]
++# =======
+ downstream_job_names = self.get_downstream_job_names()
+ downstream_builds = []
try:
- fingerprints = fingerprint_data['fingerprint'][0]
- return [
- self.get_jenkins_obj().get_job(f['name']).get_build(f['ranges']['ranges'][0]['start'])
- for f in fingerprints['usage']
- if f['name'] in downstream_jobs_names
- ]
+ fingerprints = self._data["fingerprint"]
+ for fingerprint in fingerprints :
+ for job_usage in fingerprint['usage']:
+ if job_usage['name'] in downstream_job_names:
+ job = self.get_jenkins_obj().get_job(job_usage['name'])
+ for job_range in job_usage['ranges']['ranges']:
+ for build_id in range(job_range['start'],
+ job_range['end']):
+ downstream_builds.append(job.get_build(build_id))
+ return downstream_builds
++# >>>>>>> unstable
except (IndexError, KeyError):
- return None
+ return []
def get_matrix_runs(self):
"""
from jenkinsapi.fingerprint import Fingerprint
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.utils.requester import Requester
- from jenkinsapi.custom_exceptions import UnknownJob, JenkinsAPIException
-from jenkinsapi.exceptions import UnknownJob, JenkinsAPIException
++from jenkinsapi.custom_exceptions import UnknownJob
log = logging.getLogger(__name__)
:param jobname: string
:return: boolean
"""
- return jobname in self
+ return jobname in self.jobs
- def create_job(self, jobname, config):
+ def create_job(self, jobname, config_):
"""
Create a job
:param jobname: name of new job, str
:param config: configuration of new job, xml
:return: new Job obj
"""
- if self.has_job(jobname):
- return self[jobname]
-
- params = {'name': jobname}
- if isinstance(config_, unicode):
- config_ = str(config_)
- self.requester.post_xml_and_confirm_status(self.get_create_url(), data=config_, params=params)
- self.poll()
- if not self.has_job(jobname):
- raise JenkinsAPIException('Cannot create job %s' % jobname)
- return self[jobname]
- return self.jobs.create(jobname, config)
++ return self.jobs.create(jobname, config_)
def copy_job(self, jobname, newjobname):
- """
- Copy a job
- :param jobname: name of a exist job, str
- :param newjobname: name of new job, str
- :return: new Job obj
- """
- params = {'name': newjobname,
- 'mode': 'copy',
- 'from': jobname}
-
- self.requester.post_and_confirm_status(
- self.get_create_url(),
- params=params,
- data='')
- self.poll()
- return self[newjobname]
+ return self.jobs.copy(jobname, newjobname)
- def build_job(self, jobname, params={}):
+ def build_job(self, jobname, params=None):
"""
Invoke a build by job name
:param jobname: name of exist job, str
:param params: the job params, dict
:return: none
"""
- self.jobs.build(jobname, params=params)
- return
+ self[jobname].invoke(build_params=params or {})
- return
def delete_job(self, jobname):
"""
+"""
+Module for jenkinsapi requester (which is a wrapper around python-requests)
+"""
+
import requests
- from jenkinsapi.custom_exceptions import JenkinsAPIException
+ import urlparse
+ from jenkinsapi.exceptions import JenkinsAPIException
# import logging
# # these two lines enable debugging at httplib level (requests->urllib3->httplib)
This default class can handle simple authentication only.
"""
- VALID_STATUS_CODES = [200,]
+ VALID_STATUS_CODES = [200, ]
- def __init__(self, username=None, password=None, ssl_verify=True):
+ def __init__(self, username=None, password=None, ssl_verify=True, baseurl=None):
if username:
assert password, 'Cannot set a username without a password!'
# It may seem odd, but some Jenkins operations require posting
# an empty string.
requestKwargs['data'] = data
+
+ if files:
+ requestKwargs['files'] = files
+
return requestKwargs
- url = urlparse.urlunsplit([self.base_scheme, url_split.netloc, url_split.path, url_split.query,
- url_split.fragment])
+ def _update_url_scheme(self, url):
+ """
+ Updates scheme of given url to the one used in Jenkins baseurl.
+ """
+ if self.base_scheme and not url.startswith("%s://" % self.base_scheme):
+ url_split = urlparse.urlsplit(url)
++ url = urlparse.urlunsplit(
++ [
++ self.base_scheme,
++ url_split.netloc,
++ url_split.path,
++ url_split.query,
++ url_split.fragment
++ ]
++ )
+ return url
+
def get_url(self, url, params=None, headers=None):
- requestKwargs = self.get_request_dict(url, params, None, headers)
- url = self._update_url_scheme(url)
- return requests.get(url, **requestKwargs)
+ requestKwargs = self.get_request_dict(params=params, headers=headers)
- return requests.get(url, **requestKwargs)
++ return requests.get(self._update_url_scheme(url), **requestKwargs)
- def post_url(self, url, params=None, data=None, headers=None):
- requestKwargs = self.get_request_dict(url, params, data, headers)
- url = self._update_url_scheme(url)
- return requests.post(url, **requestKwargs)
+ def post_url(self, url, params=None, data=None, files=None, headers=None):
+ requestKwargs = self.get_request_dict(params=params, data=data, files=files, headers=headers)
- return requests.post(url, **requestKwargs)
++ return requests.post(self._update_url_scheme(url), **requestKwargs)
def post_xml_and_confirm_status(self, url, params=None, data=None, valid=None):
headers = {'Content-Type': 'text/xml'}
response.status_code = 500
_get.return_value = response
-- req = Requester('foo', 'bar')
++ req = Requester('foo', 'bar', baseurl='http://dummy')
with self.assertRaises(JenkinsAPIException) as ae:
req.get_and_confirm_status(
- url='http://dummy',
- params={'param': 'value'})
+ url='http://dummy',
+ params={'param': 'value'}
+ )
print ae.exception.message
- self.assertTrue(ae.exception.message=="Operation failed. url=None, headers=None, status=500, text=")
+ self.assertTrue(ae.exception.message == "Operation failed. url=None, headers=None, status=500, text=")
+if __name__ == "__main__":
+ unittest.main()