still broken
authorsalimfadhley <sal@stodge.org>
Fri, 11 Oct 2013 23:58:28 +0000 (00:58 +0100)
committersalimfadhley <sal@stodge.org>
Fri, 11 Oct 2013 23:58:28 +0000 (00:58 +0100)
1  2 
jenkinsapi/build.py
jenkinsapi/jenkins.py
jenkinsapi/job.py
jenkinsapi/utils/requester.py
jenkinsapi_tests/unittests/test_job.py
jenkinsapi_tests/unittests/test_requester.py

@@@ -30,8 -27,14 +30,14 @@@ class Build(JenkinsBase)
          assert type(buildno) == int
          self.buildno = buildno
          self.job = job
 -        JenkinsBase.__init__( self, url )
 +        JenkinsBase.__init__(self, url)
  
+     def _poll(self):
+         #For build's we need more information for downstream and upstream builds
+         #so we override the poll to get at the extra data for build objects
+         url = self.python_api_url(self.baseurl) + '?depth=2'
+         return self.get_data(url)
      def __str__(self):
          return self._data['fullDisplayName']
  
          return [x['mercurialNodeName'] for x in self._data['actions'] if 'mercurialNodeName' in x][0]
  
      def get_duration(self):
-         return self._data["duration"]
+         return datetime.timedelta(milliseconds=self._data["duration"])
  
 -    def get_artifacts( self ):
 +    def get_artifacts(self):
          for afinfo in self._data["artifacts"]:
 -            url = "%s/artifact/%s" % ( self.baseurl, afinfo["relativePath"] )
 -            af = Artifact( afinfo["fileName"], url, self )
 +            url = "%s/artifact/%s" % (self.baseurl, afinfo["relativePath"])
 +            af = Artifact(afinfo["fileName"], url, self)
              yield af
  
      def get_artifact_dict(self):
          Get the downstream job names for this build
          :return List of string or None
          """
-         downstream_jobs_names = self.job.get_downstream_job_names()
-         fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
++# <<<<<<< HEAD
++#         downstream_jobs_names = self.job.get_downstream_job_names()
++#         fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
++#         try:
++#             fingerprints = fingerprint_data['fingerprint'][0]
++#             return [
++#                 f['name']
++#                 for f in fingerprints['usage']
++#                 if f['name'] in downstream_jobs_names
++#             ]
++# =======
+         downstream_job_names = self.job.get_downstream_job_names()
+         downstream_names = []
          try:
-             fingerprints = fingerprint_data['fingerprint'][0]
-             return [
-                 f['name']
-                 for f in fingerprints['usage']
-                 if f['name'] in downstream_jobs_names
-             ]
+             fingerprints = self._data["fingerprint"]
+             for fingerprint in fingerprints :
+                 for job_usage in fingerprint['usage']:
+                     if job_usage['name'] in downstream_job_names:
+                         downstream_names.append(job_usage['name'])
+             return downstream_names
++# >>>>>>> unstable
          except (IndexError, KeyError):
-             return None
+             return []
  
      def get_downstream_builds(self):
          """
          Get the downstream builds for this build
          :return List of Build or None
          """
-         downstream_jobs_names = set(self.job.get_downstream_job_names())
-         msg = "%s?depth=2&tree=fingerprint[usage[name,ranges[ranges[end,start]]]]"
-         fingerprint_data = self.get_data(msg % self.python_api_url(self.baseurl))
++# <<<<<<< HEAD
++#         downstream_jobs_names = set(self.job.get_downstream_job_names())
++#         msg = "%s?depth=2&tree=fingerprint[usage[name,ranges[ranges[end,start]]]]"
++#         fingerprint_data = self.get_data(msg % self.python_api_url(self.baseurl))
++#         try:
++#             fingerprints = fingerprint_data['fingerprint'][0]
++#             return [
++#                 self.get_jenkins_obj().get_job(f['name']).get_build(f['ranges']['ranges'][0]['start'])
++#                 for f in fingerprints['usage']
++#                 if f['name'] in downstream_jobs_names
++#             ]
++# =======
+         downstream_job_names = self.get_downstream_job_names()
+         downstream_builds = []
          try:
-             fingerprints = fingerprint_data['fingerprint'][0]
-             return [
-                 self.get_jenkins_obj().get_job(f['name']).get_build(f['ranges']['ranges'][0]['start'])
-                 for f in fingerprints['usage']
-                 if f['name'] in downstream_jobs_names
-             ]
+             fingerprints = self._data["fingerprint"]
+             for fingerprint in fingerprints :
+                 for job_usage in fingerprint['usage']:
+                     if job_usage['name'] in downstream_job_names:
+                         job = self.get_jenkins_obj().get_job(job_usage['name'])
+                         for job_range in job_usage['ranges']['ranges']:
+                             for build_id in range(job_range['start'],
+                                                   job_range['end']):
+                                 downstream_builds.append(job.get_build(build_id))
+             return downstream_builds
++# >>>>>>> unstable
          except (IndexError, KeyError):
-             return None
+             return []
  
      def get_matrix_runs(self):
          """
@@@ -19,7 -15,7 +20,7 @@@ from jenkinsapi.queue import Queu
  from jenkinsapi.fingerprint import Fingerprint
  from jenkinsapi.jenkinsbase import JenkinsBase
  from jenkinsapi.utils.requester import Requester
- from jenkinsapi.custom_exceptions import UnknownJob, JenkinsAPIException
 -from jenkinsapi.exceptions import UnknownJob, JenkinsAPIException
