adding support to get downstream builds and jobs
authorVictor Garcia <victor@tuenti.com>
Tue, 26 Jun 2012 12:42:37 +0000 (14:42 +0200)
committerVictor Garcia <victor@tuenti.com>
Tue, 26 Jun 2012 12:42:37 +0000 (14:42 +0200)
jenkinsapi/build.py
jenkinsapi/job.py

index 40b7d79..8beca78 100644 (file)
@@ -99,6 +99,17 @@ class Build(JenkinsBase):
         except KeyError:
             return None
 
+    def get_upstream_build(self):
+        """
+        Get the upstream build if it exist, None otherwise
+        :return Build or None
+        """
+        upstream_job = self.get_upstream_job()
+        if upstream_job:
+            return upstream_job.get_build(self.get_upstream_build_number())
+        else:
+            return None
+
     def get_master_job_name(self):
         """
         Get the master job name if it exist, None otherwise
@@ -129,6 +140,68 @@ class Build(JenkinsBase):
         except KeyError:
             return None
 
+    def get_master_build(self):
+        """
+        Get the master build if it exist, None otherwise
+        :return Build or None
+        """
+        master_job = self.get_master_job()
+        if master_job:
+            return master_job.get_build(self.get_master_build_number())
+        else:
+            return None
+
+    def get_downstream_jobs(self):
+        """
+        Get the downstream jobs for this build
+        :return List of jobs or None
+        """
+        downstream_jobs_names = self.job.get_downstream_job_names()
+        fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
+        downstream_jobs = []
+        try:
+            fingerprints = fingerprint_data['fingerprint'][0]
+            for f in fingerprints['usage']:
+                if f['name'] in downstream_jobs_names:
+                    downstream_jobs.append(self.get_jenkins_obj().get_job(f['name']))
+            return downstream_jobs
+        except (IndexError, KeyError):
+            return None
+
+    def get_downstream_job_names(self):
+        """
+        Get the downstream job names for this build
+        :return List of string or None
+        """
+        downstream_jobs_names = self.job.get_downstream_job_names()
+        fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name]]" % self.python_api_url(self.baseurl))
+        downstream_names = []
+        try:
+            fingerprints = fingerprint_data['fingerprint'][0]
+            for f in fingerprints['usage']:
+                if f['name'] in downstream_jobs_names:
+                    downstream_names.append(f['name'])
+            return downstream_names
+        except (IndexError, KeyError):
+            return None
+
+    def get_downstream_builds(self):
+        """
+        Get the downstream builds for this build
+        :return List of Build or None
+        """
+        downstream_jobs_names = self.job.get_downstream_job_names()
+        fingerprint_data = self.get_data("%s?depth=2&tree=fingerprint[usage[name,ranges[ranges[end,start]]]]" % self.python_api_url(self.baseurl))
+        downstream_builds = []
+        try:
+            fingerprints = fingerprint_data['fingerprint'][0]
+            for f in fingerprints['usage']:
+                if f['name'] in downstream_jobs_names:
+                    downstream_builds.append(self.get_jenkins_obj().get_job(f['name']).get_build(f['ranges']['ranges'][0]['start']))
+            return downstream_builds
+        except (IndexError, KeyError):
+            return None
+
     def is_running( self ):
         """
         Return a bool if running.
index 029f1fc..2dfe279 100644 (file)
@@ -231,3 +231,55 @@ class Job(JenkinsBase):
     def update_config(self, config):
         '''Update the config.xml to the job'''
         return self.post_data("%(baseurl)s/config.xml" % self.__dict__, config)
+
+    def get_downstream_jobs(self):
+        """
+        Get all the possible downstream jobs
+        :return List of Job
+        """
+        downstream_jobs = []
+        try:
+            for j in self._data['downstreamProjects']:
+                downstream_jobs.append(self.get_jenkins_obj().get_job(j['name']))
+        except KeyError:
+            return []
+        return downstream_jobs
+
+    def get_downstream_job_names(self):
+        """
+        Get all the possible downstream job names
+        :return List of String
+        """
+        downstream_jobs = []
+        try:
+            for j in self._data['downstreamProjects']:
+                downstream_jobs.append(j['name'])
+        except KeyError:
+            return []
+        return downstream_jobs
+
+    def get_upstream_job_names(self):
+        """
+        Get all the possible upstream job names
+        :return List of String
+        """
+        upstream_jobs = []
+        try:
+            for j in self._data['upstreamProjects']:
+                upstream_jobs.append(j['name'])
+        except KeyError:
+            return []
+        return upstream_jobs
+
+    def get_upstream_jobs(self):
+        """
+        Get all the possible upstream jobs
+        :return List of Job
+        """
+        upstream_jobs = []
+        try:
+            for j in self._data['upstreamProjects']:
+                upstream_jobs.append(self.get_jenkins_obj().get_job(j['name']))
+        except KeyError:
+            return []
+        return upstream_jobs