-Cloned copy of http://pyjenkinsci.googlecode.com/
-This file was modified by PyCharm 2.0.1 for binding GitHub repository
+The original project is pyjenkinsci on googlecode: http://pyjenkinsci.googlecode.com/
+
+A number of refactorings have been done:
+ * Uniform use of "jenkins" naming instead of mixed jenkins/hudson
+ * Import fixup: all imports are now relative to the package name
+ * Simplified artifact downloading (see the sketch below)
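+
+A rough sketch of the simplified artifact download (the server URL, job name and
+artifact name below are made-up placeholders; the helper is assumed to live in
+jenkinsapi.api):
+
+    from jenkinsapi.api import grab_artifact
+
+    # Grab the named artifact from the last good build of "myjob" and save it
+    # into /tmp/artifacts; the directory is created if it does not exist yet.
+    grab_artifact("http://localhost:8080", "myjob", "output.tar.gz", "/tmp/artifacts")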
+
+New features:
+ * Username/password auth support for jenkins instances with auth turned on
+ * Ability to add/remove/query jenkins slaves
+ * Ability to retrieve/find builds by subversion revision (see the example below)
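+
+A minimal sketch of the new features (URL, credentials, node and job names are
+placeholders):
+
+    from jenkinsapi.jenkins import Jenkins
+
+    # Username/password auth, only needed when the instance has auth turned on.
+    j = Jenkins("http://localhost:8080", username="jenkins", password="secret")
+
+    # Query, add and remove slaves.
+    print j.get_node_dict()
+    node = j.create_node("build-slave-01", num_executors=2, labels="linux")
+    j.delete_node("build-slave-01")
+
+    # Find the build number(s) of a job that used a given subversion revision.
+    job = j.get_job("myjob")
+    print job.get_buildnumber_for_revision(1234)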
+
+Deletions:
+
+I've completely removed the test methods and the small number of test cases;
+all of them were tied to a specific local environment.
+
+Thanks to salimfadhley@gmail.com for the initial version on which this is based
+Thanks to ruslan@hyves.nl for the subversion revision patches
+
+The current code lives on GitHub: https://github.com/ramonvanalteren/jenkinsapi.git
+
--- /dev/null
+TODO:
+
+* Add a test suite (preferably nose or py.test) which doesn't rely on a local jenkins setup (or instantiates one during the test run)
+* Clean up the fingerprint code
+* Clean up the resultset and results code
+* Make all objects inherit from jenkins_base where that makes sense
+* Add ability to add/modify/delete jobs
+* Add ability to query jenkins for plugin data
+
--- /dev/null
+from jenkinsapi.artifact import Artifact
+from jenkinsapi import constants
+from jenkinsapi.jenkins import Jenkins
+from jenkinsapi.exceptions import ArtifactsMissing, TimeOut, BadURL
+from urllib2 import urlparse
+
+import os
+import time
+import logging
+
+log = logging.getLogger(__name__)
+
+def get_latest_test_results( jenkinsurl, jobname ):
+ """
+ A convenience function to fetch down the very latest test results from a jenkins job.
+ """
+ latestbuild = get_latest_build( jenkinsurl, jobname )
+ res = latestbuild.get_resultset()
+ return res
+
+def get_latest_build(jenkinsurl, jobname):
+ """
+    A convenience function to fetch down the very latest build of a jenkins job.
+ """
+ jenkinsci = Jenkins(jenkinsurl)
+ job = jenkinsci[jobname]
+ return job.get_last_build()
+
+def get_latest_complete_build(jenkinsurl, jobname):
+ """
+    A convenience function to fetch down the very latest completed build of a jenkins job.
+ """
+ jenkinsci = Jenkins(jenkinsurl)
+ job = jenkinsci[jobname]
+ return job.get_last_completed_build()
+
+def get_artifacts( jenkinsurl, jobid=None, build_no=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None ):
+ """
+ Find all the artifacts for the latest build of a job.
+ """
+ jenkinsci = Jenkins(jenkinsurl, proxyhost, proxyport, proxyuser, proxypass)
+ job = jenkinsci[jobid]
+ if build_no:
+ build = job.get_build( build_no )
+ else:
+ build = job.get_last_good_build()
+ artifacts = dict((artifact.filename, artifact) for artifact in build.get_artifacts())
+ log.info("Found %i artifacts in '%s'" % ( len(artifacts.keys() ), build_no ))
+ return artifacts
+
+def search_artifacts(jenkinsurl, jobid, artifact_ids=None, same_build=True):
+ """
+ Search the entire history of a jenkins job for a list of artifact names. If same_build
+ is true then ensure that all artifacts come from the same build of the job
+ """
+    if artifact_ids is None or len(artifact_ids) == 0:
+ return []
+ assert same_build, "same_build==False not supported yet"
+ jenkinsci = Jenkins( jenkinsurl )
+ job = jenkinsci[ jobid ]
+ build_ids = job.get_build_ids()
+ for build_id in build_ids:
+ build = job.get_build( build_id )
+ artifacts = build.get_artifact_dict()
+ if set( artifact_ids ).issubset( set( artifacts.keys() ) ):
+ return dict( ( a,artifacts[a] ) for a in artifact_ids )
+ missing_artifacts = set( artifact_ids ) - set( artifacts.keys() )
+ log.debug("Artifacts %s missing from %s #%i" % ( ", ".join( missing_artifacts ), jobid, build_id ))
+ #noinspection PyUnboundLocalVariable
+ raise ArtifactsMissing( missing_artifacts )
+
+def grab_artifact(jenkinsurl, jobid, artifactid, targetdir):
+ """
+ Convenience method to find the latest good version of an artifact and save it
+    to a target directory. The directory is created automatically if it does not exist.
+ """
+ artifacts = get_artifacts( jenkinsurl, jobid )
+ artifact = artifacts[ artifactid ]
+ if not os.path.exists( targetdir ):
+ os.makedirs( targetdir )
+ artifact.savetodir( targetdir)
+
+def block_until_complete(jenkinsurl, jobs, maxwait=12000, interval=30, raise_on_timeout=True):
+ """
+ Wait until all of the jobs in the list are complete.
+ """
+ assert maxwait > 0
+ assert maxwait > interval
+ assert interval > 0
+
+ obj_jenkins = Jenkins(jenkinsurl)
+ obj_jobs = [obj_jenkins[jid] for jid in jobs]
+ for time_left in xrange(maxwait, 0, -interval):
+ still_running = [j for j in obj_jobs if j.is_queued_or_running()]
+ if not still_running:
+ return
+ str_still_running = ", ".join('"%s"' % str(a) for a in still_running)
+ log.warn( "Waiting for jobs %s to complete. Will wait another %is" % (str_still_running, time_left ))
+ time.sleep(interval)
+ if raise_on_timeout:
+ #noinspection PyUnboundLocalVariable
+ raise TimeOut("Waited too long for these jobs to complete: %s" % str_still_running)
+
+def get_view_from_url(url):
+ """
+ Factory method
+ """
+ matched = constants.RE_SPLIT_VIEW_URL.search(url)
+ if not matched:
+ raise BadURL("Cannot parse URL %s" % url)
+ jenkinsurl, view_name = matched.groups()
+ jenkinsci = Jenkins(jenkinsurl)
+ return jenkinsci.get_view(view_name)
+
+def install_artifacts(artifacts, dirstruct, installdir, basestaticurl):
+ """
+ Install the artifacts.
+ """
+ assert basestaticurl.endswith("/"), "Basestaticurl should end with /"
+ installed = []
+ for reldir, artifactnames in dirstruct.items():
+ destdir = os.path.join(installdir, reldir)
+ if not os.path.exists(destdir):
+ log.warn("Making install directory %s" % destdir)
+ os.makedirs(destdir)
+ else:
+ assert os.path.isdir(destdir)
+ for artifactname in artifactnames:
+ destpath = os.path.abspath(os.path.join( destdir, artifactname))
+ if artifactname in artifacts.keys():
+ # The artifact must be loaded from jenkins
+ theartifact = artifacts[artifactname]
+ else:
+ # It's probably a static file, we can get it from the static collection
+ staticurl = urlparse.urljoin(basestaticurl, artifactname)
+ theartifact = Artifact(artifactname, staticurl)
+ theartifact.save(destpath)
+ installed.append(destpath)
+ return installed
--- /dev/null
+from __future__ import with_statement
+import urllib
+import os
+import logging
+import hashlib
+
+from jenkinsapi.exceptions import ArtifactBroken
+from jenkinsapi.fingerprint import Fingerprint
+
+log = logging.getLogger( __name__ )
+
+class Artifact(object):
+
+ def __init__( self, filename, url, build=None ):
+ self.filename = filename
+ self.url = url
+ self.build = build
+
+ def save( self, fspath ):
+ """
+ Save the artifact to an explicit path. The containing directory must exist.
+        Returns a reference to the file which has just been written to.
+
+ :param fspath: full pathname including the filename, str
+ :return: filepath
+ """
+ log.info( "Saving artifact @ %s to %s" % (self.url, fspath) )
+ if not fspath.endswith( self.filename ):
+ log.warn( "Attempt to change the filename of artifact %s on save." % self.filename )
+ if os.path.exists(fspath):
+ if self.build:
+ try:
+ if self._verify_download(fspath):
+ log.info( "Local copy of %s is already up to date." % self.filename)
+ return fspath
+ except ArtifactBroken:
+ log.info("Jenkins artifact could not be identified.")
+ else:
+ log.info("This file did not originate from Jenkins, so cannot check.")
+ else:
+ log.info("Local file is missing, downloading new.")
+ filename = self._do_download(fspath)
+ try:
+ self._verify_download(filename)
+ except ArtifactBroken:
+ log.warning("fingerprint of the downloaded artifact could not be verified")
+ return filename
+
+ def _do_download(self, fspath):
+ filename, headers = urllib.urlretrieve(self.url, filename=fspath)
+ return filename
+
+ def _verify_download(self, fspath):
+ local_md5 = self._md5sum(fspath)
+ fp = Fingerprint(self.build.job.jenkins.baseurl, local_md5, self.build.job.jenkins)
+ return fp.validate_for_build(fspath, self.build.job, self.build)
+
+ def _md5sum(self, fspath, chunksize=2**20):
+ md5 = hashlib.md5()
+        with open(fspath, 'rb') as f:
+            for chunk in iter(lambda: f.read(chunksize), ''):
+                md5.update(chunk)
+ return md5.hexdigest()
+
+ def savetodir( self, dirpath ):
+ """
+        Save the artifact to a folder. The containing directory must already exist;
+        the artifact's default filename is used.
+ """
+ assert os.path.exists( dirpath )
+ assert os.path.isdir( dirpath )
+ outputfilepath = os.path.join( dirpath, self.filename )
+ self.save( outputfilepath )
+
+
+ def __repr__( self ):
+ return """<%s.%s %s>""" % ( self.__class__.__module__,
+ self.__class__.__name__,
+ self.url )
--- /dev/null
+from jenkinsapi.artifact import Artifact
+from jenkinsapi import config
+from jenkinsapi.jenkinsbase import JenkinsBase
+from jenkinsapi.exceptions import NoResults, FailedNoResults
+from jenkinsapi.constants import STATUS_FAIL, STATUS_ABORTED, RESULTSTATUS_FAILURE
+from jenkinsapi.result_set import ResultSet
+
+import time
+import logging
+
+log = logging.getLogger(__name__)
+
+class Build(JenkinsBase):
+ """
+ Represents a jenkins build, executed in context of a job.
+ """
+
+ STR_TOTALCOUNT = "totalCount"
+ STR_TPL_NOTESTS_ERR = "%s has status %s, and does not have any test results"
+
+ def __init__( self, url, buildno, job ):
+ assert type(buildno) == int
+ self.buildno = buildno
+ self.job = job
+ JenkinsBase.__init__( self, url )
+
+ def __str__(self):
+ return self._data['fullDisplayName']
+
+ def id(self):
+ return self._data["number"]
+
+ def get_status(self):
+ return self._data["result"]
+
+ def get_revision(self):
+        for revision_item in self._data["changeSet"]["revisions"]:
+            return revision_item["revision"]
+
+ def get_duration(self):
+ return self._data["duration"]
+
+ def get_artifacts( self ):
+ for afinfo in self._data["artifacts"]:
+ url = "%sartifact/%s" % ( self.baseurl, afinfo["relativePath"] )
+ af = Artifact( afinfo["fileName"], url, self )
+ yield af
+ del af, url
+
+ def get_artifact_dict(self):
+ return dict( (a.filename, a) for a in self.get_artifacts() )
+
+ def is_running( self ):
+ """
+        Return True if the build is currently running, otherwise False.
+ """
+ self.poll()
+ return self._data["building"]
+
+ def is_good( self ):
+ """
+ Return a bool, true if the build was good.
+ If the build is still running, return False.
+ """
+ return ( not self.is_running() ) and self._data["result"] == 'SUCCESS'
+
+ def block_until_complete(self, delay=15):
+ assert isinstance( delay, int )
+ count = 0
+ while self.is_running():
+ total_wait = delay * count
+ log.info("Waited %is for %s #%s to complete" % ( total_wait, self.job.id(), self.id() ) )
+ time.sleep( delay )
+ count += 1
+
+ def get_jenkins_obj(self):
+ return self.job.get_jenkins_obj()
+
+ def get_result_url(self):
+ """
+ Return the URL for the object which provides the job's result summary.
+ """
+ url_tpl = r"%stestReport/%s"
+ return url_tpl % ( self._data["url"] , config.JENKINS_API )
+
+ def get_resultset(self):
+ """
+ Obtain detailed results for this build.
+ """
+ result_url = self.get_result_url()
+ if self.STR_TOTALCOUNT not in self.get_actions():
+ raise NoResults( "%s does not have any published results" % str(self) )
+ buildstatus = self.get_status()
+ if buildstatus in [ STATUS_FAIL, RESULTSTATUS_FAILURE, STATUS_ABORTED ]:
+ raise FailedNoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )
+ if not self.get_actions()[self.STR_TOTALCOUNT]:
+ raise NoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )
+ obj_results = ResultSet( result_url, build=self )
+ return obj_results
+
+ def has_resultset(self):
+ """
+        Return a boolean: True if a result set is available, False if not.
+ """
+ return self.STR_TOTALCOUNT in self.get_actions()
+
+ def get_actions(self):
+ all_actions = {}
+ for dct_action in self._data["actions"]:
+ all_actions.update( dct_action )
+ return all_actions
+
--- /dev/null
+import os
+import sys
+import logging
+import optparse
+from jenkinsapi import jenkins
+
+log = logging.getLogger(__name__)
+
+class jenkins_invoke(object):
+ @classmethod
+ def mkparser(cls):
+        DEFAULT_BASEURL = os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )
+        parser = optparse.OptionParser(
+            description="Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete.")
+ parser.add_option("-J", "--jenkinsbase", dest="baseurl",
+ help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,
+ type="str", default=DEFAULT_BASEURL)
+ parser.add_option('--username', '-u', dest='username',
+            help="Username for jenkins authentication", type='str', default=None)
+ parser.add_option('--password', '-p', dest='password',
+            help="Password for jenkins user authentication", type='str', default=None)
+ parser.add_option("-b", "--block", dest="block", action="store_true", default=False,
+ help="Block until each of the jobs is complete.")
+ parser.add_option("-t", "--token", dest="token",help="Optional security token.",
+ default=None)
+ return parser
+
+ @classmethod
+ def main(cls):
+ parser = cls.mkparser()
+ options, args = parser.parse_args()
+ try:
+ assert len(args) > 0, "Need to specify at least one job name"
+ except AssertionError, e:
+ log.critical(e[0])
+ parser.print_help()
+ sys.exit(1)
+ invoker = cls(options, args)
+ invoker()
+
+ def __init__(self, options, jobs):
+ self.options = options
+ self.jobs = jobs
+ self.api = self._get_api(baseurl=options.baseurl, username=options.username, password=options.password)
+
+ def _get_api(self, baseurl, username, password):
+ return jenkins.Jenkins(baseurl, username, password)
+
+ def __call__(self):
+ for job in self.jobs:
+ self.invokejob(job, block=self.options.block, token=self.options.token)
+
+ def invokejob(self, jobname, block, token):
+ assert type(block) == bool
+ assert type(jobname) == str
+ assert token is None or isinstance(token, str)
+ job = self.api.get_job(jobname)
+ job.invoke(securitytoken=token, block=block)
+
+
+def main( ):
+ logging.basicConfig()
+ logging.getLogger("").setLevel(logging.INFO)
+ jenkins_invoke.main()
\ No newline at end of file
--- /dev/null
+JENKINS_API = r"api/python/"
+LOAD_TIMEOUT = 30
+LOAD_ATTEMPTS = 5
\ No newline at end of file
--- /dev/null
+import re
+
+STATUS_FAIL = "FAIL"
+STATUS_ERROR = "ERROR"
+STATUS_ABORTED = "ABORTED"
+STATUS_REGRESSION = "REGRESSION"
+
+STATUS_FIXED = "FIXED"
+STATUS_PASSED = "PASSED"
+
+RESULTSTATUS_FAILURE = "FAILURE"
+RESULTSTATUS_FAILED = "FAILED"
+
+STR_RE_SPLIT_VIEW = "(.*)/view/([^/]*)/?"
+RE_SPLIT_VIEW_URL = re.compile( STR_RE_SPLIT_VIEW )
--- /dev/null
+class ArtifactsMissing(Exception):
+ """
+ Cannot find a build with all of the required artifacts.
+ """
+
+class UnknownJob( KeyError ):
+ """
+ Jenkins does not recognize the job requested.
+ """
+
+class ArtifactBroken(Exception):
+ """
+    An artifact is broken or otherwise invalid.
+ """
+
+class TimeOut( Exception ):
+ """
+ Some jobs have taken too long to complete.
+ """
+
+class WillNotBuild(Exception):
+ """
+ Cannot trigger a new build.
+ """
+
+class NoBuildData(Exception):
+ """
+ A job has no build data.
+ """
+
+class NoResults(Exception):
+ """
+ A build did not publish any results.
+ """
+
+class FailedNoResults(NoResults):
+ """
+ A build did not publish any results because it failed
+ """
+
+class BadURL(ValueError):
+ """
+ A URL appears to be broken
+ """
+
+class NotFound(Exception):
+ """
+ Resource cannot be found
+ """
--- /dev/null
+from jenkinsapi.jenkinsbase import JenkinsBase
+from jenkinsapi.exceptions import ArtifactBroken
+
+import urllib2
+import re
+
+import logging
+
+log = logging.getLogger( __name__ )
+
+class Fingerprint(JenkinsBase):
+ """
+    Represents a jenkins fingerprint of a single artifact file.
+ """
+ RE_MD5 = re.compile("^([0-9a-z]{32})$")
+
+ def __init__(self, baseurl, id, jenkins_obj):
+ logging.basicConfig()
+ self.jenkins_obj = jenkins_obj
+ assert self.RE_MD5.search( id ), "%s does not look like a valid id" % id
+ url = "%s/fingerprint/%s/" % ( baseurl, id )
+ JenkinsBase.__init__( self, url, poll=False )
+ self.id = id
+
+ def get_jenkins_obj(self):
+ return self.jenkins_obj
+
+ def __str__(self):
+ return self.id
+
+ def valid(self):
+ """
+        Return True if this fingerprint is valid, False otherwise.
+ """
+ try:
+ self.poll()
+ except urllib2.HTTPError:
+ return False
+ return True
+
+ def validate_for_build(self, filename, job, build):
+ if not self.valid():
+ log.info("Unknown to jenkins.")
+ return False
+        if self._data["original"] is not None:
+ if self._data["original"]["name"] == job:
+ if self._data["original"]["number"] == build:
+ return True
+ if self._data["fileName"] != filename:
+ log.info("Filename from jenkins (%s) did not match provided (%s)" % ( self._data["fileName"], filename ) )
+ return False
+ for usage_item in self._data["usage"]:
+ if usage_item["name"] == job:
+                for build_range in usage_item["ranges"]["ranges"]:
+                    if build_range["start"] <= build <= build_range["end"]:
+                        log.info("This artifact was generated by %s between build %i and %i" % ( job, build_range["start"], build_range["end"] ) )
+ return True
+ return False
+
+ def validate(self):
+ try:
+ assert self.valid()
+ except AssertionError:
+ raise ArtifactBroken( "Artifact %s seems to be broken, check %s" % ( self.id, self.baseurl ) )
+ except urllib2.HTTPError:
+ raise ArtifactBroken( "Unable to validate artifact id %s using %s" % ( self.id, self.baseurl ) )
+ return True
+
+ def get_info( self ):
+ """
+        Returns a tuple of build name, build number and artifact filename for a good build.
+ """
+ self.poll()
+ return self._data["original"]["name"], self._data["original"]["number"], self._data["fileName"]
--- /dev/null
+from jenkinsapi.jenkinsbase import JenkinsBase
+from jenkinsapi.fingerprint import Fingerprint
+from jenkinsapi.job import Job
+from jenkinsapi.view import View
+from jenkinsapi.node import Node
+from jenkinsapi.exceptions import UnknownJob
+from jenkinsapi.utils.urlopener import mkurlopener
+import logging
+import time
+import urllib2
+import urllib
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+log = logging.getLogger(__name__)
+
+class Jenkins(JenkinsBase):
+ """
+ Represents a jenkins environment.
+ """
+ def __init__(self, baseurl, username=None, password=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None):
+ """
+
+ :param baseurl: baseurl for jenkins instance including port, str
+ :param username: username for jenkins auth, str
+ :param password: password for jenkins auth, str
+ :param proxyhost: proxyhostname, str
+ :param proxyport: proxyport, int
+ :param proxyuser: proxyusername for proxy auth, str
+ :param proxypass: proxypassword for proxyauth, str
+ :return: a Jenkins obj
+ """
+ self.username = username
+ self.password = password
+ self.proxyhost = proxyhost
+ self.proxyport = proxyport
+ self.proxyuser = proxyuser
+ self.proxypass = proxypass
+ JenkinsBase.__init__( self, baseurl )
+
+ def get_proxy_auth(self):
+ return self.proxyhost, self.proxyport, self.proxyuser, self.proxypass
+
+ def get_jenkins_auth(self):
+ return self.username, self.password, self.baseurl
+
+ def get_auth(self):
+ auth_args = []
+ auth_args.extend(self.get_jenkins_auth())
+ auth_args.extend(self.get_proxy_auth())
+ log.debug("args: %s" % auth_args)
+ return auth_args
+
+ def get_opener( self ):
+ return mkurlopener(*self.get_auth())
+
+ def validate_fingerprint( self, id ):
+ obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
+ obj_fingerprint.validate()
+ log.info("Jenkins says %s is valid" % id)
+
+ def get_artifact_data(self, id):
+ obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
+ obj_fingerprint.validate()
+ return obj_fingerprint.get_info()
+
+ def validate_fingerprint_for_build(self, digest, filename, job, build ):
+ obj_fingerprint = Fingerprint( self.baseurl, digest, jenkins_obj=self )
+ return obj_fingerprint.validate_for_build( filename, job, build )
+
+ def get_jenkins_obj(self):
+ return self
+
+ def get_jobs(self):
+ """
+ Fetch all the build-names on this Jenkins server.
+ """
+ for info in self._data["jobs"]:
+ yield info["name"], Job( info["url"], info["name"], jenkins_obj=self)
+
+ def get_job(self, jobname):
+ """
+ Get a job by name
+ :param jobname: name of the job, str
+ :return: Job obj
+ """
+ return self[jobname]
+
+ def iteritems(self):
+ return self.get_jobs()
+
+ def iterkeys(self):
+ for info in self._data["jobs"]:
+ yield info["name"]
+
+ def keys(self):
+ return [ a for a in self.iterkeys() ]
+
+ def __str__(self):
+ return "Jenkins server at %s" % self.baseurl
+
+ def _get_views( self ):
+        for viewdict in self._data.get("views", []):
+            yield viewdict["name"], viewdict["url"]
+
+ def get_view_dict(self):
+ return dict( self._get_views() )
+
+ def get_view_url( self, str_view_name ):
+ try:
+ view_dict = self.get_view_dict()
+ return view_dict[ str_view_name ]
+ except KeyError:
+ #noinspection PyUnboundLocalVariable
+ all_views = ", ".join( view_dict.keys() )
+ raise KeyError("View %s is not known - available: %s" % ( str_view_name, all_views ) )
+
+ def get_view(self, str_view_name ):
+ view_url = self.get_view_url( str_view_name )
+ view_api_url = self.python_api_url( view_url )
+ return View(view_api_url , str_view_name, jenkins_obj=self)
+
+ def __getitem__(self, jobname):
+ """
+ Get a job by name
+ :param jobname: name of job, str
+ :return: Job obj
+ """
+ for name, job in self.get_jobs():
+ if name == jobname:
+ return job
+ raise UnknownJob(jobname)
+
+ def get_node_dict(self):
+ """Get registered slave nodes on this instance"""
+ url = self.python_api_url(self.get_node_url())
+ node_dict = dict(self.get_data(url))
+ return dict(
+ (node['displayName'], self.python_api_url(self.get_node_url(node['displayName'])))
+ for node in node_dict['computer'])
+
+ def get_node(self, nodename):
+ """Get a node object for a specific node"""
+ node_url = self.python_api_url(self.get_node_url(nodename))
+ return Node(node_url, nodename, jenkins_obj=self)
+
+ def get_node_url(self, nodename=""):
+ """Return the url for nodes"""
+ url = "%(baseurl)s/computer/%(nodename)s" % {'baseurl': self.baseurl, 'nodename': nodename}
+ return url
+
+ def has_node(self, nodename):
+ """
+        Check whether a node with the specified name exists
+ :param nodename: string, hostname
+ :return: boolean
+ """
+ return nodename in self.get_node_dict()
+
+ def delete_node(self, nodename):
+ """
+ Remove a node from the managed slave list
+ Please note that you cannot remove the master node
+
+ :param nodename: string holding a hostname
+ :return: None
+ """
+ assert self.has_node(nodename), "This node: %s is not registered as a slave" % nodename
+ assert nodename != "master", "you cannot delete the master node"
+ url = "%s/doDelete" % self.get_node_url(nodename)
+ fn_urlopen = self.get_jenkins_obj().get_opener()
+ try:
+ fn_urlopen(url).read()
+ except urllib2.HTTPError, e:
+ log.debug("Error reading %s" % url)
+ log.exception(e)
+ raise
+ return not self.has_node(nodename)
+
+ def create_node(self, name, num_executors=2, node_description=None,
+ remote_fs='/var/lib/jenkins', labels=None, exclusive=False):
+ """
+ Create a new slave node by name.
+
+ :param name: fqdn of slave, str
+ :param num_executors: number of executors, int
+ :param node_description: a freetext field describing the node
+ :param remote_fs: jenkins path, str
+ :param labels: labels to associate with slave, str
+ :param exclusive: tied to specific job, boolean
+ :return: node obj
+ """
+ NODE_TYPE = 'jenkins.slaves.DumbSlave$DescriptorImpl'
+ MODE = 'NORMAL'
+ if self.has_node(name):
+ return Node(nodename=name, baseurl=self.get_node_url(nodename=name), jenkins_obj=self)
+ if exclusive:
+ MODE = 'EXCLUSIVE'
+ params = {
+ 'name' : name,
+ 'type' : NODE_TYPE,
+ 'json' : json.dumps ({
+ 'name' : name,
+ 'nodeDescription' : node_description,
+ 'numExecutors' : num_executors,
+ 'remoteFS' : remote_fs,
+ 'labelString' : labels,
+ 'mode' : MODE,
+ 'type' : NODE_TYPE,
+ 'retentionStrategy' : { 'stapler-class' : 'jenkins.slaves.RetentionStrategy$Always' },
+ 'nodeProperties' : { 'stapler-class-bag' : 'true' },
+ 'launcher' : { 'stapler-class' : 'jenkins.slaves.JNLPLauncher' }
+ })
+ }
+ url = "%(nodeurl)s/doCreateItem?%(params)s" % {
+ 'nodeurl': self.get_node_url(),
+ 'params': urllib.urlencode(params)
+ }
+        log.debug("Creating node at %s" % url)
+ fn_urlopen = self.get_jenkins_obj().get_opener()
+ try:
+ fn_urlopen(url).read()
+ except urllib2.HTTPError, e:
+ log.debug("Error reading %s" % url)
+ log.exception(e)
+ raise
+ return Node(nodename=name, baseurl=self.get_node_url(nodename=name), jenkins_obj=self)
--- /dev/null
+import urllib2
+import logging
+import pprint
+from jenkinsapi import config
+from jenkinsapi.utils.retry import retry_function
+
+log = logging.getLogger( __name__ )
+
+class JenkinsBase(object):
+ """
+    The base class from which all other jenkins objects inherit.
+ """
+ RETRY_ATTEMPTS = 5
+
+ def __repr__( self ):
+ return """<%s.%s %s>""" % ( self.__class__.__module__,
+ self.__class__.__name__,
+ str( self ) )
+
+ def print_data(self):
+ pprint.pprint( self._data )
+
+ def __str__(self):
+        raise NotImplementedError
+
+ def __init__( self, baseurl, poll=True ):
+ """
+ Initialize a jenkins connection
+ """
+ self.baseurl = baseurl
+ if poll:
+ try:
+ self.poll()
+ except urllib2.HTTPError, hte:
+ log.exception(hte)
+                log.warn( "Failed to connect to %s" % baseurl )
+ raise
+
+ def poll(self):
+ self._data = self._poll()
+
+ def _poll(self):
+ url = self.python_api_url( self.baseurl )
+ return retry_function( self.RETRY_ATTEMPTS , self.get_data, url )
+
+ def get_jenkins_obj(self):
+ """Not implemented, abstract method implemented by child classes"""
+        raise NotImplementedError("Abstract method, implemented by child classes")
+
+ @classmethod
+ def python_api_url( cls, url ):
+ if url.endswith( config.JENKINS_API ):
+ return url
+ else:
+ if url.endswith( r"/" ):
+ fmt="%s%s"
+ else:
+ fmt = "%s/%s"
+ return fmt % (url, config.JENKINS_API)
+
+ def get_data( self, url ):
+ """
+ Find out how to connect, and then grab the data.
+ """
+ fn_urlopen = self.get_jenkins_obj().get_opener()
+ try:
+ stream = fn_urlopen( url )
+ result = eval( stream.read() )
+ except urllib2.HTTPError, e:
+ log.warn( "Error reading %s" % url )
+ log.exception(e)
+ raise
+ return result
--- /dev/null
+import logging
+import urlparse
+import urllib2
+from collections import defaultdict
+import time
+from jenkinsapi.build import Build
+from jenkinsapi.jenkinsbase import JenkinsBase
+
+from jenkinsapi.exceptions import NoBuildData, NotFound
+
+log = logging.getLogger(__name__)
+
+class Job(JenkinsBase):
+ """
+ Represents a jenkins job
+ A job can hold N builds which are the actual execution environments
+ """
+ def __init__( self, url, name, jenkins_obj ):
+ self.name = name
+ self.jenkins = jenkins_obj
+ self._revmap = None
+ JenkinsBase.__init__( self, url )
+
+ def id( self ):
+ return self._data["name"]
+
+ def __str__(self):
+ return self._data["name"]
+
+ def get_jenkins_obj(self):
+ return self.jenkins
+
+ def get_build_triggerurl( self, token=None ):
+ if token is None:
+ extra = "build"
+ else:
+ assert isinstance(token, str ), "token if provided should be a string."
+ extra = "build?token=%s" % token
+ buildurl = urlparse.urljoin( self.baseurl, extra )
+ return buildurl
+
+ def hit_url(self, url ):
+ fn_urlopen = self.get_jenkins_obj().get_opener()
+ try:
+ stream = fn_urlopen( url )
+ html_result = stream.read()
+ except urllib2.HTTPError, e:
+ log.debug( "Error reading %s" % url )
+ log.exception(e)
+ raise
+ return html_result
+
+ def invoke( self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15 ):
+ assert isinstance( invoke_pre_check_delay, (int, float) )
+ assert isinstance( invoke_block_delay, (int, float) )
+ assert isinstance( block, bool )
+ assert isinstance( skip_if_running, bool )
+ skip_build = False
+ if self.is_queued():
+ log.warn( "Will not request new build because %s is already queued" % self.id() )
+ skip_build = True
+ elif self.is_running():
+ if skip_if_running:
+ log.warn( "Will not request new build because %s is already running" % self.id() )
+ skip_build = True
+ else:
+ log.warn("Will re-schedule %s even though it is already running" % self.id() )
+ original_build_no = self.get_last_buildnumber()
+ if skip_build:
+ pass
+ else:
+ log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) )
+ url = self.get_build_triggerurl( securitytoken )
+ html_result = self.hit_url( url )
+ assert len( html_result ) > 0
+ if invoke_pre_check_delay > 0:
+ log.info("Waiting for %is to allow Jenkins to catch up" % invoke_pre_check_delay )
+ time.sleep( invoke_pre_check_delay )
+ if block:
+ total_wait = 0
+ while self.is_queued():
+ log.info( "Waited %is for %s to begin..." % ( total_wait, self.id() ) )
+ time.sleep( invoke_block_delay )
+ total_wait += invoke_block_delay
+ if self.is_running():
+ running_build = self.get_last_build()
+ running_build.block_until_complete( delay=invoke_pre_check_delay )
+ assert running_build.is_good()
+ else:
+ assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run."
+ else:
+ if self.is_queued():
+ log.info( "%s has been queued." % self.id() )
+ elif self.is_running():
+ log.info( "%s is running." % self.id() )
+ elif original_build_no < self.get_last_buildnumber():
+ log.info( "%s has completed." % self.id() )
+ else:
+ raise AssertionError("The job did not schedule.")
+
+ def _buildid_for_type(self, buildtype):
+ """Gets a buildid for a given type of build"""
+ KNOWNBUILDTYPES=["lastSuccessfulBuild", "lastBuild", "lastCompletedBuild"]
+ assert buildtype in KNOWNBUILDTYPES
+ buildid = self._data[buildtype]["number"]
+ assert type(buildid) == int, "Build ID should be an integer, got %s" % repr( buildid )
+ return buildid
+
+ def get_last_good_buildnumber( self ):
+ """
+ Get the numerical ID of the last good build.
+ """
+ return self._buildid_for_type(buildtype="lastSuccessfulBuild")
+
+ def get_last_buildnumber( self ):
+ """
+ Get the numerical ID of the last build.
+ """
+ return self._buildid_for_type(buildtype="lastBuild")
+
+ def get_last_completed_buildnumber( self ):
+ """
+ Get the numerical ID of the last complete build.
+ """
+ return self._buildid_for_type(buildtype="lastCompletedBuild")
+
+ def get_build_dict(self):
+ if not self._data.has_key( "builds" ):
+ raise NoBuildData( repr(self) )
+ return dict( ( a["number"], a["url"] ) for a in self._data["builds"] )
+
+ def get_revision_dict(self):
+ """
+ Get dictionary of all revisions with a list of buildnumbers (int) that used that particular revision
+ """
+ revs = defaultdict(list)
+ if 'builds' not in self._data:
+ raise NoBuildData( repr(self))
+ for buildnumber in self.get_build_ids():
+ revs[self.get_build(buildnumber).get_revision()].append(buildnumber)
+ return revs
+
+ def get_build_ids(self):
+ """
+        Return the build numbers of all builds as ints, newest first.
+ """
+ return reversed( sorted( self.get_build_dict().keys() ) )
+
+ def get_last_good_build( self ):
+ """
+ Get the last good build
+ """
+ bn = self.get_last_good_buildnumber()
+ return self.get_build( bn )
+
+ def get_last_build( self ):
+ """
+        Get the last build
+ """
+ bn = self.get_last_buildnumber()
+ return self.get_build( bn )
+
+ def get_last_completed_build( self ):
+ """
+ Get the last build regardless of status
+ """
+ bn = self.get_last_completed_buildnumber()
+ return self.get_build( bn )
+
+ def get_buildnumber_for_revision(self, revision, refresh=False):
+ """
+
+ :param revision: subversion revision to look for, int
+ :param refresh: boolean, whether or not to refresh the revision -> buildnumber map
+ :return: list of buildnumbers, [int]
+ """
+ if not isinstance(revision, int):
+ revision = int(revision)
+ if self._revmap is None or refresh:
+ self._revmap = self.get_revision_dict()
+ try:
+ return self._revmap[revision]
+ except KeyError:
+ raise NotFound("Couldn't find a build with that revision")
+
+ def get_build( self, buildnumber ):
+ assert type(buildnumber) == int
+ url = self.get_build_dict()[ buildnumber ]
+ return Build( url, buildnumber, job=self )
+
+ def __getitem__( self, buildnumber ):
+ return self.get_build(buildnumber)
+
+ def is_queued_or_running(self):
+ return self.is_queued() or self.is_running()
+
+ def is_queued(self):
+ self.poll()
+ return self._data["inQueue"]
+
+ def is_running(self):
+ self.poll()
+ try:
+ return self.get_last_build().is_running()
+ except NoBuildData:
+ log.info("No build info available for %s, assuming not running." % str(self) )
+ return False
--- /dev/null
+from jenkinsapi.jenkinsbase import JenkinsBase
+import logging
+
+log = logging.getLogger(__name__)
+
+class Node(JenkinsBase):
+ """
+ Class to hold information on nodes that are attached as slaves to the master jenkins instance
+ """
+
+ def __init__(self, baseurl, nodename, jenkins_obj):
+ """
+ Init a node object by providing all relevant pointers to it
+ :param baseurl: basic url for querying information on a node
+ :param nodename: hostname of the node
+ :param jenkins_obj: ref to the jenkins obj
+ :return: Node obj
+ """
+ self.name = nodename
+ self.jenkins = jenkins_obj
+ JenkinsBase.__init__(self, baseurl)
+
+ def get_jenkins_obj(self):
+ return self.jenkins
+
+ def id(self):
+ return self.name
+
+ def __str__(self):
+ return self.id()
+
+ def get_node_data(self):
+ return self._data
+
+ def is_online(self):
+ return not self._data['offline']
+
+ def is_jnlpagent(self):
+ return self._data['jnlpAgent']
+
+ def is_idle(self):
+ return self._data['idle']
+
--- /dev/null
+class Result(object):
+ def __init__(self, **kwargs ):
+ """
+
+ """
+ self.__dict__.update( kwargs )
+
+ def __str__(self):
+ return "%s %s %s" % ( self.className, self.name, self.status )
+
+ def __repr__(self):
+ module_name = self.__class__.__module__
+ class_name = self.__class__.__name__
+ self_str = str( self )
+ return "<%s.%s %s>" % ( module_name , class_name , self_str )
+
+ def id(self):
+ """
+ Calculate an ID for this object.
+ """
+ return "%s.%s" % ( self.className, self.name )
--- /dev/null
+from jenkinsapi.jenkinsbase import JenkinsBase
+from jenkinsapi.result import Result
+
+class ResultSet(JenkinsBase):
+ """
+    Represents the set of test results from a completed Jenkins build.
+ """
+ def __init__(self, url, build ):
+ """
+ Init a resultset
+ :param url: url for a build, str
+ :param build: build obj
+ """
+ self.build = build
+ JenkinsBase.__init__(self, url)
+
+ def get_jenkins_obj(self):
+ return self.build.job.get_jenkins_obj()
+
+ def __str__(self):
+ return "Test Result for %s" % str( self.build )
+
+ def keys(self):
+ return [ a[0] for a in self.iteritems() ]
+
+ def items(self):
+ return [a for a in self.iteritems()]
+
+ def iteritems(self):
+ for suite in self._data.get("suites", [] ):
+ for case in suite["cases"]:
+ R = Result( **case )
+ yield R.id(), R
+
+ for report_set in self._data.get( "childReports", [] ):
+ for suite in report_set["result"]["suites"]:
+ for case in suite["cases"]:
+ R = Result( **case )
+ yield R.id(), R
+
+ def __len__(self):
+ return len(self.items())
--- /dev/null
+import logging
+import time
+
+log = logging.getLogger( __name__ )
+
+IGNORE_EXCEPTIONS = [ AttributeError, KeyboardInterrupt ]
+
+DEFAULT_SLEEP_TIME = 1
+
+def retry_function( tries, fn, *args, **kwargs ):
+ """
+    Retry function: call an unreliable function up to `tries` times before giving up.
+    If it still fails after the last attempt, the most recent exception is raised.
+ """
+ assert isinstance( tries, int ), "Tries should be a non-zero positive integer"
+ assert tries > 0, "Tries should be a non-zero positive integer"
+ for attempt in range(0, tries):
+ attemptno = attempt + 1
+ if attemptno == tries:
+ log.warn( "Last chance: #%i of %i" % ( attemptno, tries ) )
+ elif tries > attempt > 0:
+ log.warn( "Attempt #%i of %i" % ( attemptno, tries ) )
+ try:
+ result = fn( *args, **kwargs )
+ if attempt > 0:
+ log.info( "Result obtained after attempt %i" % attemptno )
+ return result
+ except Exception, e:
+ if type(e) in IGNORE_EXCEPTIONS:
+                # Immediately raise in some cases.
+ raise
+ try:
+ fn_name = fn.__name__
+ except AttributeError:
+ fn_name = "Anonymous Function"
+ log.exception(e)
+ log.warn( "%s failed at attempt %i, trying again." % ( fn_name , attemptno ) )
+ time.sleep( DEFAULT_SLEEP_TIME )
+ raise e
--- /dev/null
+import urllib2
+import base64
+
+import logging
+
+log = logging.getLogger( __name__ )
+
+class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
+ """
+ A BasicAuthHandler class that will add Basic Auth headers to a request
+ even when there is no basic auth challenge from the server
+ Jenkins does not challenge basic auth but expects it to be present
+ """
+ def __init__(self, password_mgr=None):
+ if password_mgr is None:
+ password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ self.passwd = password_mgr
+ self.add_password = self.passwd.add_password
+
+ def http_request(self,req):
+ uri = req.get_full_url()
+ user, pw = self.passwd.find_user_password(None,uri)
+ log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))
+ if pw is None: return req
+ raw = "%s:%s" % (user, pw)
+ auth = 'Basic %s' % base64.b64encode(raw).strip()
+ req.add_unredirected_header('Authorization', auth)
+ return req
+
+def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):
+ """
+    Create a URL opener that works with both jenkins auth and proxy auth.
+    If no values are provided for the jenkins or proxy vars, a regular opener is returned.
+ :param jenkinsuser: username for jenkins, str
+ :param jenkinspass: password for jenkins, str
+ :param jenkinsurl: jenkins url, str
+ :param proxyhost: proxy hostname, str
+ :param proxyport: proxy port, int
+ :param proxyuser: proxy username, str
+ :param proxypass: proxy password, str
+ :return: urllib2.opener configured for auth
+ """
+ handlers = []
+ for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):
+ handlers.append(handler)
+ for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+ handlers.append(handler)
+ opener = urllib2.build_opener(*handlers)
+ return opener.open
+
+def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):
+ """
+    Get a basic authentication handler for jenkins
+ :param jenkinsuser: jenkins username, str
+ :param jenkinspass: jenkins password, str
+ :param jenkinsurl: jenkins base url, str
+ :return: a list of handlers
+ """
+ for param in jenkinsuser, jenkinspass, jenkinsurl:
+ if param is None:
+ return []
+ assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)
+    assert type(jenkinspass) == str, "Jenkins password should be a string, got %s" % repr(jenkinspass)
+# hostname = urlparse.urlsplit(jenkinsurl).hostname
+ handler = PreemptiveBasicAuthHandler()
+ handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)
+ log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))
+ return [ handler ]
+
+def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+ """
+ Get a configured handler for a proxy
+
+ :param proxyhost: proxy hostname, str
+ :param proxyport: proxy port, int
+ :param proxyuser: proxy username, str
+ :param proxypass: proxy password, str
+ :return: list of handlers
+ """
+ for param in proxyhost, proxyport, proxyuser, proxypass:
+ if param is None:
+ return []
+ assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )
+    assert type( proxypass ) == str, "Proxy password should be a string, got %s" % repr( proxypass )
+ assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )
+
+ proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),
+ 'https': 'http://%s:%i/' % (proxyhost, proxyport) }
+
+ proxy_handler = urllib2.ProxyHandler( proxy_spec )
+ proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
+ proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )
+ return [proxy_handler, proxy_auth_handler]
+
--- /dev/null
+from jenkinsapi.jenkinsbase import JenkinsBase
+from jenkinsapi.job import Job
+
+class View(JenkinsBase):
+
+ def __init__(self, url, name, jenkins_obj):
+ self.name = name
+ self.jenkins_obj = jenkins_obj
+ JenkinsBase.__init__(self, url)
+
+ def __str__(self):
+ return self.name
+
+ def __getitem__(self, str_job_id ):
+ assert isinstance( str_job_id, str )
+ api_url = self.python_api_url( self.get_job_url( str_job_id ) )
+ return Job( api_url, str_job_id, self.jenkins_obj )
+
+ def keys(self):
+ return self.get_job_dict().keys()
+
+ def iteritems(self):
+ for name, url in self.get_job_dict().iteritems():
+ api_url = self.python_api_url( url )
+ yield name, Job( api_url, name, self.jenkins_obj )
+
+ def values(self):
+ return [ a[1] for a in self.iteritems() ]
+
+ def items(self):
+ return [ a for a in self.iteritems() ]
+
+ def _get_jobs( self ):
+        for jobdict in self._data.get("jobs", []):
+            yield jobdict["name"], jobdict["url"]
+
+ def get_job_dict(self):
+ return dict( self._get_jobs() )
+
+ def __len__(self):
+ return len( self.get_job_dict().keys() )
+
+ def get_job_url( self, str_job_name ):
+ try:
+ job_dict = self.get_job_dict()
+ return job_dict[ str_job_name ]
+ except KeyError:
+ #noinspection PyUnboundLocalVariable
+            all_jobs = ", ".join( job_dict.keys() )
+            raise KeyError("Job %s is not known - available: %s" % ( str_job_name, all_jobs ) )
+
+ def get_jenkins_obj(self):
+ return self.jenkins_obj
+
+ def id(self):
+ """
+ Calculate an ID for this object.
+ """
+        return self.name
\ No newline at end of file
+++ /dev/null
-from pyjenkinsci.artifact import Artifact\r
-from pyjenkinsci import constants\r
-from pyjenkinsci.jenkins import Jenkins\r
-from pyjenkinsci.exceptions import ArtifactsMissing, TimeOut, BadURL\r
-from urllib2 import urlparse\r
-\r
-import os\r
-import time\r
-import logging\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-def get_latest_test_results( jenkinsurl, jobname ):\r
- """\r
- A convenience function to fetch down the very latest test results from a jenkins job.\r
- """\r
- latestbuild = get_latest_build( jenkinsurl, jobname )\r
- res = latestbuild.get_resultset()\r
- return res\r
-\r
-def get_latest_build(jenkinsurl, jobname):\r
- """\r
- A convenience function to fetch down the very latest test results from a jenkins job.\r
- """\r
- jenkinsci = Jenkins(jenkinsurl)\r
- job = jenkinsci[jobname]\r
- return job.get_last_build()\r
-\r
-def get_latest_complete_build(jenkinsurl, jobname):\r
- """\r
- A convenience function to fetch down the very latest test results from a jenkins job.\r
- """\r
- jenkinsci = Jenkins(jenkinsurl)\r
- job = jenkinsci[jobname]\r
- return job.get_last_completed_build()\r
-\r
-def get_artifacts( jenkinsurl, jobid=None, build_no=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None ):\r
- """\r
- Find all the artifacts for the latest build of a job.\r
- """\r
- jenkinsci = Jenkins(jenkinsurl, proxyhost, proxyport, proxyuser, proxypass)\r
- job = jenkinsci[jobid]\r
- if build_no:\r
- build = job.get_build( build_no )\r
- else:\r
- build = job.get_last_good_build()\r
- artifacts = dict((artifact.filename, artifact) for artifact in build.get_artifacts())\r
- log.info("Found %i artifacts in '%s'" % ( len(artifacts.keys() ), build_no ))\r
- return artifacts\r
-\r
-def search_artifacts(jenkinsurl, jobid, artifact_ids=None, same_build=True, build_search_limit=None):\r
- """\r
- Search the entire history of a jenkins job for a list of artifact names. If same_build\r
- is true then ensure that all artifacts come from the same build of the job\r
- """\r
- if len(artifact_ids) == 0 or artifact_ids is None:\r
- return []\r
- assert same_build, "same_build==False not supported yet"\r
- jenkinsci = Jenkins( jenkinsurl )\r
- job = jenkinsci[ jobid ]\r
- build_ids = job.get_build_ids()\r
- for build_id in build_ids:\r
- build = job.get_build( build_id )\r
- artifacts = build.get_artifact_dict()\r
- if set( artifact_ids ).issubset( set( artifacts.keys() ) ):\r
- return dict( ( a,artifacts[a] ) for a in artifact_ids )\r
- missing_artifacts = set( artifact_ids ) - set( artifacts.keys() )\r
- log.debug("Artifacts %s missing from %s #%i" % ( ", ".join( missing_artifacts ), jobid, build_id ))\r
- raise ArtifactsMissing( missing_artifacts )\r
-\r
-def grab_artifact(jenkinsurl, jobid, artifactid, targetdir):\r
- """\r
- Convenience method to find the latest good version of an artifact and save it\r
- to a target directory. Directory is made automatically if not exists.\r
- """\r
- artifacts = get_artifacts( jenkinsurl, jobid )\r
- artifact = artifacts[ artifactid ]\r
- if not os.path.exists( targetdir ):\r
- os.makedirs( targetdir )\r
- artifact.savetodir( targetdir)\r
-\r
-def block_until_complete(jenkinsurl, jobs, maxwait=12000, interval=30, raise_on_timeout=True):\r
- """\r
- Wait until all of the jobs in the list are complete.\r
- """\r
- assert maxwait > 0\r
- assert maxwait > interval\r
- assert interval > 0\r
-\r
- obj_jenkins = Jenkins(jenkinsurl)\r
- obj_jobs = [obj_jenkins[jid] for jid in jobs]\r
- for time_left in xrange(maxwait, 0, -interval):\r
- still_running = [j for j in obj_jobs if j.is_queued_or_running()]\r
- if not still_running:\r
- return\r
- str_still_running = ", ".join('"%s"' % str(a) for a in still_running)\r
- log.warn( "Waiting for jobs %s to complete. Will wait another %is" % (str_still_running, time_left ))\r
- time.sleep(interval)\r
- if raise_on_timeout:\r
- raise TimeOut("Waited too long for these jobs to complete: %s" % str_still_running)\r
-\r
-def get_view_from_url(url):\r
- """\r
- Factory method\r
- """\r
- matched = constants.RE_SPLIT_VIEW_URL.search(url)\r
- if not matched:\r
- raise BadURL("Cannot parse URL %s" % url)\r
- jenkinsurl, view_name = matched.groups()\r
- jenkinsci = Jenkins(jenkinsurl)\r
- return jenkinsci.get_view(view_name)\r
-\r
-def install_artifacts(artifacts, dirstruct, installdir, basestaticurl):\r
- """\r
- Install the artifacts.\r
- """\r
- assert basestaticurl.endswith("/"), "Basestaticurl should end with /"\r
- installed = []\r
- for reldir, artifactnames in dirstruct.items():\r
- destdir = os.path.join(installdir, reldir)\r
- if not os.path.exists(destdir):\r
- log.warn("Making install directory %s" % destdir)\r
- os.makedirs(destdir)\r
- else:\r
- assert os.path.isdir(destdir)\r
- for artifactname in artifactnames:\r
- destpath = os.path.abspath(os.path.join( destdir, artifactname))\r
- if artifactname in artifacts.keys():\r
- # The artifact must be loaded from jenkins\r
- theartifact = artifacts[artifactname]\r
- else:\r
- # It's probably a static file, we can get it from the static collection\r
- staticurl = urlparse.urljoin(basestaticurl, artifactname)\r
- theartifact = Artifact(artifactname, staticurl)\r
- theartifact.save(destpath)\r
- installed.append(destpath)\r
- return installed\r
+++ /dev/null
-from __future__ import with_statement
-import urllib
-import os
-import logging
-import hashlib
-
-from pyjenkinsci.exceptions import ArtifactBroken
-from pyjenkinsci.fingerprint import Fingerprint
-
-log = logging.getLogger( __name__ )
-
-class Artifact(object):
-
- def __init__( self, filename, url, build=None ):
- self.filename = filename
- self.url = url
- self.build = build
-
- def save( self, fspath ):
- """
- Save the artifact to an explicit path. The containing directory must exist.
- Returns a reference to the file which has just been writen to.
-
- :param fspath: full pathname including the filename, str
- :return: filepath
- """
- log.info( "Saving artifact @ %s to %s" % (self.url, fspath) )
- if not fspath.endswith( self.filename ):
- log.warn( "Attempt to change the filename of artifact %s on save." % self.filename )
- if os.path.exists(fspath):
- if self.build:
- try:
- if self._verify_download(fspath):
- log.info( "Local copy of %s is already up to date." % self.filename)
- return fspath
- except ArtifactBroken:
- log.info("Jenkins artifact could not be identified.")
- else:
- log.info("This file did not originate from Jenkins, so cannot check.")
- else:
- log.info("Local file is missing, downloading new.")
- filename = self._do_download(fspath)
- try:
- self._verify_download(filename)
- except ArtifactBroken:
- log.warning("fingerprint of the downloaded artifact could not be verified")
- return filename
-
- def _do_download(self, fspath):
- filename, headers = urllib.urlretrieve(self.url, filename=fspath)
- return filename
-
- def _verify_download(self, fspath):
- local_md5 = self._md5sum(fspath)
- fp = Fingerprint(self.build.job.jenkins.baseurl, local_md5, self.build.job.jenkins)
- return fp.validate_for_build(fspath, self.build.job, self.build)
-
- def _md5sum(self, fspath, chunksize=2**20):
- md5 = hashlib.md5()
- try:
- with open(fspath,'rb') as f:
- for chunk in iter(lambda: f.read(chunksize), ''):
- md5.update(chunk)
- except:
- raise
- return md5.hexdigest()
-
- def savetodir( self, dirpath ):
- """
- Save the artifact to a folder. The containing directory must be exist, but use the artifact's
- default filename.
- """
- assert os.path.exists( dirpath )
- assert os.path.isdir( dirpath )
- outputfilepath = os.path.join( dirpath, self.filename )
- self.save( outputfilepath )
-
-
- def __repr__( self ):
- return """<%s.%s %s>""" % ( self.__class__.__module__,
- self.__class__.__name__,
- self.url )
+++ /dev/null
-from pyjenkinsci.artifact import Artifact\r
-from pyjenkinsci import config\r
-from pyjenkinsci.jenkinsbase import JenkinsBase\r
-from pyjenkinsci.exceptions import NoResults, FailedNoResults\r
-from pyjenkinsci.constants import STATUS_FAIL, STATUS_ABORTED, RESULTSTATUS_FAILURE\r
-from pyjenkinsci.result_set import ResultSet\r
-\r
-from datetime import time\r
-import logging\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-class Build(JenkinsBase):\r
- """\r
- Represents a jenkins build, executed in context of a job.\r
- """\r
-\r
- STR_TOTALCOUNT = "totalCount"\r
- STR_TPL_NOTESTS_ERR = "%s has status %s, and does not have any test results"\r
-\r
- def __init__( self, url, buildno, job ):\r
- assert type(buildno) == int\r
- self.buildno = buildno\r
- self.job = job\r
- JenkinsBase.__init__( self, url )\r
-\r
- def __str__(self):\r
- return self._data['fullDisplayName']\r
-\r
- def id(self):\r
- return self._data["number"]\r
-\r
- def get_status(self):\r
- return self._data["result"]\r
-\r
- def get_revision(self):\r
- for set in self._data["changeSet"]["revisions"]:\r
- return set["revision"]\r
-\r
- def get_duration(self):\r
- return self._data["duration"]\r
-\r
- def get_artifacts( self ):\r
- for afinfo in self._data["artifacts"]:\r
- url = "%sartifact/%s" % ( self.baseurl, afinfo["relativePath"] )\r
- af = Artifact( afinfo["fileName"], url, self )\r
- yield af\r
- del af, url\r
-\r
- def get_artifact_dict(self):\r
- return dict( (a.filename, a) for a in self.get_artifacts() )\r
-\r
- def is_running( self ):\r
- """\r
- Return a bool if running.\r
- """\r
- self.poll()\r
- return self._data["building"]\r
-\r
- def is_good( self ):\r
- """\r
- Return a bool, true if the build was good.\r
- If the build is still running, return False.\r
- """\r
- return ( not self.is_running() ) and self._data["result"] == 'SUCCESS'\r
-\r
- def block_until_complete(self, delay=15):\r
- assert isinstance( delay, int )\r
- count = 0\r
- while self.is_running():\r
- total_wait = delay * count\r
- log.info("Waited %is for %s #%s to complete" % ( total_wait, self.job.id(), self.id() ) )\r
- time.sleep( delay )\r
- count += 1\r
-\r
- def get_jenkins_obj(self):\r
- return self.job.get_jenkins_obj()\r
-\r
- def get_result_url(self):\r
- """\r
- Return the URL for the object which provides the job's result summary.\r
- """\r
- url_tpl = r"%stestReport/%s"\r
- return url_tpl % ( self._data["url"] , config.JENKINS_API )\r
-\r
- def get_resultset(self):\r
- """\r
- Obtain detailed results for this build.\r
- """\r
- result_url = self.get_result_url()\r
- if self.STR_TOTALCOUNT not in self.get_actions():\r
- raise NoResults( "%s does not have any published results" % str(self) )\r
- buildstatus = self.get_status()\r
- if buildstatus in [ STATUS_FAIL, RESULTSTATUS_FAILURE, STATUS_ABORTED ]:\r
- raise FailedNoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )\r
- if self.get_actions()[ self.STR_TOTALCOUNT ] == 0:\r
- raise NoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )\r
- obj_results = ResultSet( result_url, build=self )\r
- return obj_results\r
-\r
- def has_resultset(self):\r
- """\r
- Return a boolean, true if a result set is available. false if not.\r
- """\r
- return self.STR_TOTALCOUNT in self.get_actions()\r
-\r
- def get_actions(self):\r
- all_actions = {}\r
- for dct_action in self._data["actions"]:\r
- all_actions.update( dct_action )\r
- return all_actions\r
-\r
+++ /dev/null
-import os
-import sys
-import logging
-import optparse
-from pyjenkinsci import jenkins
-
-log = logging.getLogger(__name__)
-
-class jenkins_invoke(object):
- @classmethod
- def mkparser(cls):
- parser = optparse.OptionParser()
- DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )
- parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."
- parser.add_option("-J", "--jenkinsbase", dest="baseurl",
- help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,
- type="str", default=DEFAULT_BASEURL)
- parser.add_option('--username', '-u', dest='username',
- help="Username for jenkins authentification", type='str', default=None)
- parser.add_option('--password', '-p', dest='password',
- help="password for jenkins user auth", type='str', default=None)
- parser.add_option("-b", "--block", dest="block", action="store_true", default=False,
- help="Block until each of the jobs is complete.")
- parser.add_option("-t", "--token", dest="token",help="Optional security token.",
- default=None)
- return parser
-
- @classmethod
- def main(cls):
- parser = cls.mkparser()
- options, args = parser.parse_args()
- try:
- assert len(args) > 0, "Need to specify at least one job name"
- except AssertionError, e:
- log.critical(e[0])
- parser.print_help()
- sys.exit(1)
- invoker = cls(options, args)
- invoker()
-
- def __init__(self, options, jobs):
- self.options = options
- self.jobs = jobs
- self.api = self._get_api(baseurl=options.baseurl, username=options.username, password=options.password)
-
- def _get_api(self, baseurl, username, password):
- return jenkins.Jenkins(baseurl, username, password)
-
- def __call__(self):
- for job in self.jobs:
- self.invokejob(job, block=self.options.block, token=self.options.token)
-
- def invokejob(self, jobname, block, token):
- assert type(block) == bool
- assert type(jobname) == str
- assert token is None or isinstance(token, str)
- job = self.api.get_job(jobname)
- job.invoke(securitytoken=token, block=block)
-
-
-def main( ):
- logging.basicConfig()
- logging.getLogger("").setLevel(logging.INFO)
- jenkins_invoke.main()
\ No newline at end of file
+++ /dev/null
-JENKINS_API = r"api/python/"\r
-LOAD_TIMEOUT = 30\r
-LOAD_ATTEMPTS = 5
\ No newline at end of file
+++ /dev/null
-import re\r
-\r
-STATUS_FAIL = "FAIL"\r
-STATUS_ERROR = "ERROR"\r
-STATUS_ABORTED = "ABORTED"\r
-STATUS_REGRESSION = "REGRESSION"\r
-\r
-STATUS_FIXED = "FIXED"\r
-STATUS_PASSED = "PASSED"\r
-\r
-RESULTSTATUS_FAILURE = "FAILURE"\r
-RESULTSTATUS_FAILED = "FAILED"\r
-\r
-STR_RE_SPLIT_VIEW = "(.*)/view/([^/]*)/?"\r
-RE_SPLIT_VIEW_URL = re.compile( STR_RE_SPLIT_VIEW )\r
+++ /dev/null
-class ArtifactsMissing(Exception):
- """
- Cannot find a build with all of the required artifacts.
- """
-
-class UnknownJob( KeyError ):
- """
- Jenkins does not recognize the job requested.
- """
-
-class ArtifactBroken(Exception):
- """
- An artifact is broken, wrong
- """
-
-class TimeOut( Exception ):
- """
- Some jobs have taken too long to complete.
- """
-
-class WillNotBuild(Exception):
- """
- Cannot trigger a new build.
- """
-
-class NoBuildData(Exception):
- """
- A job has no build data.
- """
-
-class NoResults(Exception):
- """
- A build did not publish any results.
- """
-
-class FailedNoResults(NoResults):
- """
- A build did not publish any results because it failed
- """
-
-class BadURL(ValueError):
- """
- A URL appears to be broken
- """
-
-class NotFound(Exception):
- """
- Resource cannot be found
- """
+++ /dev/null
-from pyjenkinsci.jenkinsbase import JenkinsBase\r
-from pyjenkinsci.exceptions import ArtifactBroken\r
-\r
-import urllib2\r
-import re\r
-\r
-import logging\r
-\r
-log = logging.getLogger( __name__ )\r
-\r
-class Fingerprint(JenkinsBase):\r
- """\r
- Represents a jenkins fingerprint record for a single artifact file.\r
- """\r
- RE_MD5 = re.compile("^([0-9a-z]{32})$")\r
-\r
- def __init__(self, baseurl, id, jenkins_obj):\r
- logging.basicConfig()\r
- self.jenkins_obj = jenkins_obj\r
- assert self.RE_MD5.search( id ), "%s does not look like a valid id" % id\r
- url = "%s/fingerprint/%s/" % ( baseurl, id )\r
- JenkinsBase.__init__( self, url, poll=False )\r
- self.id = id\r
-\r
- def get_jenkins_obj(self):\r
- return self.jenkins_obj\r
-\r
- def __str__(self):\r
- return self.id\r
-\r
- def valid(self):\r
- """\r
- Return True if the fingerprint is known to Jenkins, False otherwise.\r
- """\r
- try:\r
- self.poll()\r
- except urllib2.HTTPError, e:\r
- return False\r
- return True\r
-\r
- def validate_for_build(self, filename, job, build):\r
- if not self.valid():\r
- log.info("Unknown to jenkins.")\r
- return False\r
- if not self._data["original"] is None:\r
- if self._data["original"]["name"] == job:\r
- if self._data["original"]["number"] == build:\r
- return True\r
- if self._data["fileName"] != filename:\r
- log.info("Filename from jenkins (%s) did not match provided (%s)" % ( self._data["fileName"], filename ) )\r
- return False\r
- for usage_item in self._data["usage"]:\r
- if usage_item["name"] == job:\r
- for range in usage_item["ranges"]["ranges"]:\r
- if range["start"] <= build <= range["end"]:\r
- log.info("This artifact was generated by %s between build %i and %i" % ( job, range["start"], range["end"] ) )\r
- return True\r
- return False\r
-\r
- def validate(self):\r
- try:\r
- assert self.valid()\r
- except AssertionError, ae:\r
- raise ArtifactBroken( "Artifact %s seems to be broken, check %s" % ( self.id, self.baseurl ) )\r
- except urllib2.HTTPError, httpe:\r
- raise ArtifactBroken( "Unable to validate artifact id %s using %s" % ( self.id, self.baseurl ) )\r
- return True\r
-\r
- def get_info( self ):\r
- """\r
- Returns a tuple of build name, build number and artifact filename for a good build.\r
- """\r
- self.poll()\r
- return self._data["original"]["name"], self._data["original"]["number"], self._data["fileName"]\r
+++ /dev/null
-from pyjenkinsci.jenkinsbase import JenkinsBase
-from pyjenkinsci.fingerprint import Fingerprint
-from pyjenkinsci.job import Job
-from pyjenkinsci.view import View
-from pyjenkinsci.node import Node
-from pyjenkinsci.exceptions import UnknownJob
-from pyjenkinsci.utils.urlopener import mkurlopener
-import logging
-import time
-import urllib2
-import urllib
-try:
- import json
-except ImportError:
- import simplejson as json
-
-log = logging.getLogger(__name__)
-
-class Jenkins(JenkinsBase):
- """
- Represents a jenkins environment.
- """
- def __init__(self, baseurl, username=None, password=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None):
- """
-
- :param baseurl: baseurl for jenkins instance including port, str
- :param username: username for jenkins auth, str
- :param password: password for jenkins auth, str
- :param proxyhost: proxyhostname, str
- :param proxyport: proxyport, int
- :param proxyuser: proxyusername for proxy auth, str
- :param proxypass: proxypassword for proxyauth, str
- :return: a Jenkins obj
- """
- self.username = username
- self.password = password
- self.proxyhost = proxyhost
- self.proxyport = proxyport
- self.proxyuser = proxyuser
- self.proxypass = proxypass
- JenkinsBase.__init__( self, baseurl )
-
- def get_proxy_auth(self):
- return self.proxyhost, self.proxyport, self.proxyuser, self.proxypass
-
- def get_jenkins_auth(self):
- return self.username, self.password, self.baseurl
-
- def get_auth(self):
- auth_args = []
- auth_args.extend(self.get_jenkins_auth())
- auth_args.extend(self.get_proxy_auth())
- log.debug("args: %s" % auth_args)
- return auth_args
-
- def get_opener( self ):
- return mkurlopener(*self.get_auth())
-
- def validate_fingerprint( self, id ):
- obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
- obj_fingerprint.validate()
- log.info("Jenkins says %s is valid" % id)
-
- def get_artifact_data(self, id):
- obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
- obj_fingerprint.validate()
- return obj_fingerprint.get_info()
-
- def validate_fingerprint_for_build(self, digest, filename, job, build ):
- obj_fingerprint = Fingerprint( self.baseurl, digest, jenkins_obj=self )
- return obj_fingerprint.validate_for_build( filename, job, build )
-
- def get_jenkins_obj(self):
- return self
-
- def get_jobs(self):
- """
- Fetch all jobs on this Jenkins server, yielding (name, Job) pairs.
- """
- for info in self._data["jobs"]:
- yield info["name"], Job( info["url"], info["name"], jenkins_obj=self)
-
- def get_job(self, jobname):
- """
- Get a job by name
- :param jobname: name of the job, str
- :return: Job obj
- """
- return self[jobname]
-
- def iteritems(self):
- return self.get_jobs()
-
- def iterkeys(self):
- for info in self._data["jobs"]:
- yield info["name"]
-
- def keys(self):
- return [ a for a in self.iterkeys() ]
-
- def __str__(self):
- return "Jenkins server at %s" % self.baseurl
-
- def _get_views( self ):
- if not self._data.has_key( "views" ):
- pass
- else:
- for viewdict in self._data["views"]:
- yield viewdict["name"], viewdict["url"]
-
- def get_view_dict(self):
- return dict( self._get_views() )
-
- def get_view_url( self, str_view_name ):
- try:
- view_dict = self.get_view_dict()
- return view_dict[ str_view_name ]
- except KeyError, ke:
- all_views = ", ".join( view_dict.keys() )
- raise KeyError("View %s is not known - available: %s" % ( str_view_name, all_views ) )
-
- def get_view(self, str_view_name ):
- view_url = self.get_view_url( str_view_name )
- view_api_url = self.python_api_url( view_url )
- return View(view_api_url , str_view_name, jenkins_obj=self)
-
- def __getitem__(self, jobname):
- """
- Get a job by name
- :param jobname: name of job, str
- :return: Job obj
- """
- for name, job in self.get_jobs():
- if name == jobname:
- return job
- raise UnknownJob(jobname)
-
- def get_node_dict(self):
- """Get registered slave nodes on this instance"""
- url = self.python_api_url(self.get_node_url())
- node_dict = dict(self.get_data(url))
- return dict(
- (node['displayName'], self.python_api_url(self.get_node_url(node['displayName'])))
- for node in node_dict['computer'])
-
- def get_node(self, nodename):
- """Get a node object for a specific node"""
- node_url = self.python_api_url(self.get_node_url(nodename))
- return Node(node_url, nodename, jenkins_obj=self)
-
- def get_node_url(self, nodename=""):
- """Return the url for nodes"""
- url = "%(baseurl)s/computer/%(nodename)s" % {'baseurl': self.baseurl, 'nodename': nodename}
- return url
-
- def has_node(self, nodename):
- """
- Does a node by the name specified exist
- :param nodename: string, hostname
- :return: boolean
- """
- return nodename in self.get_node_dict()
-
- def delete_node(self, nodename):
- """
- Remove a node from the managed slave list
- Please note that you cannot remove the master node
-
- :param nodename: string holding a hostname
- :return: None
- """
- assert self.has_node(nodename), "This node: %s is not registered as a slave" % nodename
- assert nodename != "master", "you cannot delete the master node"
- url = "%s/doDelete" % self.get_node_url(nodename)
- fn_urlopen = self.get_jenkins_obj().get_opener()
- try:
- stream = fn_urlopen(url)
- html_result = stream.read()
- except urllib2.HTTPError, e:
- log.debug("Error reading %s" % url)
- raise
- return not self.has_node(nodename)
-
- def create_node(self, name, num_executors=2, node_description=None,
- remote_fs='/var/lib/jenkins', labels=None, exclusive=False):
- """
- Create a new slave node by name.
-
- :param name: fqdn of slave, str
- :param num_executors: number of executors, int
- :param node_description: a freetext field describing the node
- :param remote_fs: jenkins path, str
- :param labels: labels to associate with slave, str
- :param exclusive: tied to specific job, boolean
- :return: node obj
- """
- NODE_TYPE = 'jenkins.slaves.DumbSlave$DescriptorImpl'
- MODE = 'NORMAL'
- if self.has_node(name):
- return Node(nodename=name, baseurl=self.get_node_url(nodename=name), jenkins_obj=self)
- if exclusive:
- MODE = 'EXCLUSIVE'
- params = {
- 'name' : name,
- 'type' : NODE_TYPE,
- 'json' : json.dumps ({
- 'name' : name,
- 'nodeDescription' : node_description,
- 'numExecutors' : num_executors,
- 'remoteFS' : remote_fs,
- 'labelString' : labels,
- 'mode' : MODE,
- 'type' : NODE_TYPE,
- 'retentionStrategy' : { 'stapler-class' : 'jenkins.slaves.RetentionStrategy$Always' },
- 'nodeProperties' : { 'stapler-class-bag' : 'true' },
- 'launcher' : { 'stapler-class' : 'jenkins.slaves.JNLPLauncher' }
- })
- }
- url = "%(nodeurl)s/doCreateItem?%(params)s" % {
- 'nodeurl': self.get_node_url(),
- 'params': urllib.urlencode(params)
- }
- print url
- fn_urlopen = self.get_jenkins_obj().get_opener()
- try:
- stream = fn_urlopen(url)
- html_result = stream.read()
- except urllib2.HTTPError, e:
- log.debug("Error reading %s" % url)
- log.exception(e)
- raise
- return Node(nodename=name, baseurl=self.get_node_url(nodename=name), jenkins_obj=self)
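
A short sketch of the Jenkins object in use, covering job lookup and the slave-management calls above; the server URL, credentials and hostnames are placeholders, and the jenkinsapi import path is assumed:

    from jenkinsapi.jenkins import Jenkins

    jenkins = Jenkins("http://localhost/jenkins", username="admin", password="secret")
    print(jenkins.keys())           # all job names on this server
    print(jenkins.get_view_dict())  # view name -> view url
    if not jenkins.has_node("buildslave01.example.com"):
        # Registers a JNLP slave with two executors; remote_fs is the slave-side workspace root.
        jenkins.create_node("buildslave01.example.com", num_executors=2,
                            node_description="example slave", labels="linux")
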
+++ /dev/null
-import urllib2\r
-import logging\r
-import pprint\r
-from pyjenkinsci import config\r
-from pyjenkinsci.utils.retry import retry_function\r
-\r
-log = logging.getLogger( __name__ )\r
-\r
-class JenkinsBase(object):\r
- """\r
- Base object from which all other jenkins objects inherit.\r
- """\r
- RETRY_ATTEMPTS = 5\r
-\r
- def __repr__( self ):\r
- return """<%s.%s %s>""" % ( self.__class__.__module__,\r
- self.__class__.__name__,\r
- str( self ) )\r
-\r
- def print_data(self):\r
- pprint.pprint( self._data )\r
-\r
- def __str__(self):\r
- raise NotImplementedError\r
-\r
- def __init__( self, baseurl, poll=True ):\r
- """\r
- Initialize a jenkins connection\r
- """\r
- self.baseurl = baseurl\r
- if poll:\r
- try:\r
- self.poll()\r
- except urllib2.HTTPError, hte:\r
- log.exception(hte)\r
- log.warn( "Failed to conenct to %s" % baseurl )\r
- raise\r
-\r
- def poll(self):\r
- self._data = self._poll()\r
-\r
- def _poll(self):\r
- url = self.python_api_url( self.baseurl )\r
- return retry_function( self.RETRY_ATTEMPTS , self.get_data, url )\r
-\r
- def get_jenkins_obj(self):\r
- """Not implemented, abstract method implemented by child classes"""\r
- raise NotImplemented("Abstract method, implemented by child classes")\r
-\r
- @classmethod\r
- def python_api_url( cls, url ):\r
- if url.endswith( config.JENKINS_API ):\r
- return url\r
- else:\r
- if url.endswith( r"/" ):\r
- fmt="%s%s"\r
- else:\r
- fmt = "%s/%s"\r
- return fmt % (url, config.JENKINS_API)\r
-\r
- def get_data( self, url ):\r
- """\r
- Find out how to connect, and then grab the data.\r
- """\r
- fn_urlopen = self.get_jenkins_obj().get_opener()\r
- try:\r
- stream = fn_urlopen( url )\r
- result = eval( stream.read() )\r
- except urllib2.HTTPError, e:\r
- log.warn( "Error reading %s" % url )\r
- raise\r
- return result\r
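
python_api_url() is the only piece of JenkinsBase callers normally touch directly; a small illustration, assuming JENKINS_API is still "api/python/" as in the config module above:

    from jenkinsapi.jenkinsbase import JenkinsBase

    # Appends "api/python/" exactly once, with or without a trailing slash.
    print(JenkinsBase.python_api_url("http://localhost/jenkins/job/myjob"))
    print(JenkinsBase.python_api_url("http://localhost/jenkins/job/myjob/"))
    # both print: http://localhost/jenkins/job/myjob/api/python/
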
+++ /dev/null
-import logging
-import urlparse
-import urllib2
-from collections import defaultdict
-import time
-from pyjenkinsci.build import Build
-from pyjenkinsci.jenkinsbase import JenkinsBase
-
-from pyjenkinsci.exceptions import NoBuildData, NotFound
-
-log = logging.getLogger(__name__)
-
-class Job(JenkinsBase):
- """
- Represents a jenkins job
- A job can hold N builds which are the actual execution environments
- """
- def __init__( self, url, name, jenkins_obj ):
- self.name = name
- self.jenkins = jenkins_obj
- self._revmap = None
- JenkinsBase.__init__( self, url )
-
- def id( self ):
- return self._data["name"]
-
- def __str__(self):
- return self._data["name"]
-
- def get_jenkins_obj(self):
- return self.jenkins
-
- def get_build_triggerurl( self, token=None ):
- if token is None:
- extra = "build"
- else:
- assert isinstance(token, str ), "token if provided should be a string."
- extra = "build?token=%s" % token
- buildurl = urlparse.urljoin( self.baseurl, extra )
- return buildurl
-
- def hit_url(self, url ):
- fn_urlopen = self.get_jenkins_obj().get_opener()
- try:
- stream = fn_urlopen( url )
- html_result = stream.read()
- except urllib2.HTTPError, e:
- log.debug( "Error reading %s" % url )
- raise
- return html_result
-
- def invoke( self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15 ):
- assert isinstance( invoke_pre_check_delay, (int, float) )
- assert isinstance( invoke_block_delay, (int, float) )
- assert isinstance( block, bool )
- assert isinstance( skip_if_running, bool )
- skip_build = False
- if self.is_queued():
- log.warn( "Will not request new build because %s is already queued" % self.id() )
- skip_build = True
- elif self.is_running():
- if skip_if_running:
- log.warn( "Will not request new build because %s is already running" % self.id() )
- skip_build = True
- else:
- log.warn("Will re-schedule %s even though it is already running" % self.id() )
- original_build_no = self.get_last_buildnumber()
- if skip_build:
- pass
- else:
- log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) )
- url = self.get_build_triggerurl( securitytoken )
- html_result = self.hit_url( url )
- assert len( html_result ) > 0
- if invoke_pre_check_delay > 0:
- log.info("Waiting for %is to allow Jenkins to catch up" % invoke_pre_check_delay )
- time.sleep( invoke_pre_check_delay )
- if block:
- total_wait = 0
- while self.is_queued():
- log.info( "Waited %is for %s to begin..." % ( total_wait, self.id() ) )
- time.sleep( invoke_block_delay )
- total_wait += invoke_block_delay
- if self.is_running():
- running_build = self.get_last_build()
- running_build.block_until_complete( delay=invoke_pre_check_delay )
- assert running_build.is_good()
- else:
- assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run."
- else:
- if self.is_queued():
- log.info( "%s has been queued." % self.id() )
- elif self.is_running():
- log.info( "%s is running." % self.id() )
- elif original_build_no < self.get_last_buildnumber():
- log.info( "%s has completed." % self.id() )
- else:
- raise AssertionError("The job did not schedule.")
-
- def _buildid_for_type(self, buildtype):
- """Gets a buildid for a given type of build"""
- KNOWNBUILDTYPES=["lastSuccessfulBuild", "lastBuild", "lastCompletedBuild"]
- assert buildtype in KNOWNBUILDTYPES
- buildid = self._data[buildtype]["number"]
- assert type(buildid) == int, "Build ID should be an integer, got %s" % repr( buildid )
- return buildid
-
- def get_last_good_buildnumber( self ):
- """
- Get the numerical ID of the last good build.
- """
- return self._buildid_for_type(buildtype="lastSuccessfulBuild")
-
- def get_last_buildnumber( self ):
- """
- Get the numerical ID of the last build.
- """
- return self._buildid_for_type(buildtype="lastBuild")
-
- def get_last_completed_buildnumber( self ):
- """
- Get the numerical ID of the last complete build.
- """
- return self._buildid_for_type(buildtype="lastCompletedBuild")
-
- def get_build_dict(self):
- if not self._data.has_key( "builds" ):
- raise NoBuildData( repr(self) )
- return dict( ( a["number"], a["url"] ) for a in self._data["builds"] )
-
- def get_revision_dict(self):
- """
- Get dictionary of all revisions with a list of buildnumbers (int) that used that particular revision
- """
- revs = defaultdict(list)
- if 'builds' not in self._data:
- raise NoBuildData( repr(self))
- for buildnumber in self.get_build_ids():
- revs[self.get_build(buildnumber).get_revision()].append(buildnumber)
- return revs
-
- def get_build_ids(self):
- """
- Return all build numbers as ints, sorted from newest to oldest.
- """
- return reversed( sorted( self.get_build_dict().keys() ) )
-
- def get_last_good_build( self ):
- """
- Get the last good build
- """
- bn = self.get_last_good_buildnumber()
- return self.get_build( bn )
-
- def get_last_build( self ):
- """
- Get the last build
- """
- bn = self.get_last_buildnumber()
- return self.get_build( bn )
-
- def get_last_completed_build( self ):
- """
- Get the last completed build, regardless of its result
- """
- bn = self.get_last_completed_buildnumber()
- return self.get_build( bn )
-
- def get_buildnumber_for_revision(self, revision, refresh=False):
- """
-
- :param revision: subversion revision to look for, int
- :param refresh: boolean, whether or not to refresh the revision -> buildnumber map
- :return: list of buildnumbers, [int]
- """
- if not isinstance(revision, int):
- revision = int(revision)
- if self._revmap is None or refresh:
- self._revmap = self.get_revision_dict()
- try:
- return self._revmap[revision]
- except KeyError:
- raise NotFound("Couldn't find a build with that revision")
-
- def get_build( self, buildnumber ):
- assert type(buildnumber) == int
- url = self.get_build_dict()[ buildnumber ]
- return Build( url, buildnumber, job=self )
-
- def __getitem__( self, buildnumber ):
- return self.get_build(buildnumber)
-
- def is_queued_or_running(self):
- return self.is_queued() or self.is_running()
-
- def is_queued(self):
- self.poll()
- return self._data["inQueue"]
-
- def is_running(self):
- self.poll()
- try:
- return self.get_last_build().is_running()
- except NoBuildData:
- log.info("No build info available for %s, assuming not running." % str(self) )
- return False
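
A sketch of looking up builds by subversion revision with the Job methods above; the job name and revision number are placeholders and the jenkinsapi module paths are assumed:

    from jenkinsapi.jenkins import Jenkins
    from jenkinsapi.exceptions import NotFound

    jenkins = Jenkins("http://localhost/jenkins")
    job = jenkins["myjob"]
    try:
        # All build numbers that were built from subversion revision 1234.
        for number in job.get_buildnumber_for_revision(1234):
            build = job.get_build(number)
            print("build %i good: %s" % (number, build.is_good()))
    except NotFound:
        print("No build used revision 1234")
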
+++ /dev/null
-from pyjenkinsci.jenkinsbase import JenkinsBase
-import logging
-
-log = logging.getLogger(__name__)
-
-class Node(JenkinsBase):
- """
- Class to hold information on nodes that are attached as slaves to the master jenkins instance
- """
-
- def __init__(self, baseurl, nodename, jenkins_obj):
- """
- Init a node object by providing all relevant pointers to it
- :param baseurl: basic url for querying information on a node
- :param nodename: hostname of the node
- :param jenkins_obj: ref to the jenkins obj
- :return: Node obj
- """
- self.name = nodename
- self.jenkins = jenkins_obj
- JenkinsBase.__init__(self, baseurl)
-
- def get_jenkins_obj(self):
- return self.jenkins
-
- def id(self):
- return self.name
-
- def __str__(self):
- return self.id()
-
- def get_node_data(self):
- return self._data
-
- def is_online(self):
- return not self._data['offline']
-
- def is_jnlpagent(self):
- return self._data['jnlpAgent']
-
- def is_idle(self):
- return self._data['idle']
-
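
Querying a slave through the Node object above might look like this; the hostname is a placeholder and the jenkinsapi import path is assumed:

    from jenkinsapi.jenkins import Jenkins

    jenkins = Jenkins("http://localhost/jenkins")
    node = jenkins.get_node("buildslave01.example.com")
    print("online: %s, idle: %s" % (node.is_online(), node.is_idle()))
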
+++ /dev/null
-class Result(object):\r
- def __init__(self, **kwargs ):\r
- """\r
-\r
- """\r
- self.__dict__.update( kwargs )\r
-\r
- def __str__(self):\r
- return "%s %s %s" % ( self.className, self.name, self.status )\r
-\r
- def __repr__(self):\r
- module_name = self.__class__.__module__\r
- class_name = self.__class__.__name__\r
- self_str = str( self )\r
- return "<%s.%s %s>" % ( module_name , class_name , self_str )\r
-\r
- def id(self):\r
- """\r
- Calculate an ID for this object.\r
- """\r
- return "%s.%s" % ( self.className, self.name )\r
+++ /dev/null
-from pyjenkinsci.jenkinsbase import JenkinsBase
-from pyjenkinsci.result import Result
-
-class ResultSet(JenkinsBase):
- """
- Represents the set of test results from a completed Jenkins build.
- """
- def get_jenkins_obj(self):
- return self.build.job.get_jenkins_obj()
-
- def __init__(self, url, build ):
- """
- """
- self.build = build
- JenkinsBase.__init__( self, url )
-
- def __str__(self):
- return "Test Result for %s" % str( self.build )
-
- def keys(self):
- return [ a[0] for a in self.iteritems() ]
-
- def items(self):
- return [a for a in self.iteritems()]
-
- def iteritems(self):
- for suite in self._data.get("suites", [] ):
- for case in suite["cases"]:
- R = Result( **case )
- yield R.id(), R
-
- for report_set in self._data.get( "childReports", [] ):
- for suite in report_set["result"]["suites"]:
- for case in suite["cases"]:
- R = Result( **case )
- yield R.id(), R
-
- def __len__(self):
- return sum( 1 for x in self.iteritems() )
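
Iterating a ResultSet, assuming Build.get_resultset() returns one as the helper functions earlier in this diff suggest; the job name and URL are placeholders:

    from jenkinsapi.jenkins import Jenkins

    jenkins = Jenkins("http://localhost/jenkins")
    build = jenkins["myjob"].get_last_completed_build()
    results = build.get_resultset()
    print("%i test cases" % len(results))
    for case_id, result in results.iteritems():
        # Each Result carries at least className, name and status from the JUnit report.
        print("%s: %s" % (case_id, result.status))
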
+++ /dev/null
-import logging
-import time
-
-log = logging.getLogger( __name__ )
-
-IGNORE_EXCEPTIONS = [ AttributeError, KeyboardInterrupt ]
-
-DEFAULT_SLEEP_TIME = 1
-
-def retry_function( tries, fn, *args, **kwargs ):
- """
- Retry function - calls an unreliable function up to `tries` times before giving up.
- If every attempt fails, the most recent exception is raised.
- """
- assert isinstance( tries, int ), "Tries should be a non-zero positive integer"
- assert tries > 0, "Tries should be a non-zero positive integer"
- for attempt in range(0, tries):
- attemptno = attempt + 1
- if attemptno == tries:
- log.warn( "Last chance: #%i of %i" % ( attemptno, tries ) )
- elif tries > attempt > 0:
- log.warn( "Attempt #%i of %i" % ( attemptno, tries ) )
- try:
- result = fn( *args, **kwargs )
- if attempt > 0:
- log.info( "Result obtained after attempt %i" % attemptno )
- return result
- except Exception, e:
- if type(e) in IGNORE_EXCEPTIONS:
- # Immediately re-raise exceptions in IGNORE_EXCEPTIONS rather than retrying.
- raise
- try:
- fn_name = fn.__name__
- except AttributeError, ae:
- fn_name = "Anonymous Function"
- log.exception(e)
- log.warn( "%s failed at attempt %i, trying again." % ( fn_name , attemptno ) )
- time.sleep( DEFAULT_SLEEP_TIME )
- raise e
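
retry_function wraps any callable; a minimal sketch with a flaky HTTP fetch (the URL is a placeholder, and the jenkinsapi module path is assumed):

    import urllib2
    from jenkinsapi.utils.retry import retry_function

    def fetch(url):
        return urllib2.urlopen(url).read()

    # Up to 5 attempts; AttributeError and KeyboardInterrupt are re-raised immediately.
    data = retry_function(5, fetch, "http://localhost/jenkins/api/python/")
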
+++ /dev/null
-import urllib2
-import base64
-
-import logging
-
-log = logging.getLogger( __name__ )
-
-class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
- """
- A BasicAuthHandler class that will add Basic Auth headers to a request
- even when there is no basic auth challenge from the server
- Jenkins does not challenge basic auth but expects it to be present
- """
- def __init__(self, password_mgr=None):
- if password_mgr is None:
- password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
- self.passwd = password_mgr
- self.add_password = self.passwd.add_password
-
- def http_request(self,req):
- uri = req.get_full_url()
- user, pw = self.passwd.find_user_password(None,uri)
- log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))
- if pw is None: return req
- raw = "%s:%s" % (user, pw)
- auth = 'Basic %s' % base64.b64encode(raw).strip()
- req.add_unredirected_header('Authorization', auth)
- return req
-
-def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):
- """
- Create a URL opener that works with both jenkins auth and proxy auth.
- If no values are provided for the jenkins or proxy parameters, a regular opener is returned.
- :param jenkinsuser: username for jenkins, str
- :param jenkinspass: password for jenkins, str
- :param jenkinsurl: jenkins url, str
- :param proxyhost: proxy hostname, str
- :param proxyport: proxy port, int
- :param proxyuser: proxy username, str
- :param proxypass: proxy password, str
- :return: urllib2.opener configured for auth
- """
- handlers = []
- for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):
- handlers.append(handler)
- for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
- handlers.append(handler)
- opener = urllib2.build_opener(*handlers)
- return opener.open
-
-def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):
- """
- Get a basic authentication handler for jenkins
- :param jenkinsuser: jenkins username, str
- :param jenkinspass: jenkins password, str
- :param jenkinsurl: jenkins base url, str
- :return: a list of handlers
- """
- for param in jenkinsuser, jenkinspass, jenkinsurl:
- if param is None:
- return []
- assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)
- assert type(jenkinspass) == str, "Jenkins password should be a string, got %s" % repr(jenkinspass)
-# hostname = urlparse.urlsplit(jenkinsurl).hostname
- handler = PreemptiveBasicAuthHandler()
- handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)
- log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))
- return [ handler ]
-
-def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
- """
- Get a configured handler for a proxy
-
- :param proxyhost: proxy hostname, str
- :param proxyport: proxy port, int
- :param proxyuser: proxy username, str
- :param proxypass: proxy password, str
- :return: list of handlers
- """
- for param in proxyhost, proxyport, proxyuser, proxypass:
- if param is None:
- return []
- assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )
- assert type( proxypass ) == str, "Proxy password should be a string, got %s" % repr( proxypass )
- assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )
-
- proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),
- 'https': 'http://%s:%i/' % (proxyhost, proxyport) }
-
- proxy_handler = urllib2.ProxyHandler( proxy_spec )
- proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
- proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )
- return [proxy_handler, proxy_auth_handler]
-
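
mkurlopener in use, with preemptive basic auth for Jenkins and no proxy (all proxy arguments left as None); credentials and URL are placeholders, and the jenkinsapi module path is assumed:

    from jenkinsapi.utils.urlopener import mkurlopener

    open_url = mkurlopener("admin", "secret", "http://localhost/jenkins",
                           None, None, None, None)
    data = open_url("http://localhost/jenkins/api/python/").read()
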
+++ /dev/null
-from pyjenkinsci.jenkinsbase import JenkinsBase\r
-from pyjenkinsci.job import Job\r
-\r
-class View(JenkinsBase):\r
-\r
- def __init__(self, url, name, jenkins_obj):\r
- self.name = name\r
- self.jenkins_obj = jenkins_obj\r
- JenkinsBase.__init__(self, url)\r
-\r
- def __str__(self):\r
- return self.name\r
-\r
- def __getitem__(self, str_job_id ):\r
- assert isinstance( str_job_id, str )\r
- api_url = self.python_api_url( self.get_job_url( str_job_id ) )\r
- return Job( api_url, str_job_id, self.jenkins_obj )\r
-\r
- def keys(self):\r
- return self.get_job_dict().keys()\r
-\r
- def iteritems(self):\r
- for name, url in self.get_job_dict().iteritems():\r
- api_url = self.python_api_url( url )\r
- yield name, Job( api_url, name, self.jenkins_obj )\r
-\r
- def values(self):\r
- return [ a[1] for a in self.iteritems() ]\r
-\r
- def items(self):\r
- return [ a for a in self.iteritems() ]\r
-\r
- def _get_jobs( self ):\r
- if not self._data.has_key( "jobs" ):\r
- pass\r
- else:\r
- for viewdict in self._data["jobs"]:\r
- yield viewdict["name"], viewdict["url"]\r
-\r
- def get_job_dict(self):\r
- return dict( self._get_jobs() )\r
-\r
- def __len__(self):\r
- return len( self.get_job_dict().keys() )\r
-\r
- def get_job_url( self, str_job_name ):\r
- try:\r
- job_dict = self.get_job_dict()\r
- return job_dict[ str_job_name ]\r
- except KeyError, ke:\r
- all_views = ", ".join( job_dict.keys() )\r
- raise KeyError("Job %s is not known - available: %s" % ( str_job_name, all_views ) )\r
-\r
- def get_jenkins_obj(self):\r
- return self.jenkins_obj\r
-\r
- def id(self):\r
- """\r
- Calculate an ID for this object.\r
- """\r
- return "%s.%s" % ( self.className, self.name )
\ No newline at end of file
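
A view can be used much like the Jenkins object itself; a sketch assuming a view named "MyView" exists on the server and the jenkinsapi import path is kept:

    from jenkinsapi.jenkins import Jenkins

    jenkins = Jenkins("http://localhost/jenkins")
    view = jenkins.get_view("MyView")
    print("%i jobs in view" % len(view))
    for name, job in view.iteritems():
        print("%s: last build %i" % (name, job.get_last_buildnumber()))
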
-from setuptools import setup, find_packages\r
-\r
-GLOBAL_ENTRY_POINTS = {\r
- "console_scripts":[ "jenkins_invoke=pyjenkinsci.command_line.hudson_invoke:main",\r
- "meta_test=pyjenkinsci.command_line.meta_test:main", ] }\r
-\r
-setup(name='pyjenkinsci',\r
- version='0.0.35.1',\r
- description='A Python API for accessing resources on a Jenkins continuous-integration server.',\r
- author='Salim Fadhley',\r
- author_email='sal@stodge.org',\r
- package_dir = {'':'pyjenkinsci'},\r
- packages=find_packages('pyjenkinsci'),\r
- zip_safe=True,\r
- include_package_data = False,\r
- entry_points = GLOBAL_ENTRY_POINTS,\r
- url="https://github.com/ramonvanalteren/pyjenkinsci",\r
- )\r
+from setuptools import setup, find_packages
+
+GLOBAL_ENTRY_POINTS = {
+ "console_scripts":[ "jenkins_invoke=jenkinsapi.command_line.jenkins_invoke:main"]
+ }
+
+setup(name='jenkinsapi',
+ version='0.1',
+ description='A Python API for accessing resources on a Jenkins continuous-integration server.',
+ author="Ramon van Alteren",
+ author_email='ramon@vanalteren.nl',
+ package_dir = {'':'jenkinsapi'},
+ packages=find_packages('jenkinsapi'),
+ zip_safe=True,
+ include_package_data = False,
+ entry_points = GLOBAL_ENTRY_POINTS,
+ url="https://github.com/ramonvanalteren/jenkinsapi",
+ )