++from jenkinsapi.custom_exceptions import UnknownJob
  
  log = logging.getLogger(__name__)
  
@@@ -110,54 -112,29 +117,28 @@@ class Jenkins(JenkinsBase)
          :param jobname: string
          :return: boolean
          """
-         return jobname in self
+         return jobname in self.jobs
  
 -    def create_job(self, jobname, config):
 +    def create_job(self, jobname, config_):
          """
          Create a job
          :param jobname: name of new job, str
          :param config: configuration of new job, xml
          :return: new Job obj
          """
-         if self.has_job(jobname):
-             return self[jobname]
-         params = {'name': jobname}
-         if isinstance(config_, unicode):
-             config_ = str(config_)
-         self.requester.post_xml_and_confirm_status(self.get_create_url(), data=config_, params=params)
-         self.poll()
-         if not self.has_job(jobname):
-             raise JenkinsAPIException('Cannot create job %s' % jobname)
-         return self[jobname]
 -        return self.jobs.create(jobname, config)
++        return self.jobs.create(jobname, config_)
  
      def copy_job(self, jobname, newjobname):
-         """
-         Copy a job
-         :param jobname: name of a exist job, str
-         :param newjobname: name of new job, str
-         :return: new Job obj
-         """
-         params = {'name': newjobname,
-                   'mode': 'copy',
-                   'from': jobname}
-         self.requester.post_and_confirm_status(
-             self.get_create_url(),
-             params=params,
-             data='')
-         self.poll()
-         return self[newjobname]
+         return self.jobs.copy(jobname, newjobname)
  
 -    def build_job(self, jobname, params={}):
 +    def build_job(self, jobname, params=None):
          """
          Invoke a build by job name
          :param jobname: name of exist job, str
          :param params: the job params, dict
          :return: none
          """
 -        self.jobs.build(jobname, params=params)
 -        return
 +        self[jobname].invoke(build_params=params or {})
-         return
  
      def delete_job(self, jobname):
          """
Simple merge
@@@ -1,9 -1,6 +1,10 @@@
 +"""
 +Module for jenkinsapi requester (which is a wrapper around python-requests)
 +"""
 +
  import requests
- from jenkinsapi.custom_exceptions import JenkinsAPIException
+ import urlparse
+ from jenkinsapi.exceptions import JenkinsAPIException
  # import logging
  
  # # these two lines enable debugging at httplib level (requests->urllib3->httplib)
@@@ -28,9 -25,9 +29,9 @@@ class Requester(object)
      This default class can handle simple authentication only.
      """
  
 -    VALID_STATUS_CODES = [200,]
 +    VALID_STATUS_CODES = [200, ]
  
-     def __init__(self, username=None, password=None, ssl_verify=True):
+     def __init__(self, username=None, password=None, ssl_verify=True, baseurl=None):
          if username:
              assert password, 'Cannot set a username without a password!'
  
              # It may seem odd, but some Jenkins operations require posting
              # an empty string.
              requestKwargs['data'] = data
 +
 +        if files:
 +            requestKwargs['files'] = files
 +
          return requestKwargs
  
 -            url = urlparse.urlunsplit([self.base_scheme, url_split.netloc, url_split.path, url_split.query,
 -                                       url_split.fragment])
+     def _update_url_scheme(self, url):
+         """
+         Updates scheme of given url to the one used in Jenkins baseurl.
+         """
+         if self.base_scheme and not url.startswith("%s://" % self.base_scheme):
+             url_split = urlparse.urlsplit(url)
++            url = urlparse.urlunsplit(
++                [
++                self.base_scheme,
++                url_split.netloc,
++                url_split.path,
++                url_split.query,
++                url_split.fragment
++                ]
++            )
+         return url
      def get_url(self, url, params=None, headers=None):
 -        requestKwargs = self.get_request_dict(url, params, None, headers)
 -        url = self._update_url_scheme(url)
 -        return requests.get(url, **requestKwargs)
 +        requestKwargs = self.get_request_dict(params=params, headers=headers)
-         return requests.get(url, **requestKwargs)
++        return requests.get(self._update_url_scheme(url), **requestKwargs)
  
 -    def post_url(self, url, params=None, data=None, headers=None):
 -        requestKwargs = self.get_request_dict(url, params, data, headers)
 -        url = self._update_url_scheme(url)
 -        return requests.post(url, **requestKwargs)
 +    def post_url(self, url, params=None, data=None, files=None, headers=None):
 +        requestKwargs = self.get_request_dict(params=params, data=data, files=files, headers=headers)
-         return requests.post(url, **requestKwargs)
++        return requests.post(self._update_url_scheme(url), **requestKwargs)
  
      def post_xml_and_confirm_status(self, url, params=None, data=None, valid=None):
          headers = {'Content-Type': 'text/xml'}
@@@ -201,15 -176,12 +201,15 @@@ class TestQueue(unittest.TestCase)
          response.status_code = 500
          _get.return_value = response
  
--        req = Requester('foo', 'bar')
++        req = Requester('foo', 'bar', baseurl='http://dummy')
          with self.assertRaises(JenkinsAPIException) as ae:
              req.get_and_confirm_status(
 -                            url='http://dummy',
 -                            params={'param': 'value'})
 +                url='http://dummy',
 +                params={'param': 'value'}
 +            )
  
          print ae.exception.message
 -        self.assertTrue(ae.exception.message=="Operation failed. url=None, headers=None, status=500, text=")
 +        self.assertTrue(ae.exception.message == "Operation failed. url=None, headers=None, status=500, text=")
  
 +if __name__ == "__main__":
 +    unittest.main()