From: Ramon van Alteren Date: Mon, 2 Jan 2012 13:53:37 +0000 (+0100) Subject: Simplyfied dir setup X-Git-Tag: v0.2.23~361 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f6a231845680f6146e9158c8292f763c6ccdfb24;p=tools%2Fpython-jenkinsapi.git Simplyfied dir setup Removed jenkinsci_egg and src dirs Removed tests, highly env dependant and not very usefull --- diff --git a/publish.bat b/publish.bat deleted file mode 100644 index d820450..0000000 --- a/publish.bat +++ /dev/null @@ -1,3 +0,0 @@ -cd /D %~dp0 -cd pyjenkinsci_egg -python setup.py bdist_egg \ No newline at end of file diff --git a/pyjenkinsci/__init__.py b/pyjenkinsci/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyjenkinsci/api.py b/pyjenkinsci/api.py new file mode 100644 index 0000000..a40006a --- /dev/null +++ b/pyjenkinsci/api.py @@ -0,0 +1,137 @@ +import artifact +import constants +import jenkins +from exceptions import ArtifactsMissing, TimeOut, BadURL +from urllib2 import urlparse + +import os +import time +import logging + +log = logging.getLogger(__name__) + +def get_latest_test_results( jenkinsurl, jobname ): + """ + A convenience function to fetch down the very latest test results from a jenkins job. + """ + latestbuild = get_latest_build( jenkinsurl, jobname ) + res = latestbuild.get_resultset() + return res + +def get_latest_build( jenkinsurl, jobname ): + """ + A convenience function to fetch down the very latest test results from a jenkins job. + """ + jenkinsci = jenkins( jenkinsurl ) + job = jenkinsci[ jobname ] + return job.get_last_build() + +def get_latest_complete_build( jenkinsurl, jobname ): + """ + A convenience function to fetch down the very latest test results from a jenkins job. 
+ """ + jenkinsci = jenkins( jenkinsurl ) + job = jenkinsci[ jobname ] + return job.get_last_completed_build() + +def get_artifacts( jenkinsurl, jobid=None, build_no=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None ): + """ + Find all the artifacts for the latest build of a job. + """ + jenkinsci = jenkins( jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ) + job = jenkinsci[ jobid ] + if build_no: + build = job.get_build( build_no ) + else: + build = job.get_last_good_build() + artifacts = dict( (artifact.filename, artifact) for artifact in build.get_artifacts() ) + log.info("Found %i artifacts in '%s'" % ( len(artifacts.keys() ), build_no ) ) + return artifacts + +def search_artifacts(jenkinsurl, jobid, artifact_ids=None, same_build=True, build_search_limit=None): + """ + Search the entire history of a jenkins job for a list of artifact names. If same_build + is true then ensure that all artifacts come from the same build of the job + """ + if len( artifact_ids ) == 0 or artifact_ids is None: + return [] + assert same_build, "same_build==False not supported yet" + jenkinsci = jenkins( jenkinsurl ) + job = jenkinsci[ jobid ] + build_ids = job.get_build_ids() + for build_id in build_ids: + build = job.get_build( build_id ) + artifacts = build.get_artifact_dict() + if set( artifact_ids ).issubset( set( artifacts.keys() ) ): + return dict( ( a,artifacts[a] ) for a in artifact_ids ) + missing_artifacts = set( artifact_ids ) - set( artifacts.keys() ) + log.debug("Artifacts %s missing from %s #%i" % ( ", ".join( missing_artifacts ), jobid, build_id )) + raise ArtifactsMissing( missing_artifacts ) + +def grab_artifact( jenkinsurl, jobid, artifactid, targetdir ): + """ + Convenience method to find the latest good version of an artifact and save it + to a target directory. Directory is made automatically if not exists. 
+ """ + artifacts = get_artifacts( jenkinsurl, jobid ) + artifact = artifacts[ artifactid ] + if not os.path.exists( targetdir ): + os.makedirs( targetdir ) + artifact.savetodir( targetdir) + +def block_until_complete( jenkinsurl, jobs, maxwait=12000, interval=30, raise_on_timeout=True ): + """ + Wait until all of the jobs in the list are complete. + """ + assert maxwait > 0 + assert maxwait > interval + assert interval > 0 + + obj_jenkins = jenkins( jenkinsurl ) + obj_jobs = [ obj_jenkins[ jid ] for jid in jobs ] + for time_left in xrange( maxwait, 0, -interval ): + still_running = [ j for j in obj_jobs if j.is_queued_or_running() ] + if not still_running: + return + str_still_running = ", ".join( '"%s"' % str(a) for a in still_running ) + log.warn( "Waiting for jobs %s to complete. Will wait another %is" % ( str_still_running, time_left ) ) + time.sleep( interval ) + if raise_on_timeout: + raise TimeOut( "Waited too long for these jobs to complete: %s" % str_still_running ) + +def get_view_from_url( url ): + """ + Factory method + """ + matched = constants.RE_SPLIT_VIEW_URL.search(url) + if not matched: + raise BadURL("Cannot parse URL %s" % url ) + jenkinsurl, view_name = matched.groups() + jenkinsci = jenkins( jenkinsurl ) + return jenkinsci.get_view( view_name ) + +def install_artifacts( artifacts, dirstruct, installdir, basestaticurl ): + """ + Install the artifacts. 
+ """ + assert basestaticurl.endswith("/"), "Basestaticurl should end with /" + installed = [] + for reldir, artifactnames in dirstruct.items(): + destdir = os.path.join( installdir, reldir ) + if not os.path.exists( destdir ): + log.warn( "Making install directory %s" % destdir ) + os.makedirs( destdir ) + else: + assert os.path.isdir( destdir ) + for artifactname in artifactnames: + destpath = os.path.abspath( os.path.join( destdir, artifactname ) ) + if artifactname in artifacts.keys(): + # The artifact must be loaded from jenkins + theartifact = artifacts[ artifactname ] + else: + # It's probably a static file, we can get it from the static collection + staticurl = urlparse.urljoin( basestaticurl, artifactname ) + theartifact = artifact( artifactname, staticurl ) + theartifact.save( destpath ) + installed.append( destpath ) + return installed diff --git a/pyjenkinsci/artifact.py b/pyjenkinsci/artifact.py new file mode 100644 index 0000000..a3b0193 --- /dev/null +++ b/pyjenkinsci/artifact.py @@ -0,0 +1,188 @@ +import urllib2 +import os +import logging +import cStringIO +import zipfile +import cPickle +import datetime +import config + +from utils.retry import retry_function +from exceptions import ArtifactBroken +from utils.md5hash import new_digest + +log = logging.getLogger( __name__ ) + +class artifact( object ): + + @staticmethod + def timedelta_to_seconds( td ): + secs = float( td.seconds ) + secs += td.microseconds / 1000000.0 + secs += td.days * 86400 + return secs + + def __init__( self, filename, url, build=None ): + self.filename = filename + self.url = url + self.build = build + + def unpickle(self, method="pickle" ): + """ + Assume that the object is a pickled stream. 
+ """ + stream, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream ) + + while True: + try: + yield cPickle.load( stream ) + except EOFError: + break + + def logging_buffer_copy( self, input_stream, output_stream, length, chunks=10 ): + + chunk_points = int( length / chunks ) + + start_time = datetime.datetime.now() + last_time = datetime.datetime.now() + + for index in xrange( 0, length ): + output_stream.write( input_stream.read(1) ) + + if chunk_points > 0: + if ( index % chunk_points ) == 0 and ( index > 0 ): + now = datetime.datetime.now() + + try: + time_elapsed_since_start = self.timedelta_to_seconds( now - start_time ) + # avg_bitrate = ( index / time_elapsed_since_start ) / 1024.0 + time_elapsed_since_last_chunk = self.timedelta_to_seconds( now - last_time ) + inst_bitrate = ( chunk_points / time_elapsed_since_last_chunk ) / 1024.0 + except ZeroDivisionError, _: + continue + + log.info( "Loaded %i of %i bytes %.2f kbit/s" % ( index, length, inst_bitrate ) ) + last_time = now + + + def getstream( self ): + """ + Get the artifact as a stream + """ + artifact_digest = new_digest() + tmp_buffer = cStringIO.StringIO() + + if self.build: + fn_opener = self.build.job.hudson.get_opener() + else: + fn_opener = urllib2.urlopen + + try: + inputstream = fn_opener( self.url, ) + content_type = inputstream.info().get("content-type", "unknown") + + try: + content_length = int( inputstream.info()["content-length"] ) + self.logging_buffer_copy( inputstream, tmp_buffer, content_length ) + except KeyError, ke: + # Could not get length. 
+ log.warn("Could not get length") + tmp_buffer.write( inputstream.read() ) + + except urllib2.HTTPError: + log.warn( "Error fetching %s" % self.url ) + raise + tmp_buffer.seek(0) + + artifact_digest.update(tmp_buffer.getvalue()) + artifact_hexdigest = artifact_digest.hexdigest() + + artifact_size = len(tmp_buffer.getvalue()) + log.info( "Got %s, %i bytes, MD5: %s, type: %s" % ( self.filename, artifact_size, artifact_hexdigest, content_type ) ) + + if self.build: + self.build.job.hudson.validate_fingerprint( artifact_hexdigest ) + + return tmp_buffer, artifact_hexdigest + + def openzip( self ): + """ + Open the artifact as a zipfile. + """ + buffer, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream ) + zf = zipfile.ZipFile( buffer, "r" ) + return zf + + def save( self, fspath ): + """ + Save the artifact to an explicit path. The containing directory must exist. + Returns a reference to the file which has just been writen to. + """ + + log.info( "Saving artifact @ %s to %s" % (self.url, fspath) ) + + if not fspath.endswith( self.filename ): + log.warn( "Attempt to change the filename of artifact %s on save." % self.filename ) + + if os.path.exists( fspath ): + existing_hexdigest = self.get_local_digest( fspath ) + if self.build: + try: + valid = self.build.job.hudson.validate_fingerprint_for_build( existing_hexdigest, filename=self.filename, job=self.build.job.id(), build=self.build.id() ) + + if valid: + log.info( "Local copy of %s is already up to date. 
MD5 %s" % (self.filename, existing_hexdigest) ) + else: + self.__do_download( fspath ) + except ArtifactBroken, ab: #@UnusedVariable + log.info("Hudson artifact could not be identified.") + else: + log.info("This file did not originate from Hudson, so cannot check.") + self.__do_download( fspath ) + else: + log.info("Local file is missing, downloading new.") + self.__do_download( fspath ) + + def get_local_digest( self, fspath ): + tmp_buffer_existing = cStringIO.StringIO() + existingfile = open( fspath, "rb" ) + tmp_buffer_existing.write( existingfile.read() ) + existing_digest = new_digest() + existing_digest.update(tmp_buffer_existing.getvalue()) + existing_hexdigest = existing_digest.hexdigest() + return existing_hexdigest + + def __do_download( self, fspath ): + + filedir, _ = os.path.split( fspath ) + if not os.path.exists( filedir ): + log.warn( "Making missing directory %s" % filedir ) + os.makedirs( filedir ) + + try: + outputfile = open( fspath, "wb" ) + except IOError, ioe: + log.critical("User %s@%s cannot open file" % ( os.environ.get("USERNAME","unknown"),os.environ.get("USERDOMAIN","unknown") ) ) + raise + + tmp_buffer_downloaded, artifact_hexdigest = retry_function( config.LOAD_ATTEMPTS , self.getstream ) + + outputfile.write( tmp_buffer_downloaded.getvalue() ) + return outputfile + + + def savetodir( self, dirpath ): + """ + Save the artifact to a folder. The containing directory must be exist, but use the artifact's + default filename. 
+ """ + assert os.path.exists( dirpath ) + assert os.path.isdir( dirpath ) + outputfilepath = os.path.join( dirpath, self.filename ) + self.save( outputfilepath ) + + + def __repr__( self ): + return """<%s.%s %s>""" % ( self.__class__.__module__, + self.__class__.__name__, + self.url ) diff --git a/pyjenkinsci/build.py b/pyjenkinsci/build.py new file mode 100644 index 0000000..00fe594 --- /dev/null +++ b/pyjenkinsci/build.py @@ -0,0 +1,107 @@ +import artifact +import config +import jenkinsobject +import time +import logging +from exceptions import NoResults, FailedNoResults +from constants import STATUS_FAIL, STATUS_ABORTED, RESULTSTATUS_FAILURE +import result_set + +log = logging.getLogger(__name__) + +class build(jenkinsobject): + """ + Represents a jenkins build, executed in context of a job. + """ + + STR_TOTALCOUNT = "totalCount" + STR_TPL_NOTESTS_ERR = "%s has status %s, and does not have any test results" + + def __init__( self, url, buildno, job ): + assert type(buildno) == int + self.buildno = buildno + self.job = job + jenkinsobject.__init__( self, url ) + + def __str__(self): + return self._data['fullDisplayName'] + + def id(self): + return self._data["number"] + + def get_status(self): + return self._data["result"] + + def get_duration(self): + return self._data["duration"] + + def get_artifacts( self ): + for afinfo in self._data["artifacts"]: + url = "%sartifact/%s" % ( self.baseurl, afinfo["relativePath"] ) + af = artifact( afinfo["fileName"], url, self ) + yield af + del af, url + + def get_artifact_dict(self): + return dict( (a.filename, a) for a in self.get_artifacts() ) + + def is_running( self ): + """ + Return a bool if running. + """ + self.poll() + return self._data["building"] + + def is_good( self ): + """ + Return a bool, true if the build was good. + If the build is still running, return False. 
+ """ + return ( not self.is_running() ) and self._data["result"] == 'SUCCESS' + + def block_until_complete(self, delay=15): + assert isinstance( delay, int ) + count = 0 + while self.is_running(): + total_wait = delay * count + log.info("Waited %is for %s #%s to complete" % ( total_wait, self.job.id(), self.id() ) ) + time.sleep( delay ) + count += 1 + + def get_jenkins_obj(self): + return self.job.get_jenkins_obj() + + def get_result_url(self): + """ + Return the URL for the object which provides the job's result summary. + """ + url_tpl = r"%stestReport/%s" + return url_tpl % ( self._data["url"] , config.JENKINS_API ) + + def get_resultset(self): + """ + Obtain detailed results for this build. + """ + result_url = self.get_result_url() + if self.STR_TOTALCOUNT not in self.get_actions(): + raise NoResults( "%s does not have any published results" % str(self) ) + buildstatus = self.get_status() + if buildstatus in [ STATUS_FAIL, RESULTSTATUS_FAILURE, STATUS_ABORTED ]: + raise FailedNoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) ) + if self.get_actions()[ self.STR_TOTALCOUNT ] == 0: + raise NoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) ) + obj_results = result_set( result_url, build=self ) + return obj_results + + def has_resultset(self): + """ + Return a boolean, true if a result set is available. false if not. 
+ """ + return self.STR_TOTALCOUNT in self.get_actions() + + def get_actions(self): + all_actions = {} + for dct_action in self._data["actions"]: + all_actions.update( dct_action ) + return all_actions + diff --git a/pyjenkinsci/command_line/__init__.py b/pyjenkinsci/command_line/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyjenkinsci/command_line/jenkins_invoke.py b/pyjenkinsci/command_line/jenkins_invoke.py new file mode 100644 index 0000000..3f9afd0 --- /dev/null +++ b/pyjenkinsci/command_line/jenkins_invoke.py @@ -0,0 +1,63 @@ +import os +import sys +import logging +import optparse +import jenkins + +log = logging.getLogger(__name__) + +class jenkins_invoke(object): + + @classmethod + def mkparser(cls): + parser = optparse.OptionParser() + DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" ) + parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete." + parser.add_option("-J", "--jenkinsbase", dest="baseurl", + help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL, + type="str", + default=DEFAULT_BASEURL, ) + parser.add_option("-b", "--block", dest="block", + help="Block until each of the jobs is complete." , + action="store_true", + default=False ) + parser.add_option("-t", "--token", dest="token", + help="Optional security token." 
, + default=None ) + return parser + + @classmethod + def main(cls): + parser = cls.mkparser() + options, args = parser.parse_args() + try: + assert len( args ) > 0, "Need to specify at least one job name" + except AssertionError, e: + log.critical( e[0] ) + parser.print_help() + sys.exit(1) + invoker = cls( options, args ) + invoker() + + def __init__( self, options, jobs ): + self.options = options + self.jobs = jobs + + def __call__(self): + for job in self.jobs: + self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token ) + + def invokejob(self, jobname, block, baseurl, token ): + assert type(block) == bool + assert type(baseurl) == str + assert type(jobname) == str + assert token is None or isinstance( token, str ) + jenkinsserver = jenkins( baseurl ) + job = jenkinsserver[ jobname ] + job.invoke( securitytoken=token, block=block ) + + +def main( ): + logging.basicConfig() + logging.getLogger("").setLevel( logging.INFO ) + jenkins_invoke.main() diff --git a/pyjenkinsci/command_line/meta_test.py b/pyjenkinsci/command_line/meta_test.py new file mode 100644 index 0000000..c918339 --- /dev/null +++ b/pyjenkinsci/command_line/meta_test.py @@ -0,0 +1,50 @@ +import optparse +import os +import random +import logging +from utils.id import mk_id +from utils import junitxml + +log = logging.getLogger(__name__) + +class meta_test(object): + ATTEMPTS=3 + + @classmethod + def mkParser(cls): + parser = optparse.OptionParser() + + def __init__(self, opts=None): + self.opts = opts + + def testFunction(self): + if random.random() < 0.1: + raise AssertionError("The value was too small") + return 0 + + def __call__(self): + temp_dir = os.environ.get("TEMP", r"c:\temp" ) + output_dir = os.environ.get( "WORKSPACE", temp_dir ) + result_filepath = os.path.join( output_dir, "results.xml" ) + stream = open( result_filepath, "wb" ) + testsuite_name = mk_id() + ju = junitxml.junitxml( stream, testsuite_name) + + + classname = mk_id() + for i 
in xrange(0, self.ATTEMPTS ): + tr = ju.startTest( classname, mk_id() ) + try: + tr.run( self.testFunction ) + except Exception, e: + log.exception(e) + continue + + ju.write() + +def main( ): + logging.basicConfig() + return meta_test()() + +if __name__ == "__main__": + main() diff --git a/pyjenkinsci/config.py b/pyjenkinsci/config.py new file mode 100644 index 0000000..aba8f70 --- /dev/null +++ b/pyjenkinsci/config.py @@ -0,0 +1,3 @@ +JENKINS_API = r"api/python/" +LOAD_TIMEOUT = 30 +LOAD_ATTEMPTS = 5 \ No newline at end of file diff --git a/pyjenkinsci/constants.py b/pyjenkinsci/constants.py new file mode 100644 index 0000000..d456dff --- /dev/null +++ b/pyjenkinsci/constants.py @@ -0,0 +1,15 @@ +import re + +STATUS_FAIL = "FAIL" +STATUS_ERROR = "ERROR" +STATUS_ABORTED = "ABORTED" +STATUS_REGRESSION = "REGRESSION" + +STATUS_FIXED = "FIXED" +STATUS_PASSED = "PASSED" + +RESULTSTATUS_FAILURE = "FAILURE" +RESULTSTATUS_FAILED = "FAILED" + +STR_RE_SPLIT_VIEW = "(.*)/view/([^/]*)/?" +RE_SPLIT_VIEW_URL = re.compile( STR_RE_SPLIT_VIEW ) diff --git a/pyjenkinsci/exceptions.py b/pyjenkinsci/exceptions.py new file mode 100644 index 0000000..6a2b6cc --- /dev/null +++ b/pyjenkinsci/exceptions.py @@ -0,0 +1,44 @@ +class ArtifactsMissing(Exception): + """ + Cannot find a build with all of the required artifacts. + """ + +class UnknownJob( KeyError ): + """ + Hudson does not recognize the job requested. + """ + +class ArtifactBroken(Exception): + """ + An artifact is broken, wrong + """ + +class TimeOut( Exception ): + """ + Some jobs have taken too long to complete. + """ + +class WillNotBuild(Exception): + """ + Cannot trigger a new build. + """ + +class NoBuildData(Exception): + """ + A job has no build data. + """ + +class NoResults(Exception): + """ + A build did not publish any results. 
+ """ + +class FailedNoResults(NoResults): + """ + A build did not publish any results because it failed + """ + +class BadURL(ValueError): + """ + A URL appears to be broken + """ diff --git a/pyjenkinsci/fingerprint.py b/pyjenkinsci/fingerprint.py new file mode 100644 index 0000000..836dc29 --- /dev/null +++ b/pyjenkinsci/fingerprint.py @@ -0,0 +1,82 @@ +import jenkinsobject +from exceptions import ArtifactBroken + +import urllib2 +import re + +import logging + +log = logging.getLogger( __name__ ) + +class fingerprint(jenkinsobject): + """ + Represents a jenkins fingerprint on a single artifact file ?? + """ + RE_MD5 = re.compile("^([0-9a-z]{32})$") + + def __init__( self, baseurl, id, jenkins_obj ): + logging.basicConfig() + self.jenkins_obj = jenkins_obj + assert self.RE_MD5.search( id ), "%s does not look like a valid id" % id + url = "%s/fingerprint/%s/" % ( baseurl, id ) + jenkinsobject.__init__( self, url, poll=False ) + self.id = id + + def get_jenkins_obj(self): + return self.jenkins_obj + + def __str__(self): + return self.id + + def valid(self): + """ + Return True / False if valid + """ + try: + self.poll() + except urllib2.HTTPError, e: + return False + return True + + def validate_for_build(self, filename, job, build): + if not self.valid(): + log.info("Unknown to jenkins.") + return False + if not self._data["original"] is None: + if self._data["original"]["name"] == job: + if self._data["original"]["number"] == build: + return True + if self._data["fileName"] != filename: + log.info("Filename from jenkins (%s) did not match provided (%s)" % ( self._data["fileName"], filename ) ) + return False + for usage_item in self._data["usage"]: + if usage_item["name"] == job: + for range in usage_item["ranges"]["ranges"]: + if range["start"] <= build <= range["end"]: + log.info("This artifact was generated by %s between build %i and %i" % ( job, range["start"], range["end"] ) ) + return True + return False + + def validate(self): + try: + assert self.valid() 
+ except AssertionError, ae: + raise ArtifactBroken( "Artifact %s seems to be broken, check %s" % ( self.id, self.baseurl ) ) + except urllib2.HTTPError, httpe: + raise ArtifactBroken( "Unable to validate artifact id %s using %s" % ( self.id, self.baseurl ) ) + return True + + def get_info( self ): + """ + Returns a tuple of build-name, build# and artifiact filename for a good build. + """ + self.poll() + return self._data["original"]["name"], self._data["original"]["number"], self._data["fileName"] + + +if __name__ == "__main__": + ff = fingerprint( "http://localhost:8080/hudson/", "0f37cbb6545b8778bc0700d90be66bf3" ) + print repr(ff) + print ff.baseurl + print ff.valid() + print ff.get_info( ) diff --git a/pyjenkinsci/jenkins.py b/pyjenkinsci/jenkins.py new file mode 100644 index 0000000..4b83074 --- /dev/null +++ b/pyjenkinsci/jenkins.py @@ -0,0 +1,96 @@ +import fingerprint +import jenkinsobject +import job +from exceptions import UnknownJob +from utils.urlopener import mkurlopener +import logging +import time +import view + +log = logging.getLogger(__name__) + +class jenkins(jenkinsobject): + """ + Represents a jenkins environment. 
+ """ + def __init__(self, baseurl, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None): + self.proxyhost = proxyhost + self.proxyport = proxyport + self.proxyuser = proxyuser + self.proxypass = proxypass + jenkinsobject.__init__( self, baseurl ) + + def get_proxy_auth(self): + return (self.proxyhost, self.proxyport, self.proxyuser, self.proxypass) + + def get_opener( self ): + return mkurlopener(*self.get_proxy_auth()) + + def validate_fingerprint( self, id ): + obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self) + obj_fingerprint.validate() + log.info("Jenkins says %s is valid" % id) + + def get_artifact_data(self, id): + obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self) + obj_fingerprint.validate() + return obj_fingerprint.get_info() + + def validate_fingerprint_for_build(self, digest, filename, job, build ): + obj_fingerprint = fingerprint( self.baseurl, digest, jenkins_obj=self ) + return obj_fingerprint.validate_for_build( filename, job, build ) + + def get_jenkins_obj(self): + return self + + def get_jobs(self): + """ + Fetch all the build-names on this Hudson server. 
+ """ + for info in self._data["jobs"]: + yield info["name"], job( info["url"], info["name"], jenkins_obj=self) + + def iteritems(self): + return self.get_jobs() + + def iterkeys(self): + for info in self._data["jobs"]: + yield info["name"] + + def keys(self): + return [ a for a in self.iterkeys() ] + + def __str__(self): + return "Jenkins server at %s" % self.baseurl + + def _get_views( self ): + if not self._data.has_key( "views" ): + pass + else: + for viewdict in self._data["views"]: + yield viewdict["name"], viewdict["url"] + + def get_view_dict(self): + return dict( self._get_views() ) + + def get_view_url( self, str_view_name ): + try: + view_dict = self.get_view_dict() + return view_dict[ str_view_name ] + except KeyError, ke: + all_views = ", ".join( view_dict.keys() ) + raise KeyError("View %s is not known - available: %s" % ( str_view_name, all_views ) ) + + def get_view(self, str_view_name ): + view_url = self.get_view_url( str_view_name ) + view_api_url = self.python_api_url( view_url ) + return view(view_api_url , str_view_name, jenkins_obj=self) + + def __getitem__( self, buildname ): + """ + Get a build + """ + for name, job in self.get_jobs(): + if name == buildname: + return job + raise UnknownJob(buildname) diff --git a/pyjenkinsci/jenkinsobject.py b/pyjenkinsci/jenkinsobject.py new file mode 100644 index 0000000..e97ef95 --- /dev/null +++ b/pyjenkinsci/jenkinsobject.py @@ -0,0 +1,68 @@ +import urllib2 +import logging +import pprint +import config +from utils.retry import retry_function + +log = logging.getLogger( __name__ ) + +class jenkinsobject( object ): + """ + This appears to be the base object that all other jenkins objects are inherited from + """ + RETRY_ATTEMPTS = 5 + + def __repr__( self ): + return """<%s.%s %s>""" % ( self.__class__.__module__, + self.__class__.__name__, + str( self ) ) + + def print_data(self): + pprint.pprint( self._data ) + + def __str__(self): + raise NotImplemented + + def __init__( self, baseurl, poll=True ): + 
""" + Initialize a jenkins connection + """ + self.baseurl = baseurl + if poll: + try: + self.poll() + except urllib2.HTTPError, hte: + log.exception(hte) + log.warn( "Failed to conenct to %s" % baseurl ) + raise + + def poll(self): + self._data = self._poll() + + def _poll(self): + url = self.python_api_url( self.baseurl ) + return retry_function( self.RETRY_ATTEMPTS , self.get_data, url ) + + @classmethod + def python_api_url( cls, url ): + if url.endswith( config.JENKINS_API ): + return url + else: + if url.endswith( r"/" ): + fmt="%s%s" + else: + fmt = "%s/%s" + return fmt % (url, config.JENKINS_API) + + def get_data( self, url ): + """ + Find out how to connect, and then grab the data. + """ + fn_urlopen = self.getHudsonObject().get_opener() + try: + stream = fn_urlopen( url ) + result = eval( stream.read() ) + except urllib2.HTTPError, e: + log.warn( "Error reading %s" % url ) + raise + return result diff --git a/pyjenkinsci/job.py b/pyjenkinsci/job.py new file mode 100644 index 0000000..9d6889b --- /dev/null +++ b/pyjenkinsci/job.py @@ -0,0 +1,177 @@ +import logging +import urlparse +import urllib2 +import time +import build +import jenkinsobject + +from exceptions import NoBuildData + +log = logging.getLogger(__name__) + +class job(jenkinsobject): + """ + Represents a jenkins job + A job can hold N builds which are the actual execution environments + """ + def __init__( self, url, name, jenkins_obj ): + self.name = name + self.jenkins = jenkins_obj + jenkinsobject.__init__( self, url ) + + def id( self ): + return self._data["name"] + + def __str__(self): + return self._data["name"] + + def get_jenkins_obj(self): + return self.jenkins + + def get_build_triggerurl( self, token=None ): + if token is None: + extra = "build" + else: + assert isinstance(token, str ), "token if provided should be a string." 
+ extra = "build?token=%s" % token + buildurl = urlparse.urljoin( self.baseurl, extra ) + return buildurl + + def hit_url(self, url ): + fn_urlopen = self.get_jenkins_obj().get_opener() + try: + stream = fn_urlopen( url ) + html_result = stream.read() + except urllib2.HTTPError, e: + log.debug( "Error reading %s" % url ) + raise + return html_result + + def invoke( self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15 ): + assert isinstance( invoke_pre_check_delay, (int, float) ) + assert isinstance( invoke_block_delay, (int, float) ) + assert isinstance( block, bool ) + assert isinstance( skip_if_running, bool ) + skip_build = False + if self.is_queued(): + log.warn( "Will not request new build because %s is already queued" % self.id() ) + skip_build = True + elif self.is_running(): + if skip_if_running: + log.warn( "Will not request new build because %s is already running" % self.id() ) + skip_build = True + else: + log.warn("Will re-schedule %s even though it is already running" % self.id() ) + original_build_no = self.get_last_buildnumber() + if skip_build: + pass + else: + log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) ) + url = self.get_build_triggerurl( securitytoken ) + html_result = self.hit_url( url ) + assert len( html_result ) > 0 + if invoke_pre_check_delay > 0: + log.info("Waiting for %is to allow Hudson to catch up" % invoke_pre_check_delay ) + time.sleep( invoke_pre_check_delay ) + if block: + total_wait = 0 + while self.is_queued(): + log.info( "Waited %is for %s to begin..." % ( total_wait, self.id() ) ) + time.sleep( invoke_block_delay ) + total_wait += invoke_block_delay + if self.is_running(): + running_build = self.get_last_build() + running_build.block_until_complete( delay=invoke_pre_check_delay ) + assert running_build.is_good() + else: + assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run." 
+ else: + if self.is_queued(): + log.info( "%s has been queued." % self.id() ) + elif self.is_running(): + log.info( "%s is running." % self.id() ) + elif original_build_no < self.get_last_buildnumber(): + log.info( "%s has completed." % self.id() ) + else: + raise AssertionError("The job did not schedule.") + + def _buildid_for_type(self, buildtype): + """Gets a buildid for a given type of build""" + KNOWNBUILDTYPES=["lastSuccessfulBuild", "lastBuild", "lastCompletedBuild"] + assert buildtype in KNOWNBUILDTYPES + buildid = self._data[buildtype]["number"] + assert type(buildid) == int, "Build ID should be an integer, got %s" % repr( buildid ) + return buildid + + def get_last_good_buildnumber( self ): + """ + Get the numerical ID of the last good build. + """ + return self._buildid_for_type(buildtype="lastSuccessfulBuild") + + def get_last_buildnumber( self ): + """ + Get the numerical ID of the last build. + """ + return self._buildid_for_type(buildtype="lastBuild") + + def get_last_completed_buildnumber( self ): + """ + Get the numerical ID of the last complete build. + """ + return self._buildid_for_type(buildtype="lastCompletedBuild") + + def get_build_dict(self): + if not self._data.has_key( "builds" ): + raise NoBuildData( repr(self) ) + return dict( ( a["number"], a["url"] ) for a in self._data["builds"] ) + + def get_build_ids(self): + """ + Return a sorted list of all good builds as ints. 
+ """ + return reversed( sorted( self.get_build_dict().keys() ) ) + + def get_last_good_build( self ): + """ + Get the last good build + """ + bn = self.get_last_good_buildnumber() + return self.get_build( bn ) + + def get_last_build( self ): + """ + Get the last good build + """ + bn = self.get_last_buildnumber() + return self.get_build( bn ) + + def get_last_completed_build( self ): + """ + Get the last build regardless of status + """ + bn = self.get_last_completed_buildnumber() + return self.get_build( bn ) + + def get_build( self, buildnumber ): + assert type(buildnumber) == int + url = self.get_build_dict()[ buildnumber ] + return build( url, buildnumber, job=self ) + + def __getitem__( self, buildnumber ): + return self.get_build(buildnumber) + + def is_queued_or_running(self): + return self.is_queued() or self.is_running() + + def is_queued(self): + self.poll() + return self._data["inQueue"] + + def is_running(self): + self.poll() + try: + return self.get_last_build().is_running() + except NoBuildData: + log.info("No build info available for %s, assuming not running." % str(self) ) + return False diff --git a/pyjenkinsci/result.py b/pyjenkinsci/result.py new file mode 100644 index 0000000..64bd90a --- /dev/null +++ b/pyjenkinsci/result.py @@ -0,0 +1,21 @@ +class result( object ): + def __init__(self, **kwargs ): + """ + + """ + self.__dict__.update( kwargs ) + + def __str__(self): + return "%s %s %s" % ( self.className, self.name, self.status ) + + def __repr__(self): + module_name = self.__class__.__module__ + class_name = self.__class__.__name__ + self_str = str( self ) + return "<%s.%s %s>" % ( module_name , class_name , self_str ) + + def id(self): + """ + Calculate an ID for this object. 
+ """ + return "%s.%s" % ( self.className, self.name ) diff --git a/pyjenkinsci/result_set.py b/pyjenkinsci/result_set.py new file mode 100644 index 0000000..9e343ad --- /dev/null +++ b/pyjenkinsci/result_set.py @@ -0,0 +1,39 @@ +import jenkinsobject +import result + +class result_set(jenkinsobject): + """ + Represents a result from a completed Hudson run. + """ + def getHudsonObject(self): + return self.build.job.get_jenkins_obj() + + def __init__(self, url, build ): + """ + """ + self.build = build + jenkinsobject.__init__( self, url ) + + def __str__(self): + return "Test Result for %s" % str( self.build ) + + def keys(self): + return [ a[0] for a in self.iteritems() ] + + def items(self): + return [a for a in self.iteritems()] + + def iteritems(self): + for suite in self._data.get("suites", [] ): + for case in suite["cases"]: + R = result( **case ) + yield R.id(), R + + for report_set in self._data.get( "childReports", [] ): + for suite in report_set["result"]["suites"]: + for case in suite["cases"]: + R = result( **case ) + yield R.id(), R + + def __len__(self): + return sum( 1 for x in self.iteritems() ) diff --git a/pyjenkinsci/utils/__init__.py b/pyjenkinsci/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyjenkinsci/utils/bufwrapper.py b/pyjenkinsci/utils/bufwrapper.py new file mode 100644 index 0000000..8addaaa --- /dev/null +++ b/pyjenkinsci/utils/bufwrapper.py @@ -0,0 +1,39 @@ +from cStringIO import StringIO + +class bufwrapper( object ): + """ + Basic buffer-wrapper - wraps up an output stream with a buffer. + """ + def __init__( self, stream, buffer=None ): + self.stream = stream + + assert hasattr( self.stream, "write" ), "%s does not support write" % repr(stream) + + if buffer is None: + self.buf = StringIO() + else: + self.buf = buffer + + def get_and_clear( self ): + """ + Get the contents of the buffer and clear it. 
+ """ + old_buffer = self.buf + self.buf = StringIO() + return old_buffer.getvalue() + + def flush( self ): + for item in [ self.stream, self.buf ]: + if hasattr( item, "flush" ) and callable( item.flush ): + item.flush() + + + def close(self): + self.stream.close() + + def write(self, txt ): + self.stream.write(txt) + self.buf.write(txt) + + def getvalue(self): + return self.buf.getvalue() diff --git a/pyjenkinsci/utils/dates.py b/pyjenkinsci/utils/dates.py new file mode 100644 index 0000000..121257e --- /dev/null +++ b/pyjenkinsci/utils/dates.py @@ -0,0 +1,14 @@ +import datetime + +MICROSECONDS_PER_SECOND = 1000000.0 +SECONDS_PER_DAY = 86400 + +def timedelta_to_seconds( td ): + assert isinstance( td, datetime.timedelta ) + seconds = 0.0 + + seconds += td.days * SECONDS_PER_DAY + seconds += td.seconds + seconds += td.microseconds / MICROSECONDS_PER_SECOND + + return seconds diff --git a/pyjenkinsci/utils/id.py b/pyjenkinsci/utils/id.py new file mode 100644 index 0000000..097debb --- /dev/null +++ b/pyjenkinsci/utils/id.py @@ -0,0 +1,16 @@ +""" +Generate random IDs. 
+""" +import random + +ID_VALID = "abcdefghijklmnopqrstuvwxyz0123456789" + +def mk_id(length=5, prefix=""): + idchars = [] + for count in range( 0, length ): + idchars.append( random.choice( ID_VALID ) ) + return "%s%s" % ( prefix, "".join( idchars ) ) + +if __name__ == "__main__": + for i in range(0, 50): + print repr( mk_id( i ) ) diff --git a/pyjenkinsci/utils/junitxml.py b/pyjenkinsci/utils/junitxml.py new file mode 100644 index 0000000..f838241 --- /dev/null +++ b/pyjenkinsci/utils/junitxml.py @@ -0,0 +1,189 @@ +import logging +import datetime +import traceback +import sys + +try: + from xml.etree import ElementTree as ET +except Exception, e: + import elementtree.ElementTree as ET + +from utils.dates import timedelta_to_seconds + +log = logging.getLogger(__name__) + +class junitxml( object ): + + ERROR = "error" + FAILURE = "failure" + + def __init__( self, stream, testsuite_name="test", ): + """ + Set up a new stream + """ + assert isinstance( testsuite_name, str ) + + self.xml = ET.Element("testsuite") + self.stream = stream + + self.xml.attrib["name"] = testsuite_name + + self.count_errors = 0 + self.count_tests = 0 + self.count_failures = 0 + + def __repr__(self): + return "<%s.%s %s>" % (self.__class__.__module__, self.__class__.__name__, str(self)) + + def __str__(self): + return "Stream: %s, Tests: %i Errors: %i, Failures %i" % ( repr( self.stream ), + self.count_tests, + self.count_errors, + self.count_failures ) + + @classmethod + def get_error_strings( cls, e ): + str_error_type = "%s.%s" % ( e.__class__.__module__, e.__class__.__name__ ) + str_error_args = ",".join( [repr(ee) for ee in e.args] ) + str_doc = str( e.__doc__ ).strip() + + return str_error_type, str_error_args, str_doc + + def write(self, xml_declaration=True, encoding="utf-8"): + self.xml.attrib["errors"] = str( self.count_errors ) + self.xml.attrib["failures"] = str( self.count_failures ) + self.xml.attrib["tests"] = str( self.count_tests ) + + ET.ElementTree( self.xml ).write( 
self.stream, encoding=encoding, xml_declaration=xml_declaration ) + log.warn( "Wrote Junit-style XML log to %s" % self.stream ) + + def assertTrue(self, classname, testname, errmsg, fn, *args, **kwargs ): + """ + Map the interface onto an assert like statement. + Also returns the value so that we can do useful things with the result + """ + + _testname = testname.replace( ".", "_") # Dots are not permitted in names' + + def assert_fn( ): + if callable(fn): + assert fn( *args, **kwargs ), errmsg + else: + assert len(args) == 0 and len(kwargs) == 0, "Object being tested is not callable and cannot have arguments." + assert fn, "errmsg" + + tr = self.startTest(classname, _testname) + return tr.run( assert_fn ) + + def startTest( self, classname, testname, ): + return junitxml_transaction( self, classname, testname ) + + def passTest( self, classname, name, test_time ): + self.addPass( classname, name, test_time) + + def failTest(self, classname, name, test_time, error, tb, mode=FAILURE ): + """ + Add a error + """ + str_error, str_error_msg, str_doc = self.get_error_strings( error ) + enhanced_tb = "%s: %s\n\n( %s )\n\n%s" % ( repr(error), str_error_msg, str_doc, tb ) + tc = self.addPass( classname, name, test_time) + self.convertPassToFail( tc, str_error, enhanced_tb, mode=mode ) + + + def addPass(self, classname, name, test_time=0.0, ): + """ + Add a pass + """ + assert isinstance( classname, str ) + assert isinstance( name, str ) + assert isinstance( test_time, (int, float) ) + self.count_tests += 1 + testcase = ET.SubElement( self.xml, "testcase" ) + testcase.attrib["classname"] = classname + testcase.attrib["name"] = name + testcase.attrib["time"] = "%.2f" % test_time + + return testcase + + def convertPassToFail( self, tc, failtype="", tb="", mode=FAILURE ): + """ + Add a failure + """ + assert isinstance( failtype, str ) + assert isinstance( tb, str ), "Traceback should be a string, got %s" % repr(tb) + assert mode in [ self.FAILURE, self.ERROR ] + + if mode == 
self.FAILURE: + self.count_errors += 1 + else: + self.count_failures += 1 + + failure = ET.SubElement( tc, mode ) + failure.text = tb + failure.attrib["type"] = failtype + return failure + + +class junitxml_transaction( object ): + def __init__(self, jxml, classname, testname ): + assert isinstance( jxml, junitxml ) + self.jxml = jxml + self.classname = classname + self.testname = testname + self.start_time = datetime.datetime.now() + + def getRuntime(self): + return timedelta_to_seconds( datetime.datetime.now() - self.start_time ) + + def run( self, fn, *args, **kwargs ): + try: + result = fn( *args, **kwargs ) + self.jxml.addPass( self.classname, self.testname, self.getRuntime() ) + except Exception, e: + ex_type, ex_value, ex_tb = sys.exc_info() + + tb_formatted = traceback.format_exception( ex_type, ex_value, ex_tb ) + str_tb = "\n".join( tb_formatted ) + str_ex = "%s.%s" % ( ex_value.__class__.__module__, ex_value.__class__.__name__ ) + runtime = self.getRuntime() + + if isinstance(e, AssertionError): + self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.FAILURE ) + else: + self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.ERROR ) + + log.exception(e) + + raise e + return result + +if __name__ == "__main__": + import sys + import time + import random + + logging.basicConfig() + logging.getLogger("").setLevel( logging.INFO ) + fod = junitxml( stream=sys.stdout ) + + def fn_test( mode ): + + time.sleep( random.random( ) ) + + if mode=="pass": + return 1 + elif mode=="fail": + assert False + elif mode=="error": + {}["x"] + + for testname in [ "pass", "fail", "error" ]: + t = fod.startTest("a", testname, ) + try: + t.run( fn_test, testname ) + except Exception, e: + #log.exception(e) + pass + + fod.write() diff --git a/pyjenkinsci/utils/md5hash.py b/pyjenkinsci/utils/md5hash.py new file mode 100644 index 0000000..4fd85e8 --- /dev/null +++ b/pyjenkinsci/utils/md5hash.py @@ -0,0 +1,17 @@ +try: + 
# -- pyjenkinsci/utils/md5hash.py (continuation: a bare "try:" opens in the
#    previous chunk; reconstructed here as a complete import guard) ---------
try:
    import hashlib
except ImportError:
    # BUG FIX: the original left ``hashlib`` unbound when this import failed,
    # so ``if hashlib:`` in new_digest() raised NameError instead of falling
    # back to the deprecated md5 module (pre-Python-2.5 interpreters).
    hashlib = None
    import md5


def new_digest():
    """
    Return a fresh MD5 digest object from whichever hash module is available.
    """
    if hashlib:
        m = hashlib.md5()
    else:
        m = md5.new()
    return m


if __name__ == "__main__":
    digest = new_digest()
    digest.update("123")
    print(repr(digest.digest()))

# diff --git a/pyjenkinsci/utils/retry.py b/pyjenkinsci/utils/retry.py
# new file mode 100644  index 0000000..2b775ad
# --- /dev/null  +++ b/pyjenkinsci/utils/retry.py

import logging
import time

log = logging.getLogger(__name__)

# Exceptions that should never be retried: programming errors and the user
# pressing Ctrl-C.
IGNORE_EXCEPTIONS = [AttributeError, KeyboardInterrupt]

# Seconds slept between attempts.
DEFAULT_SLEEP_TIME = 1

def retry_function(tries, fn, *args, **kwargs):
    """
    Call an unreliable function up to ``tries`` times, returning its result
    as soon as one attempt succeeds.  If every attempt fails, the most
    recent exception is re-raised.

    :param tries: maximum number of attempts; non-zero positive integer
    :param fn: callable to invoke; *args / **kwargs are passed through
    :raises: the last exception raised by ``fn`` when all attempts fail
    """
    assert isinstance(tries, int), "Tries should be a non-zero positive integer"
    assert tries > 0, "Tries should be a non-zero positive integer"
    last_exception = None
    for attempt in range(0, tries):
        attemptno = attempt + 1
        if attemptno == tries:
            log.warn("Last chance: #%i of %i" % (attemptno, tries))
        elif tries > attempt > 0:
            log.warn("Attempt #%i of %i" % (attemptno, tries))
        try:
            result = fn(*args, **kwargs)
            if attempt > 0:
                log.info("Result obtained after attempt %i" % attemptno)
            return result
        except Exception as e:
            if type(e) in IGNORE_EXCEPTIONS:
                # Immediately re-raise exceptions that retrying cannot help.
                raise
            try:
                fn_name = fn.__name__
            except AttributeError:
                fn_name = "Anonymous Function"
            log.exception(e)
            log.warn("%s failed at attempt %i, trying again." % (fn_name, attemptno))
            # BUG FIX: remember the failure explicitly.  The original did
            # ``raise e`` after the loop, which relied on the except variable
            # outliving its block -- Python-2-only scoping behaviour.
            last_exception = e
            time.sleep(DEFAULT_SLEEP_TIME)
    # All attempts failed: surface the most recent error to the caller.
    raise last_exception

if __name__ == "__main__":
    # Demonstration: a function that always raises KeyError.
    def broken_function(a):
        return {}[a]

    logging.basicConfig()

    try:
        retry_function(3, broken_function, "x")
    except Exception as e:
        print(repr(e))

# diff --git a/pyjenkinsci/utils/urlopener.py b/pyjenkinsci/utils/urlopener.py
# new file mode 100644  index 0000000..b3739df
# --- /dev/null  +++ b/pyjenkinsci/utils/urlopener.py

try:
    import urllib2
except ImportError:
    # Forward-compat shim: urllib2 was folded into urllib.request in Python 3;
    # the names used below (urlopen, ProxyHandler, HTTPBasicAuthHandler,
    # build_opener) all exist there unchanged.
    import urllib.request as urllib2

import logging

log = logging.getLogger(__name__)

DEFAULT_PROXYPORT = 80
# SECURITY NOTE(review): hard-coded proxy credentials checked into source
# control.  These should be removed and supplied via configuration; left in
# place here only to preserve existing behaviour.
DEFAULT_PROXY_PASS = "Research123"
DEFAULT_PROXY_USER = "wsa_oblicqs_dev"

def mkurlopener(proxyhost, proxyport, proxyuser, proxypass):
    """
    Return a URL-opening callable.

    With no ``proxyhost`` this is plain ``urllib2.urlopen``; otherwise an
    opener configured for the given HTTP proxy, falling back to the module
    defaults for any of port / user / password passed as None.
    """
    if not proxyhost:
        return urllib2.urlopen

    if proxyport is None:
        proxyport = DEFAULT_PROXYPORT
    if proxypass is None:
        proxypass = DEFAULT_PROXY_PASS
    if proxyuser is None:
        proxyuser = DEFAULT_PROXY_USER

    assert type(proxyport) == int, "Proxy port should be an int, got %s" % repr(proxyport)
    # Typo fix: "sting" -> "string" in the assertion message.
    assert type(proxypass) == str, "Proxy password should be a string, got %s" % repr(proxypass)
    assert type(proxyuser) == str, "Proxy username should be a string, got %s" % repr(proxyuser)

    proxy_spec = {'http': 'http://%s:%i/' % (proxyhost, proxyport),
                  'https': 'http://%s:%i/' % (proxyhost, proxyport)}

    proxy_handler = urllib2.ProxyHandler(proxy_spec)
    proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
    proxy_auth_handler.add_password(None, proxyhost, proxyuser, proxypass)

    opener = urllib2.build_opener(proxy_handler, proxy_auth_handler)
    return opener.open

# diff --git a/pyjenkinsci/utils/xmlrunner.py b/pyjenkinsci/utils/xmlrunner.py
# new file mode 100644  index 0000000..2292719
# --- /dev/null  +++ b/pyjenkinsci/utils/xmlrunner.py

"""
XML Test Runner for PyUnit
"""

# Written by Sebastian Rittau and placed in
# the Public Domain.
+from utils import bufwrapper + +__revision__ = "$Id: /mirror/jroger/python/stdlib/xmlrunner.py 3506 2006-07-27T09:12:39.629878Z srittau $" + +import sys +import time +import traceback +import unittest +import logging +from StringIO import StringIO +from xml.sax.saxutils import escape + +log = logging.getLogger() + +class faketest( object ): + """ + A fake test object for when you want to inject additional results into the XML stream. + """ + failureException = AssertionError + + def __init__( self, id, exc_info ): + self._id = id + self._exc_info = exc_info + + def id(self): + return self._id + + def run(self, result): + result.startTest(self) + result.addError(self, self._exc_info ) + ok = False + result.stopTest(self) + + def __call__(self, *args, **kwds): + return self.run(*args, **kwds) + + +class _TestInfo(object): + """Information about a particular test. + Used by _XmlTestResult.""" + + def __init__( self, test, time, ): + (self._class, self._method) = test.id().rsplit(".", 1) + self._time = time + self._error = None + self._failure = None + self._console = "" + + @staticmethod + def create_success(test, time): + """Create a _TestInfo instance for a successful test.""" + return _TestInfo(test, time) + + @staticmethod + def create_failure(test, time, failure, console=""): + """Create a _TestInfo instance for a failed test.""" + info = _TestInfo(test, time) + info._failure = failure + info.console = console + return info + + @staticmethod + def create_error(test, time, error, console="" ): + """Create a _TestInfo instance for an erroneous test.""" + info = _TestInfo(test, time) + info._error = error + info.console = console + return info + + def print_report(self, stream): + """Print information about this test case in XML format to the + supplied stream. 
+ """ + stream.write(' ' % \ + { + "class": self._class, + "method": self._method, + "time": self._time, + }) + if self._failure is not None: + self._print_error(stream, 'failure', self._failure) + if self._error is not None: + self._print_error(stream, 'error', self._error) + stream.write('\n') + + def _print_error(self, stream, tagname, error): + """Print information from a failure or error to the supplied stream.""" + text = escape(str(error[1])) + stream.write('\n') + stream.write(' <%s type="%s">%s\n%s\n' \ + % (tagname, str(error[0]), text, self.console )) + tb_stream = StringIO() + traceback.print_tb(error[2], None, tb_stream) + stream.write(escape(tb_stream.getvalue())) + stream.write(' \n' % tagname) + stream.write(' ') + + +class _XmlTestResult(unittest.TestResult): + """A test result class that stores result as XML. + + Used by XmlTestRunner. + """ + + test_count = 0 + + @classmethod + def get_test_serial( cls ): + cls.test_count += 1 + return cls.test_count + + def __init__(self, classname, consolestream =None ): + unittest.TestResult.__init__(self) + self._test_name = classname + self._start_time = None + self._tests = [] + self._error = None + self._failure = None + self._consolestream = consolestream + + def startTest(self, test): + unittest.TestResult.startTest(self, test) + + sn = self.get_test_serial() + + log.info( "Test %i: %s" % ( sn, test.id() ) ) + self._error = None + self._failure = None + self._start_time = time.time() + + def stopTest(self, test, time_taken = None ): + if time_taken is not None: + time_taken = time.time() - self._start_time + + str_console = self._consolestream.get_and_clear() + + unittest.TestResult.stopTest(self, test) + if self._error: + info = _TestInfo.create_error(test, time_taken, self._error, console=str_console ) + log.error( "Error: %s" % test.id() ) + elif self._failure: + info = _TestInfo.create_failure(test, time_taken, self._failure, console=str_console ) + log.error( "Fail: %s" % test.id() ) + else: + info 
= _TestInfo.create_success(test, time_taken, ) + log.debug( "OK: %s" % test.id() ) + self._tests.append(info) + + def addError(self, test, err): + log.warn( "Error: %s" % test.id() ) + unittest.TestResult.addError(self, test, err) + self._error = err + + def addFailure(self, test, err): + log.warn( "Failure: %s" % test.id() ) + unittest.TestResult.addFailure(self, test, err) + self._failure = err + + def print_report(self, stream, time_taken, out, err): + """Prints the XML report to the supplied stream. + + The time the tests took to perform as well as the captured standard + output and standard error streams must be passed in. + """ + stream.write('\n' % \ + { + "n": self._test_name, + "t": self.testsRun, + "time": time_taken, + }) + for info in self._tests: + info.print_report(stream) + stream.write(' \n' % out) + stream.write(' \n' % err) + stream.write('\n') + + +class XmlTestRunner(object): + """A test runner that stores results in XML format compatible with JUnit. + + XmlTestRunner(stream=None) -> XML test runner + + The XML file is written to the supplied stream. If stream is None, the + results are stored in a file called TEST-..xml in the + current working directory (if not overridden with the path property), + where and are the module and class name of the test class. + """ + def __init__(self, stream=None ): + self._stream = stream + + @staticmethod + def get_test_class_name_from_testobj( obj_test ): + class_ = obj_test.__class__ + classname = class_.__module__ + "." 
+ class_.__name__ + return classname + + + def run(self, test, result=None ): + """Run the given test case or test suite.""" + classname = self.get_test_class_name_from_testobj( test ) + assert not self._stream is None + stream = self._stream + + # TODO: Python 2.5: Use the with statement + old_stdout = sys.stdout + old_stderr = sys.stderr + sys.stdout = bufwrapper( old_stdout ) + sys.stderr = bufwrapper( old_stderr ) + + if result is None: + result = _XmlTestResult( classname, consolestream = sys.stdout ) + else: + log.debug("Using provided XML test result object.") + + start_time = time.time() + + try: + test(result) + try: + out_s = sys.stdout.getvalue() + except AttributeError: + out_s = "" + try: + err_s = sys.stderr.getvalue() + except AttributeError: + err_s = "" + finally: + sys.stdout = old_stdout + sys.stderr = old_stderr + + time_taken = time.time() - start_time + result.print_report(stream, time_taken, out_s, err_s) + if self._stream is None: + stream.close() + + return result diff --git a/pyjenkinsci/utils/xmlrunnertest.py b/pyjenkinsci/utils/xmlrunnertest.py new file mode 100644 index 0000000..d245284 --- /dev/null +++ b/pyjenkinsci/utils/xmlrunnertest.py @@ -0,0 +1,144 @@ +import unittest +import sys +import re +from cStringIO import StringIO +from utils.xmlrunner import XmlTestRunner + +class XmlTestRunnerTest(unittest.TestCase): + def setUp(self): + self._stream = StringIO() + + def _try_test_run(self, test_class, expected): + """Run the test suite against the supplied test class and compare the + XML result against the expected XML string. Fail if the expected + string doesn't match the actual string. All time attribute in the + expected string should have the value "0.000". All error and failure + messages are reduced to "Foobar". 
+ """ + runner = XmlTestRunner(self._stream) + runner.run(unittest.makeSuite(test_class)) + + got = self._stream.getvalue() + # Replace all time="X.YYY" attributes by time="0.000" to enable a + # simple string comparison. + got = re.sub(r'time="\d+\.\d+"', 'time="0.000"', got) + # Likewise, replace all failure and error messages by a simple "Foobar" + # string. + got = re.sub(r'(?s).*?', r'Foobar', got) + got = re.sub(r'(?s).*?', r'Foobar', got) + + self.assertEqual(expected, got) + + def test_no_tests(self): + """Regression test: Check whether a test run without any tests + matches a previous run.""" + class TestTest(unittest.TestCase): + pass + self._try_test_run(TestTest, """ + + + +""") + + def test_success(self): + """Regression test: Check whether a test run with a successful test + matches a previous run.""" + class TestTest(unittest.TestCase): + def test_foo(self): + pass + self._try_test_run(TestTest, """ + + + + +""") + + def test_failure(self): + """Regression test: Check whether a test run with a failing test + matches a previous run.""" + class TestTest(unittest.TestCase): + def test_foo(self): + self.assert_(False) + self._try_test_run(TestTest, """ + + Foobar + + + + +""") + + def test_error(self): + """Regression test: Check whether a test run with a erroneous test + matches a previous run.""" + class TestTest(unittest.TestCase): + def test_foo(self): + raise IndexError() + self._try_test_run(TestTest, """ + + Foobar + + + + +""") + + def test_stdout_capture(self): + """Regression test: Check whether a test run with output to stdout + matches a previous run.""" + class TestTest(unittest.TestCase): + def test_foo(self): + print "Test" + self._try_test_run(TestTest, """ + + + + +""") + + def test_stderr_capture(self): + """Regression test: Check whether a test run with output to stderr + matches a previous run.""" + class TestTest(unittest.TestCase): + def test_foo(self): + print >>sys.stderr, "Test" + self._try_test_run(TestTest, """ + + + + +""") + 
+ class NullStream(object): + """A file-like object that discards everything written to it.""" + def write(self, buffer): + pass + + def test_unittests_changing_stdout(self): + """Check whether the XmlTestRunner recovers gracefully from unit tests + that change stdout, but don't change it back properly. + """ + class TestTest(unittest.TestCase): + def test_foo(self): + sys.stdout = XmlTestRunnerTest.NullStream() + + runner = XmlTestRunner(self._stream) + runner.run(unittest.makeSuite(TestTest)) + + def test_unittests_changing_stderr(self): + """Check whether the XmlTestRunner recovers gracefully from unit tests + that change stderr, but don't change it back properly. + """ + class TestTest(unittest.TestCase): + def test_foo(self): + sys.stderr = XmlTestRunnerTest.NullStream() + + runner = XmlTestRunner(self._stream) + runner.run(unittest.makeSuite(TestTest)) + + +if __name__ == "__main__": + suite = unittest.makeSuite(XmlTestRunnerTest) + unittest.TextTestRunner().run(suite) diff --git a/pyjenkinsci/view.py b/pyjenkinsci/view.py new file mode 100644 index 0000000..145e71c --- /dev/null +++ b/pyjenkinsci/view.py @@ -0,0 +1,61 @@ +import jenkinsobject +import job + +class view(jenkinsobject): + + def __init__(self, url, name, jenkins_obj): + self.name = name + self.jenkins_obj = jenkins_obj + jenkinsobject.__init__(self, url) + + def __str__(self): + return self.name + + def __getitem__(self, str_job_id ): + assert isinstance( str_job_id, str ) + api_url = self.python_api_url( self.get_job_url( str_job_id ) ) + return job( api_url, str_job_id, self.jenkins_obj ) + + def keys(self): + return self.get_job_dict().keys() + + def iteritems(self): + for name, url in self.get_job_dict().iteritems(): + api_url = self.python_api_url( url ) + yield name, job( api_url, name, self.jenkins_obj ) + + def values(self): + return [ a[1] for a in self.iteritems() ] + + def items(self): + return [ a for a in self.iteritems() ] + + def _get_jobs( self ): + if not self._data.has_key( 
"jobs" ): + pass + else: + for viewdict in self._data["jobs"]: + yield viewdict["name"], viewdict["url"] + + def get_job_dict(self): + return dict( self._get_jobs() ) + + def __len__(self): + return len( self.get_job_dict().keys() ) + + def get_job_url( self, str_job_name ): + try: + job_dict = self.get_job_dict() + return job_dict[ str_job_name ] + except KeyError, ke: + all_views = ", ".join( job_dict.keys() ) + raise KeyError("Job %s is not known - available: %s" % ( str_job_name, all_views ) ) + + def get_jenkins_obj(self): + return self.jenkins_obj + + def id(self): + """ + Calculate an ID for this object. + """ + return "%s.%s" % ( self.className, self.name ) \ No newline at end of file diff --git a/pyjenkinsci_egg/setup.py b/pyjenkinsci_egg/setup.py deleted file mode 100644 index 03fb136..0000000 --- a/pyjenkinsci_egg/setup.py +++ /dev/null @@ -1,19 +0,0 @@ -from setuptools import setup, find_packages - -GLOBAL_ENTRY_POINTS = { - "console_scripts":[ "jenkins_invoke=pyhudson.command_line.hudson_invoke:main", - "meta_test=pyhudson.command_line.meta_test:main", ] } - -setup(name='pyjenkinsci', - version='0.0.35', - description='A Python API for accessing resources a Hudson or Jenkins continuous-integration server.', - author='Salim Fadhley', - author_email='sal@stodge.org', - #install_requires = [ 'elementtree>=1.2-20040618' ], - #tests = "tests", - package_dir = {'':'src'}, - packages=find_packages('src'), - zip_safe=True, - include_package_data = False, - entry_points = GLOBAL_ENTRY_POINTS, - ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/__init__.py b/pyjenkinsci_egg/src/pyjenkinsci/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyjenkinsci_egg/src/pyjenkinsci/api.py b/pyjenkinsci_egg/src/pyjenkinsci/api.py deleted file mode 100644 index 08d86e7..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/api.py +++ /dev/null @@ -1,137 +0,0 @@ -from pyjenkinsci.jenkins import jenkins -from pyjenkinsci.artifact import artifact -from 
pyjenkinsci.exceptions import ArtifactsMissing, TimeOut, BadURL -from pyjenkinsci import constants -from urllib2 import urlparse - -import os -import time -import logging - -log = logging.getLogger(__name__) - -def get_latest_test_results( jenkinsurl, jobname ): - """ - A convenience function to fetch down the very latest test results from a jenkins job. - """ - latestbuild = get_latest_build( jenkinsurl, jobname ) - res = latestbuild.get_resultset() - return res - -def get_latest_build( jenkinsurl, jobname ): - """ - A convenience function to fetch down the very latest test results from a jenkins job. - """ - jenkinsci = jenkins( jenkinsurl ) - job = jenkinsci[ jobname ] - return job.get_last_build() - -def get_latest_complete_build( jenkinsurl, jobname ): - """ - A convenience function to fetch down the very latest test results from a jenkins job. - """ - jenkinsci = jenkins( jenkinsurl ) - job = jenkinsci[ jobname ] - return job.get_last_completed_build() - -def get_artifacts( jenkinsurl, jobid=None, build_no=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None ): - """ - Find all the artifacts for the latest build of a job. - """ - jenkinsci = jenkins( jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ) - job = jenkinsci[ jobid ] - if build_no: - build = job.get_build( build_no ) - else: - build = job.get_last_good_build() - artifacts = dict( (artifact.filename, artifact) for artifact in build.get_artifacts() ) - log.info("Found %i artifacts in '%s'" % ( len(artifacts.keys() ), build_no ) ) - return artifacts - -def search_artifacts(jenkinsurl, jobid, artifact_ids=None, same_build=True, build_search_limit=None): - """ - Search the entire history of a jenkins job for a list of artifact names. 
If same_build - is true then ensure that all artifacts come from the same build of the job - """ - if len( artifact_ids ) == 0 or artifact_ids is None: - return [] - assert same_build, "same_build==False not supported yet" - jenkinsci = jenkins( jenkinsurl ) - job = jenkinsci[ jobid ] - build_ids = job.get_build_ids() - for build_id in build_ids: - build = job.get_build( build_id ) - artifacts = build.get_artifact_dict() - if set( artifact_ids ).issubset( set( artifacts.keys() ) ): - return dict( ( a,artifacts[a] ) for a in artifact_ids ) - missing_artifacts = set( artifact_ids ) - set( artifacts.keys() ) - log.debug("Artifacts %s missing from %s #%i" % ( ", ".join( missing_artifacts ), jobid, build_id )) - raise ArtifactsMissing( missing_artifacts ) - -def grab_artifact( jenkinsurl, jobid, artifactid, targetdir ): - """ - Convenience method to find the latest good version of an artifact and save it - to a target directory. Directory is made automatically if not exists. - """ - artifacts = get_artifacts( jenkinsurl, jobid ) - artifact = artifacts[ artifactid ] - if not os.path.exists( targetdir ): - os.makedirs( targetdir ) - artifact.savetodir( targetdir) - -def block_until_complete( jenkinsurl, jobs, maxwait=12000, interval=30, raise_on_timeout=True ): - """ - Wait until all of the jobs in the list are complete. - """ - assert maxwait > 0 - assert maxwait > interval - assert interval > 0 - - obj_jenkins = jenkins( jenkinsurl ) - obj_jobs = [ obj_jenkins[ jid ] for jid in jobs ] - for time_left in xrange( maxwait, 0, -interval ): - still_running = [ j for j in obj_jobs if j.is_queued_or_running() ] - if not still_running: - return - str_still_running = ", ".join( '"%s"' % str(a) for a in still_running ) - log.warn( "Waiting for jobs %s to complete. 
Will wait another %is" % ( str_still_running, time_left ) ) - time.sleep( interval ) - if raise_on_timeout: - raise TimeOut( "Waited too long for these jobs to complete: %s" % str_still_running ) - -def get_view_from_url( url ): - """ - Factory method - """ - matched = constants.RE_SPLIT_VIEW_URL.search(url) - if not matched: - raise BadURL("Cannot parse URL %s" % url ) - jenkinsurl, view_name = matched.groups() - jenkinsci = jenkins( jenkinsurl ) - return jenkinsci.get_view( view_name ) - -def install_artifacts( artifacts, dirstruct, installdir, basestaticurl ): - """ - Install the artifacts. - """ - assert basestaticurl.endswith("/"), "Basestaticurl should end with /" - installed = [] - for reldir, artifactnames in dirstruct.items(): - destdir = os.path.join( installdir, reldir ) - if not os.path.exists( destdir ): - log.warn( "Making install directory %s" % destdir ) - os.makedirs( destdir ) - else: - assert os.path.isdir( destdir ) - for artifactname in artifactnames: - destpath = os.path.abspath( os.path.join( destdir, artifactname ) ) - if artifactname in artifacts.keys(): - # The artifact must be loaded from jenkins - theartifact = artifacts[ artifactname ] - else: - # It's probably a static file, we can get it from the static collection - staticurl = urlparse.urljoin( basestaticurl, artifactname ) - theartifact = artifact( artifactname, staticurl ) - theartifact.save( destpath ) - installed.append( destpath ) - return installed diff --git a/pyjenkinsci_egg/src/pyjenkinsci/artifact.py b/pyjenkinsci_egg/src/pyjenkinsci/artifact.py deleted file mode 100644 index 48ff2fb..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/artifact.py +++ /dev/null @@ -1,188 +0,0 @@ -import urllib2 -import os -import logging -import cStringIO -import zipfile -import cPickle -import datetime - -from pyjenkinsci import config -from pyjenkinsci.utils.retry import retry_function -from pyjenkinsci.exceptions import ArtifactBroken -from pyjenkinsci.utils.md5hash import new_digest - -log = 
logging.getLogger( __name__ ) - -class artifact( object ): - - @staticmethod - def timedelta_to_seconds( td ): - secs = float( td.seconds ) - secs += td.microseconds / 1000000.0 - secs += td.days * 86400 - return secs - - def __init__( self, filename, url, build=None ): - self.filename = filename - self.url = url - self.build = build - - def unpickle(self, method="pickle" ): - """ - Assume that the object is a pickled stream. - """ - stream, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream ) - - while True: - try: - yield cPickle.load( stream ) - except EOFError: - break - - def logging_buffer_copy( self, input_stream, output_stream, length, chunks=10 ): - - chunk_points = int( length / chunks ) - - start_time = datetime.datetime.now() - last_time = datetime.datetime.now() - - for index in xrange( 0, length ): - output_stream.write( input_stream.read(1) ) - - if chunk_points > 0: - if ( index % chunk_points ) == 0 and ( index > 0 ): - now = datetime.datetime.now() - - try: - time_elapsed_since_start = self.timedelta_to_seconds( now - start_time ) - # avg_bitrate = ( index / time_elapsed_since_start ) / 1024.0 - time_elapsed_since_last_chunk = self.timedelta_to_seconds( now - last_time ) - inst_bitrate = ( chunk_points / time_elapsed_since_last_chunk ) / 1024.0 - except ZeroDivisionError, _: - continue - - log.info( "Loaded %i of %i bytes %.2f kbit/s" % ( index, length, inst_bitrate ) ) - last_time = now - - - def getstream( self ): - """ - Get the artifact as a stream - """ - artifact_digest = new_digest() - tmp_buffer = cStringIO.StringIO() - - if self.build: - fn_opener = self.build.job.hudson.get_opener() - else: - fn_opener = urllib2.urlopen - - try: - inputstream = fn_opener( self.url, ) - content_type = inputstream.info().get("content-type", "unknown") - - try: - content_length = int( inputstream.info()["content-length"] ) - self.logging_buffer_copy( inputstream, tmp_buffer, content_length ) - except KeyError, ke: - # Could not get length. 
- log.warn("Could not get length") - tmp_buffer.write( inputstream.read() ) - - except urllib2.HTTPError: - log.warn( "Error fetching %s" % self.url ) - raise - tmp_buffer.seek(0) - - artifact_digest.update(tmp_buffer.getvalue()) - artifact_hexdigest = artifact_digest.hexdigest() - - artifact_size = len(tmp_buffer.getvalue()) - log.info( "Got %s, %i bytes, MD5: %s, type: %s" % ( self.filename, artifact_size, artifact_hexdigest, content_type ) ) - - if self.build: - self.build.job.hudson.validate_fingerprint( artifact_hexdigest ) - - return tmp_buffer, artifact_hexdigest - - def openzip( self ): - """ - Open the artifact as a zipfile. - """ - buffer, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream ) - zf = zipfile.ZipFile( buffer, "r" ) - return zf - - def save( self, fspath ): - """ - Save the artifact to an explicit path. The containing directory must exist. - Returns a reference to the file which has just been writen to. - """ - - log.info( "Saving artifact @ %s to %s" % (self.url, fspath) ) - - if not fspath.endswith( self.filename ): - log.warn( "Attempt to change the filename of artifact %s on save." % self.filename ) - - if os.path.exists( fspath ): - existing_hexdigest = self.get_local_digest( fspath ) - if self.build: - try: - valid = self.build.job.hudson.validate_fingerprint_for_build( existing_hexdigest, filename=self.filename, job=self.build.job.id(), build=self.build.id() ) - - if valid: - log.info( "Local copy of %s is already up to date. 
MD5 %s" % (self.filename, existing_hexdigest) ) - else: - self.__do_download( fspath ) - except ArtifactBroken, ab: #@UnusedVariable - log.info("Hudson artifact could not be identified.") - else: - log.info("This file did not originate from Hudson, so cannot check.") - self.__do_download( fspath ) - else: - log.info("Local file is missing, downloading new.") - self.__do_download( fspath ) - - def get_local_digest( self, fspath ): - tmp_buffer_existing = cStringIO.StringIO() - existingfile = open( fspath, "rb" ) - tmp_buffer_existing.write( existingfile.read() ) - existing_digest = new_digest() - existing_digest.update(tmp_buffer_existing.getvalue()) - existing_hexdigest = existing_digest.hexdigest() - return existing_hexdigest - - def __do_download( self, fspath ): - - filedir, _ = os.path.split( fspath ) - if not os.path.exists( filedir ): - log.warn( "Making missing directory %s" % filedir ) - os.makedirs( filedir ) - - try: - outputfile = open( fspath, "wb" ) - except IOError, ioe: - log.critical("User %s@%s cannot open file" % ( os.environ.get("USERNAME","unknown"),os.environ.get("USERDOMAIN","unknown") ) ) - raise - - tmp_buffer_downloaded, artifact_hexdigest = retry_function( config.LOAD_ATTEMPTS , self.getstream ) - - outputfile.write( tmp_buffer_downloaded.getvalue() ) - return outputfile - - - def savetodir( self, dirpath ): - """ - Save the artifact to a folder. The containing directory must be exist, but use the artifact's - default filename. 
- """ - assert os.path.exists( dirpath ) - assert os.path.isdir( dirpath ) - outputfilepath = os.path.join( dirpath, self.filename ) - self.save( outputfilepath ) - - - def __repr__( self ): - return """<%s.%s %s>""" % ( self.__class__.__module__, - self.__class__.__name__, - self.url ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/build.py b/pyjenkinsci_egg/src/pyjenkinsci/build.py deleted file mode 100644 index 79b9b9b..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/build.py +++ /dev/null @@ -1,107 +0,0 @@ -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.artifact import artifact -from pyjenkinsci.result_set import result_set -import time -import logging -from pyjenkinsci import config -from pyjenkinsci.exceptions import NoResults, FailedNoResults -from pyjenkinsci.constants import STATUS_FAIL, STATUS_ABORTED, RESULTSTATUS_FAILURE - -log = logging.getLogger(__name__) - -class build( jenkinsobject ): - """ - Represents a jenkins build, executed in context of a job. - """ - - STR_TOTALCOUNT = "totalCount" - STR_TPL_NOTESTS_ERR = "%s has status %s, and does not have any test results" - - def __init__( self, url, buildno, job ): - assert type(buildno) == int - self.buildno = buildno - self.job = job - jenkinsobject.__init__( self, url ) - - def __str__(self): - return self._data['fullDisplayName'] - - def id(self): - return self._data["number"] - - def get_status(self): - return self._data["result"] - - def get_duration(self): - return self._data["duration"] - - def get_artifacts( self ): - for afinfo in self._data["artifacts"]: - url = "%sartifact/%s" % ( self.baseurl, afinfo["relativePath"] ) - af = artifact( afinfo["fileName"], url, self ) - yield af - del af, url - - def get_artifact_dict(self): - return dict( (a.filename, a) for a in self.get_artifacts() ) - - def is_running( self ): - """ - Return a bool if running. - """ - self.poll() - return self._data["building"] - - def is_good( self ): - """ - Return a bool, true if the build was good. 
- If the build is still running, return False. - """ - return ( not self.is_running() ) and self._data["result"] == 'SUCCESS' - - def block_until_complete(self, delay=15): - assert isinstance( delay, int ) - count = 0 - while self.is_running(): - total_wait = delay * count - log.info("Waited %is for %s #%s to complete" % ( total_wait, self.job.id(), self.id() ) ) - time.sleep( delay ) - count += 1 - - def get_jenkins_obj(self): - return self.job.get_jenkins_obj() - - def get_result_url(self): - """ - Return the URL for the object which provides the job's result summary. - """ - url_tpl = r"%stestReport/%s" - return url_tpl % ( self._data["url"] , config.JENKINS_API ) - - def get_resultset(self): - """ - Obtain detailed results for this build. - """ - result_url = self.get_result_url() - if self.STR_TOTALCOUNT not in self.get_actions(): - raise NoResults( "%s does not have any published results" % str(self) ) - buildstatus = self.get_status() - if buildstatus in [ STATUS_FAIL, RESULTSTATUS_FAILURE, STATUS_ABORTED ]: - raise FailedNoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) ) - if self.get_actions()[ self.STR_TOTALCOUNT ] == 0: - raise NoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) ) - obj_results = result_set( result_url, build=self ) - return obj_results - - def has_resultset(self): - """ - Return a boolean, true if a result set is available. false if not. 
- """ - return self.STR_TOTALCOUNT in self.get_actions() - - def get_actions(self): - all_actions = {} - for dct_action in self._data["actions"]: - all_actions.update( dct_action ) - return all_actions - diff --git a/pyjenkinsci_egg/src/pyjenkinsci/command_line/__init__.py b/pyjenkinsci_egg/src/pyjenkinsci/command_line/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyjenkinsci_egg/src/pyjenkinsci/command_line/base.py b/pyjenkinsci_egg/src/pyjenkinsci/command_line/base.py deleted file mode 100644 index 9275a43..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/command_line/base.py +++ /dev/null @@ -1,8 +0,0 @@ -from optparse import OptionParser - -class base( object ): - - @classmethod - def mkparser(cls): - parser = OptionParser() - return parser diff --git a/pyjenkinsci_egg/src/pyjenkinsci/command_line/hudson_invoke.py b/pyjenkinsci_egg/src/pyjenkinsci/command_line/hudson_invoke.py deleted file mode 100644 index 78e88c9..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/command_line/hudson_invoke.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import sys -import logging - -from pyjenkinsci.command_line.base import base -from pyjenkinsci.jenkins import jenkins - -log = logging.getLogger(__name__) - -class jenkins_invoke( base ): - - @classmethod - def mkparser(cls): - parser = base.mkparser( ) - DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" ) - parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete." - parser.add_option("-J", "--jenkinsbase", dest="baseurl", - help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL, - type="str", - default=DEFAULT_BASEURL, ) - parser.add_option("-b", "--block", dest="block", - help="Block until each of the jobs is complete." , - action="store_true", - default=False ) - parser.add_option("-t", "--token", dest="token", - help="Optional security token." 
, - default=None ) - return parser - - @classmethod - def main(cls): - parser = cls.mkparser() - options, args = parser.parse_args() - try: - assert len( args ) > 0, "Need to specify at least one job name" - except AssertionError, e: - log.critical( e[0] ) - parser.print_help() - sys.exit(1) - invoker = cls( options, args ) - invoker() - - def __init__( self, options, jobs ): - self.options = options - self.jobs = jobs - - def __call__(self): - for job in self.jobs: - self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token ) - - def invokejob(self, jobname, block, baseurl, token ): - assert type(block) == bool - assert type(baseurl) == str - assert type(jobname) == str - assert token is None or isinstance( token, str ) - jenkinsserver = jenkins( baseurl ) - job = jenkinsserver[ jobname ] - job.invoke( securitytoken=token, block=block ) - - -def main( ): - logging.basicConfig() - logging.getLogger("").setLevel( logging.INFO ) - jenkins_invoke.main() diff --git a/pyjenkinsci_egg/src/pyjenkinsci/command_line/meta_test.py b/pyjenkinsci_egg/src/pyjenkinsci/command_line/meta_test.py deleted file mode 100644 index b1a1bee..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/command_line/meta_test.py +++ /dev/null @@ -1,50 +0,0 @@ -import optparse -import os -import random -import logging -from pyjenkinsci.utils import junitxml -from pyjenkinsci.utils.id import mk_id - -log = logging.getLogger(__name__) - -class meta_test(object): - ATTEMPTS=3 - - @classmethod - def mkParser(cls): - parser = optparse.OptionParser() - - def __init__(self, opts=None): - self.opts = opts - - def testFunction(self): - if random.random() < 0.1: - raise AssertionError("The value was too small") - return 0 - - def __call__(self): - temp_dir = os.environ.get("TEMP", r"c:\temp" ) - output_dir = os.environ.get( "WORKSPACE", temp_dir ) - result_filepath = os.path.join( output_dir, "results.xml" ) - stream = open( result_filepath, "wb" ) - testsuite_name = 
mk_id() - ju = junitxml.junitxml( stream, testsuite_name) - - - classname = mk_id() - for i in xrange(0, self.ATTEMPTS ): - tr = ju.startTest( classname, mk_id() ) - try: - tr.run( self.testFunction ) - except Exception, e: - log.exception(e) - continue - - ju.write() - -def main( ): - logging.basicConfig() - return meta_test()() - -if __name__ == "__main__": - main() diff --git a/pyjenkinsci_egg/src/pyjenkinsci/config.py b/pyjenkinsci_egg/src/pyjenkinsci/config.py deleted file mode 100644 index aba8f70..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/config.py +++ /dev/null @@ -1,3 +0,0 @@ -JENKINS_API = r"api/python/" -LOAD_TIMEOUT = 30 -LOAD_ATTEMPTS = 5 \ No newline at end of file diff --git a/pyjenkinsci_egg/src/pyjenkinsci/constants.py b/pyjenkinsci_egg/src/pyjenkinsci/constants.py deleted file mode 100644 index d456dff..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/constants.py +++ /dev/null @@ -1,15 +0,0 @@ -import re - -STATUS_FAIL = "FAIL" -STATUS_ERROR = "ERROR" -STATUS_ABORTED = "ABORTED" -STATUS_REGRESSION = "REGRESSION" - -STATUS_FIXED = "FIXED" -STATUS_PASSED = "PASSED" - -RESULTSTATUS_FAILURE = "FAILURE" -RESULTSTATUS_FAILED = "FAILED" - -STR_RE_SPLIT_VIEW = "(.*)/view/([^/]*)/?" -RE_SPLIT_VIEW_URL = re.compile( STR_RE_SPLIT_VIEW ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/exceptions.py b/pyjenkinsci_egg/src/pyjenkinsci/exceptions.py deleted file mode 100644 index 6a2b6cc..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/exceptions.py +++ /dev/null @@ -1,44 +0,0 @@ -class ArtifactsMissing(Exception): - """ - Cannot find a build with all of the required artifacts. - """ - -class UnknownJob( KeyError ): - """ - Hudson does not recognize the job requested. - """ - -class ArtifactBroken(Exception): - """ - An artifact is broken, wrong - """ - -class TimeOut( Exception ): - """ - Some jobs have taken too long to complete. - """ - -class WillNotBuild(Exception): - """ - Cannot trigger a new build. 
- """ - -class NoBuildData(Exception): - """ - A job has no build data. - """ - -class NoResults(Exception): - """ - A build did not publish any results. - """ - -class FailedNoResults(NoResults): - """ - A build did not publish any results because it failed - """ - -class BadURL(ValueError): - """ - A URL appears to be broken - """ diff --git a/pyjenkinsci_egg/src/pyjenkinsci/fingerprint.py b/pyjenkinsci_egg/src/pyjenkinsci/fingerprint.py deleted file mode 100644 index 073ba50..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/fingerprint.py +++ /dev/null @@ -1,82 +0,0 @@ -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.exceptions import ArtifactBroken - -import urllib2 -import re - -import logging - -log = logging.getLogger( __name__ ) - -class fingerprint( jenkinsobject ): - """ - Represents a jenkins fingerprint on a single artifact file ?? - """ - RE_MD5 = re.compile("^([0-9a-z]{32})$") - - def __init__( self, baseurl, id, jenkins_obj ): - logging.basicConfig() - self.jenkins_obj = jenkins_obj - assert self.RE_MD5.search( id ), "%s does not look like a valid id" % id - url = "%s/fingerprint/%s/" % ( baseurl, id ) - jenkinsobject.__init__( self, url, poll=False ) - self.id = id - - def get_jenkins_obj(self): - return self.jenkins_obj - - def __str__(self): - return self.id - - def valid(self): - """ - Return True / False if valid - """ - try: - self.poll() - except urllib2.HTTPError, e: - return False - return True - - def validate_for_build(self, filename, job, build): - if not self.valid(): - log.info("Unknown to jenkins.") - return False - if not self._data["original"] is None: - if self._data["original"]["name"] == job: - if self._data["original"]["number"] == build: - return True - if self._data["fileName"] != filename: - log.info("Filename from jenkins (%s) did not match provided (%s)" % ( self._data["fileName"], filename ) ) - return False - for usage_item in self._data["usage"]: - if usage_item["name"] == job: - for range in 
usage_item["ranges"]["ranges"]: - if range["start"] <= build <= range["end"]: - log.info("This artifact was generated by %s between build %i and %i" % ( job, range["start"], range["end"] ) ) - return True - return False - - def validate(self): - try: - assert self.valid() - except AssertionError, ae: - raise ArtifactBroken( "Artifact %s seems to be broken, check %s" % ( self.id, self.baseurl ) ) - except urllib2.HTTPError, httpe: - raise ArtifactBroken( "Unable to validate artifact id %s using %s" % ( self.id, self.baseurl ) ) - return True - - def get_info( self ): - """ - Returns a tuple of build-name, build# and artifiact filename for a good build. - """ - self.poll() - return self._data["original"]["name"], self._data["original"]["number"], self._data["fileName"] - - -if __name__ == "__main__": - ff = fingerprint( "http://localhost:8080/hudson/", "0f37cbb6545b8778bc0700d90be66bf3" ) - print repr(ff) - print ff.baseurl - print ff.valid() - print ff.get_info( ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/jenkins.py b/pyjenkinsci_egg/src/pyjenkinsci/jenkins.py deleted file mode 100644 index 6a821e9..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/jenkins.py +++ /dev/null @@ -1,96 +0,0 @@ -from pyjenkinsci.exceptions import UnknownJob -from pyjenkinsci.fingerprint import fingerprint -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.job import job -from pyjenkinsci.utils.urlopener import mkurlopener -from pyjenkinsci.view import view -import logging -import time - -log = logging.getLogger(__name__) - -class jenkins( jenkinsobject ): - """ - Represents a jenkins environment. 
- """ - def __init__(self, baseurl, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None): - self.proxyhost = proxyhost - self.proxyport = proxyport - self.proxyuser = proxyuser - self.proxypass = proxypass - jenkinsobject.__init__( self, baseurl ) - - def get_proxy_auth(self): - return (self.proxyhost, self.proxyport, self.proxyuser, self.proxypass) - - def get_opener( self ): - return mkurlopener(*self.get_proxy_auth()) - - def validate_fingerprint( self, id ): - obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self) - obj_fingerprint.validate() - log.info("Jenkins says %s is valid" % id) - - def get_artifact_data(self, id): - obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self) - obj_fingerprint.validate() - return obj_fingerprint.get_info() - - def validate_fingerprint_for_build(self, digest, filename, job, build ): - obj_fingerprint = fingerprint( self.baseurl, digest, jenkins_obj=self ) - return obj_fingerprint.validate_for_build( filename, job, build ) - - def get_jenkins_obj(self): - return self - - def get_jobs(self): - """ - Fetch all the build-names on this Hudson server. 
- """ - for info in self._data["jobs"]: - yield info["name"], job( info["url"], info["name"], jenkins_obj=self) - - def iteritems(self): - return self.get_jobs() - - def iterkeys(self): - for info in self._data["jobs"]: - yield info["name"] - - def keys(self): - return [ a for a in self.iterkeys() ] - - def __str__(self): - return "Jenkins server at %s" % self.baseurl - - def _get_views( self ): - if not self._data.has_key( "views" ): - pass - else: - for viewdict in self._data["views"]: - yield viewdict["name"], viewdict["url"] - - def get_view_dict(self): - return dict( self._get_views() ) - - def get_view_url( self, str_view_name ): - try: - view_dict = self.get_view_dict() - return view_dict[ str_view_name ] - except KeyError, ke: - all_views = ", ".join( view_dict.keys() ) - raise KeyError("View %s is not known - available: %s" % ( str_view_name, all_views ) ) - - def get_view(self, str_view_name ): - view_url = self.get_view_url( str_view_name ) - view_api_url = self.python_api_url( view_url ) - return view(view_api_url , str_view_name, jenkins_obj=self) - - def __getitem__( self, buildname ): - """ - Get a build - """ - for name, job in self.get_jobs(): - if name == buildname: - return job - raise UnknownJob(buildname) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/jenkinsobject.py b/pyjenkinsci_egg/src/pyjenkinsci/jenkinsobject.py deleted file mode 100644 index 031ab03..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/jenkinsobject.py +++ /dev/null @@ -1,68 +0,0 @@ -import urllib2 -import logging -import pprint -from pyjenkinsci import config -from pyjenkinsci.utils.retry import retry_function - -log = logging.getLogger( __name__ ) - -class jenkinsobject( object ): - """ - This appears to be the base object that all other jenkins objects are inherited from - """ - RETRY_ATTEMPTS = 5 - - def __repr__( self ): - return """<%s.%s %s>""" % ( self.__class__.__module__, - self.__class__.__name__, - str( self ) ) - - def print_data(self): - pprint.pprint( self._data ) 
- - def __str__(self): - raise NotImplemented - - def __init__( self, baseurl, poll=True ): - """ - Initialize a jenkins connection - """ - self.baseurl = baseurl - if poll: - try: - self.poll() - except urllib2.HTTPError, hte: - log.exception(hte) - log.warn( "Failed to conenct to %s" % baseurl ) - raise - - def poll(self): - self._data = self._poll() - - def _poll(self): - url = self.python_api_url( self.baseurl ) - return retry_function( self.RETRY_ATTEMPTS , self.get_data, url ) - - @classmethod - def python_api_url( cls, url ): - if url.endswith( config.JENKINS_API ): - return url - else: - if url.endswith( r"/" ): - fmt="%s%s" - else: - fmt = "%s/%s" - return fmt % (url, config.JENKINS_API) - - def get_data( self, url ): - """ - Find out how to connect, and then grab the data. - """ - fn_urlopen = self.getHudsonObject().get_opener() - try: - stream = fn_urlopen( url ) - result = eval( stream.read() ) - except urllib2.HTTPError, e: - log.warn( "Error reading %s" % url ) - raise - return result diff --git a/pyjenkinsci_egg/src/pyjenkinsci/job.py b/pyjenkinsci_egg/src/pyjenkinsci/job.py deleted file mode 100644 index 64ea3e2..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/job.py +++ /dev/null @@ -1,177 +0,0 @@ -import logging -import urlparse -import urllib2 -import time - -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.build import build -from pyjenkinsci.exceptions import NoBuildData - -log = logging.getLogger(__name__) - -class job( jenkinsobject ): - """ - Represents a jenkins job - A job can hold N builds which are the actual execution environments - """ - def __init__( self, url, name, jenkins_obj ): - self.name = name - self.jenkins = jenkins_obj - jenkinsobject.__init__( self, url ) - - def id( self ): - return self._data["name"] - - def __str__(self): - return self._data["name"] - - def get_jenkins_obj(self): - return self.jenkins - - def get_build_triggerurl( self, token=None ): - if token is None: - extra = "build" - else: - 
assert isinstance(token, str ), "token if provided should be a string." - extra = "build?token=%s" % token - buildurl = urlparse.urljoin( self.baseurl, extra ) - return buildurl - - def hit_url(self, url ): - fn_urlopen = self.get_jenkins_obj().get_opener() - try: - stream = fn_urlopen( url ) - html_result = stream.read() - except urllib2.HTTPError, e: - log.debug( "Error reading %s" % url ) - raise - return html_result - - def invoke( self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15 ): - assert isinstance( invoke_pre_check_delay, (int, float) ) - assert isinstance( invoke_block_delay, (int, float) ) - assert isinstance( block, bool ) - assert isinstance( skip_if_running, bool ) - skip_build = False - if self.is_queued(): - log.warn( "Will not request new build because %s is already queued" % self.id() ) - skip_build = True - elif self.is_running(): - if skip_if_running: - log.warn( "Will not request new build because %s is already running" % self.id() ) - skip_build = True - else: - log.warn("Will re-schedule %s even though it is already running" % self.id() ) - original_build_no = self.get_last_buildnumber() - if skip_build: - pass - else: - log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) ) - url = self.get_build_triggerurl( securitytoken ) - html_result = self.hit_url( url ) - assert len( html_result ) > 0 - if invoke_pre_check_delay > 0: - log.info("Waiting for %is to allow Hudson to catch up" % invoke_pre_check_delay ) - time.sleep( invoke_pre_check_delay ) - if block: - total_wait = 0 - while self.is_queued(): - log.info( "Waited %is for %s to begin..." 
% ( total_wait, self.id() ) ) - time.sleep( invoke_block_delay ) - total_wait += invoke_block_delay - if self.is_running(): - running_build = self.get_last_build() - running_build.block_until_complete( delay=invoke_pre_check_delay ) - assert running_build.is_good() - else: - assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run." - else: - if self.is_queued(): - log.info( "%s has been queued." % self.id() ) - elif self.is_running(): - log.info( "%s is running." % self.id() ) - elif original_build_no < self.get_last_buildnumber(): - log.info( "%s has completed." % self.id() ) - else: - raise AssertionError("The job did not schedule.") - - def _buildid_for_type(self, buildtype): - """Gets a buildid for a given type of build""" - KNOWNBUILDTYPES=["lastSuccessfulBuild", "lastBuild", "lastCompletedBuild"] - assert buildtype in KNOWNBUILDTYPES - buildid = self._data[buildtype]["number"] - assert type(buildid) == int, "Build ID should be an integer, got %s" % repr( buildid ) - return buildid - - def get_last_good_buildnumber( self ): - """ - Get the numerical ID of the last good build. - """ - return self._buildid_for_type(buildtype="lastSuccessfulBuild") - - def get_last_buildnumber( self ): - """ - Get the numerical ID of the last build. - """ - return self._buildid_for_type(buildtype="lastBuild") - - def get_last_completed_buildnumber( self ): - """ - Get the numerical ID of the last complete build. - """ - return self._buildid_for_type(buildtype="lastCompletedBuild") - - def get_build_dict(self): - if not self._data.has_key( "builds" ): - raise NoBuildData( repr(self) ) - return dict( ( a["number"], a["url"] ) for a in self._data["builds"] ) - - def get_build_ids(self): - """ - Return a sorted list of all good builds as ints. 
- """ - return reversed( sorted( self.get_build_dict().keys() ) ) - - def get_last_good_build( self ): - """ - Get the last good build - """ - bn = self.get_last_good_buildnumber() - return self.get_build( bn ) - - def get_last_build( self ): - """ - Get the last good build - """ - bn = self.get_last_buildnumber() - return self.get_build( bn ) - - def get_last_completed_build( self ): - """ - Get the last build regardless of status - """ - bn = self.get_last_completed_buildnumber() - return self.get_build( bn ) - - def get_build( self, buildnumber ): - assert type(buildnumber) == int - url = self.get_build_dict()[ buildnumber ] - return build( url, buildnumber, job=self ) - - def __getitem__( self, buildnumber ): - return self.get_build(buildnumber) - - def is_queued_or_running(self): - return self.is_queued() or self.is_running() - - def is_queued(self): - self.poll() - return self._data["inQueue"] - - def is_running(self): - self.poll() - try: - return self.get_last_build().is_running() - except NoBuildData: - log.info("No build info available for %s, assuming not running." % str(self) ) - return False diff --git a/pyjenkinsci_egg/src/pyjenkinsci/result.py b/pyjenkinsci_egg/src/pyjenkinsci/result.py deleted file mode 100644 index 64bd90a..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/result.py +++ /dev/null @@ -1,21 +0,0 @@ -class result( object ): - def __init__(self, **kwargs ): - """ - - """ - self.__dict__.update( kwargs ) - - def __str__(self): - return "%s %s %s" % ( self.className, self.name, self.status ) - - def __repr__(self): - module_name = self.__class__.__module__ - class_name = self.__class__.__name__ - self_str = str( self ) - return "<%s.%s %s>" % ( module_name , class_name , self_str ) - - def id(self): - """ - Calculate an ID for this object. 
- """ - return "%s.%s" % ( self.className, self.name ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/result_set.py b/pyjenkinsci_egg/src/pyjenkinsci/result_set.py deleted file mode 100644 index 7eb7579..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/result_set.py +++ /dev/null @@ -1,39 +0,0 @@ -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.result import result - -class result_set( jenkinsobject ): - """ - Represents a result from a completed Hudson run. - """ - def getHudsonObject(self): - return self.build.job.get_jenkins_obj() - - def __init__(self, url, build ): - """ - """ - self.build = build - jenkinsobject.__init__( self, url ) - - def __str__(self): - return "Test Result for %s" % str( self.build ) - - def keys(self): - return [ a[0] for a in self.iteritems() ] - - def items(self): - return [a for a in self.iteritems()] - - def iteritems(self): - for suite in self._data.get("suites", [] ): - for case in suite["cases"]: - R = result( **case ) - yield R.id(), R - - for report_set in self._data.get( "childReports", [] ): - for suite in report_set["result"]["suites"]: - for case in suite["cases"]: - R = result( **case ) - yield R.id(), R - - def __len__(self): - return sum( 1 for x in self.iteritems() ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/__init__.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/bufwrapper.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/bufwrapper.py deleted file mode 100644 index 8addaaa..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/bufwrapper.py +++ /dev/null @@ -1,39 +0,0 @@ -from cStringIO import StringIO - -class bufwrapper( object ): - """ - Basic buffer-wrapper - wraps up an output stream with a buffer. 
- """ - def __init__( self, stream, buffer=None ): - self.stream = stream - - assert hasattr( self.stream, "write" ), "%s does not support write" % repr(stream) - - if buffer is None: - self.buf = StringIO() - else: - self.buf = buffer - - def get_and_clear( self ): - """ - Get the contents of the buffer and clear it. - """ - old_buffer = self.buf - self.buf = StringIO() - return old_buffer.getvalue() - - def flush( self ): - for item in [ self.stream, self.buf ]: - if hasattr( item, "flush" ) and callable( item.flush ): - item.flush() - - - def close(self): - self.stream.close() - - def write(self, txt ): - self.stream.write(txt) - self.buf.write(txt) - - def getvalue(self): - return self.buf.getvalue() diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/dates.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/dates.py deleted file mode 100644 index 121257e..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/dates.py +++ /dev/null @@ -1,14 +0,0 @@ -import datetime - -MICROSECONDS_PER_SECOND = 1000000.0 -SECONDS_PER_DAY = 86400 - -def timedelta_to_seconds( td ): - assert isinstance( td, datetime.timedelta ) - seconds = 0.0 - - seconds += td.days * SECONDS_PER_DAY - seconds += td.seconds - seconds += td.microseconds / MICROSECONDS_PER_SECOND - - return seconds diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/id.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/id.py deleted file mode 100644 index 097debb..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/id.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -Generate random IDs. 
-""" -import random - -ID_VALID = "abcdefghijklmnopqrstuvwxyz0123456789" - -def mk_id(length=5, prefix=""): - idchars = [] - for count in range( 0, length ): - idchars.append( random.choice( ID_VALID ) ) - return "%s%s" % ( prefix, "".join( idchars ) ) - -if __name__ == "__main__": - for i in range(0, 50): - print repr( mk_id( i ) ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/junitxml.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/junitxml.py deleted file mode 100644 index 34f4f7c..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/junitxml.py +++ /dev/null @@ -1,189 +0,0 @@ -import logging -import datetime -import traceback -import sys - -try: - from xml.etree import ElementTree as ET -except Exception, e: - import elementtree.ElementTree as ET - -from pyjenkinsci.utils.dates import timedelta_to_seconds - -log = logging.getLogger(__name__) - -class junitxml( object ): - - ERROR = "error" - FAILURE = "failure" - - def __init__( self, stream, testsuite_name="test", ): - """ - Set up a new stream - """ - assert isinstance( testsuite_name, str ) - - self.xml = ET.Element("testsuite") - self.stream = stream - - self.xml.attrib["name"] = testsuite_name - - self.count_errors = 0 - self.count_tests = 0 - self.count_failures = 0 - - def __repr__(self): - return "<%s.%s %s>" % (self.__class__.__module__, self.__class__.__name__, str(self)) - - def __str__(self): - return "Stream: %s, Tests: %i Errors: %i, Failures %i" % ( repr( self.stream ), - self.count_tests, - self.count_errors, - self.count_failures ) - - @classmethod - def get_error_strings( cls, e ): - str_error_type = "%s.%s" % ( e.__class__.__module__, e.__class__.__name__ ) - str_error_args = ",".join( [repr(ee) for ee in e.args] ) - str_doc = str( e.__doc__ ).strip() - - return str_error_type, str_error_args, str_doc - - def write(self, xml_declaration=True, encoding="utf-8"): - self.xml.attrib["errors"] = str( self.count_errors ) - self.xml.attrib["failures"] = str( self.count_failures ) - 
self.xml.attrib["tests"] = str( self.count_tests ) - - ET.ElementTree( self.xml ).write( self.stream, encoding=encoding, xml_declaration=xml_declaration ) - log.warn( "Wrote Junit-style XML log to %s" % self.stream ) - - def assertTrue(self, classname, testname, errmsg, fn, *args, **kwargs ): - """ - Map the interface onto an assert like statement. - Also returns the value so that we can do useful things with the result - """ - - _testname = testname.replace( ".", "_") # Dots are not permitted in names' - - def assert_fn( ): - if callable(fn): - assert fn( *args, **kwargs ), errmsg - else: - assert len(args) == 0 and len(kwargs) == 0, "Object being tested is not callable and cannot have arguments." - assert fn, "errmsg" - - tr = self.startTest(classname, _testname) - return tr.run( assert_fn ) - - def startTest( self, classname, testname, ): - return junitxml_transaction( self, classname, testname ) - - def passTest( self, classname, name, test_time ): - self.addPass( classname, name, test_time) - - def failTest(self, classname, name, test_time, error, tb, mode=FAILURE ): - """ - Add a error - """ - str_error, str_error_msg, str_doc = self.get_error_strings( error ) - enhanced_tb = "%s: %s\n\n( %s )\n\n%s" % ( repr(error), str_error_msg, str_doc, tb ) - tc = self.addPass( classname, name, test_time) - self.convertPassToFail( tc, str_error, enhanced_tb, mode=mode ) - - - def addPass(self, classname, name, test_time=0.0, ): - """ - Add a pass - """ - assert isinstance( classname, str ) - assert isinstance( name, str ) - assert isinstance( test_time, (int, float) ) - self.count_tests += 1 - testcase = ET.SubElement( self.xml, "testcase" ) - testcase.attrib["classname"] = classname - testcase.attrib["name"] = name - testcase.attrib["time"] = "%.2f" % test_time - - return testcase - - def convertPassToFail( self, tc, failtype="", tb="", mode=FAILURE ): - """ - Add a failure - """ - assert isinstance( failtype, str ) - assert isinstance( tb, str ), "Traceback should be a 
string, got %s" % repr(tb) - assert mode in [ self.FAILURE, self.ERROR ] - - if mode == self.FAILURE: - self.count_errors += 1 - else: - self.count_failures += 1 - - failure = ET.SubElement( tc, mode ) - failure.text = tb - failure.attrib["type"] = failtype - return failure - - -class junitxml_transaction( object ): - def __init__(self, jxml, classname, testname ): - assert isinstance( jxml, junitxml ) - self.jxml = jxml - self.classname = classname - self.testname = testname - self.start_time = datetime.datetime.now() - - def getRuntime(self): - return timedelta_to_seconds( datetime.datetime.now() - self.start_time ) - - def run( self, fn, *args, **kwargs ): - try: - result = fn( *args, **kwargs ) - self.jxml.addPass( self.classname, self.testname, self.getRuntime() ) - except Exception, e: - ex_type, ex_value, ex_tb = sys.exc_info() - - tb_formatted = traceback.format_exception( ex_type, ex_value, ex_tb ) - str_tb = "\n".join( tb_formatted ) - str_ex = "%s.%s" % ( ex_value.__class__.__module__, ex_value.__class__.__name__ ) - runtime = self.getRuntime() - - if isinstance(e, AssertionError): - self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.FAILURE ) - else: - self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.ERROR ) - - log.exception(e) - - raise e - return result - -if __name__ == "__main__": - import sys - import time - import random - - logging.basicConfig() - logging.getLogger("").setLevel( logging.INFO ) - fod = junitxml( stream=sys.stdout ) - - def fn_test( mode ): - - time.sleep( random.random( ) ) - - if mode=="pass": - return 1 - elif mode=="fail": - assert False - elif mode=="error": - {}["x"] - - for testname in [ "pass", "fail", "error" ]: - t = fod.startTest("a", testname, ) - try: - t.run( fn_test, testname ) - except Exception, e: - #log.exception(e) - pass - - fod.write() diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/md5hash.py 
b/pyjenkinsci_egg/src/pyjenkinsci/utils/md5hash.py deleted file mode 100644 index 4fd85e8..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/md5hash.py +++ /dev/null @@ -1,17 +0,0 @@ -try: - import hashlib -except ImportError: - import md5 - - -def new_digest(): - if hashlib: - m = hashlib.md5() - else: - m = md5.new() - return m - -if __name__ == "__main__": - x = new_digest() - x.update("123") - print repr( x.digest() ) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/retry.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/retry.py deleted file mode 100644 index 2b775ad..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/retry.py +++ /dev/null @@ -1,51 +0,0 @@ -import logging -import time - -log = logging.getLogger( __name__ ) - -IGNORE_EXCEPTIONS = [ AttributeError, KeyboardInterrupt ] - -DEFAULT_SLEEP_TIME = 1 - -def retry_function( tries, fn, *args, **kwargs ): - """ - Retry function - calls an unreliable function n times before giving up, if tries is exceeded - and it still fails the most recent exception is raised. - """ - assert isinstance( tries, int ), "Tries should be a non-zero positive integer" - assert tries > 0, "Tries should be a non-zero positive integer" - for attempt in range(0, tries): - attemptno = attempt + 1 - if attemptno == tries: - log.warn( "Last chance: #%i of %i" % ( attemptno, tries ) ) - elif tries > attempt > 0: - log.warn( "Attempt #%i of %i" % ( attemptno, tries ) ) - try: - result = fn( *args, **kwargs ) - if attempt > 0: - log.info( "Result obtained after attempt %i" % attemptno ) - return result - except Exception, e: - if type(e) in IGNORE_EXCEPTIONS: - # Immediatly raise in some cases. - raise - try: - fn_name = fn.__name__ - except AttributeError, ae: - fn_name = "Anonymous Function" - log.exception(e) - log.warn( "%s failed at attempt %i, trying again." 
% ( fn_name , attemptno ) ) - time.sleep( DEFAULT_SLEEP_TIME ) - raise e - -if __name__ == "__main__": - - def broken_function( a ): - return {}[a] - - logging.basicConfig() - - try: - retry_function( 3, broken_function, "x" ) - except Exception, e: - print repr(e) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/urlopener.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/urlopener.py deleted file mode 100644 index b3739df..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/urlopener.py +++ /dev/null @@ -1,37 +0,0 @@ -import urllib2 - -import logging - -log = logging.getLogger( __name__ ) - -DEFAULT_PROXYPORT = 80 -DEFAULT_PROXY_PASS = "Research123" -DEFAULT_PROXY_USER = "wsa_oblicqs_dev" - -def mkurlopener( proxyhost, proxyport, proxyuser, proxypass ): - if not proxyhost: - return urllib2.urlopen - else: - if proxyport is None: - proxyport = DEFAULT_PROXYPORT - - if proxypass is None: - proxypass = DEFAULT_PROXY_PASS - - if proxyuser is None: - proxyuser = DEFAULT_PROXY_USER - - assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport ) - assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass ) - assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser ) - - proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport), - 'https': 'http://%s:%i/' % (proxyhost, proxyport) } - - proxy_handler = urllib2.ProxyHandler( proxy_spec ) - proxy_auth_handler = urllib2.HTTPBasicAuthHandler() - proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass ) - - opener = urllib2.build_opener(proxy_handler, proxy_auth_handler) - - return opener.open diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunner.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunner.py deleted file mode 100644 index 58445a8..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunner.py +++ /dev/null @@ -1,245 +0,0 @@ -""" -XML Test Runner for PyUnit -""" - -# Written by Sebastian 
Rittau and placed in -# the Public Domain. - -__revision__ = "$Id: /mirror/jroger/python/stdlib/xmlrunner.py 3506 2006-07-27T09:12:39.629878Z srittau $" - -import sys -import time -import traceback -import unittest -import logging -from StringIO import StringIO -from xml.sax.saxutils import escape - -log = logging.getLogger() - -from pyjenkinsci.utils.bufwrapper import bufwrapper - -class faketest( object ): - """ - A fake test object for when you want to inject additional results into the XML stream. - """ - failureException = AssertionError - - def __init__( self, id, exc_info ): - self._id = id - self._exc_info = exc_info - - def id(self): - return self._id - - def run(self, result): - result.startTest(self) - result.addError(self, self._exc_info ) - ok = False - result.stopTest(self) - - def __call__(self, *args, **kwds): - return self.run(*args, **kwds) - - -class _TestInfo(object): - """Information about a particular test. - Used by _XmlTestResult.""" - - def __init__( self, test, time, ): - (self._class, self._method) = test.id().rsplit(".", 1) - self._time = time - self._error = None - self._failure = None - self._console = "" - - @staticmethod - def create_success(test, time): - """Create a _TestInfo instance for a successful test.""" - return _TestInfo(test, time) - - @staticmethod - def create_failure(test, time, failure, console=""): - """Create a _TestInfo instance for a failed test.""" - info = _TestInfo(test, time) - info._failure = failure - info.console = console - return info - - @staticmethod - def create_error(test, time, error, console="" ): - """Create a _TestInfo instance for an erroneous test.""" - info = _TestInfo(test, time) - info._error = error - info.console = console - return info - - def print_report(self, stream): - """Print information about this test case in XML format to the - supplied stream. 
- """ - stream.write(' ' % \ - { - "class": self._class, - "method": self._method, - "time": self._time, - }) - if self._failure is not None: - self._print_error(stream, 'failure', self._failure) - if self._error is not None: - self._print_error(stream, 'error', self._error) - stream.write('\n') - - def _print_error(self, stream, tagname, error): - """Print information from a failure or error to the supplied stream.""" - text = escape(str(error[1])) - stream.write('\n') - stream.write(' <%s type="%s">%s\n%s\n' \ - % (tagname, str(error[0]), text, self.console )) - tb_stream = StringIO() - traceback.print_tb(error[2], None, tb_stream) - stream.write(escape(tb_stream.getvalue())) - stream.write(' \n' % tagname) - stream.write(' ') - - -class _XmlTestResult(unittest.TestResult): - """A test result class that stores result as XML. - - Used by XmlTestRunner. - """ - - test_count = 0 - - @classmethod - def get_test_serial( cls ): - cls.test_count += 1 - return cls.test_count - - def __init__(self, classname, consolestream =None ): - unittest.TestResult.__init__(self) - self._test_name = classname - self._start_time = None - self._tests = [] - self._error = None - self._failure = None - self._consolestream = consolestream - - def startTest(self, test): - unittest.TestResult.startTest(self, test) - - sn = self.get_test_serial() - - log.info( "Test %i: %s" % ( sn, test.id() ) ) - self._error = None - self._failure = None - self._start_time = time.time() - - def stopTest(self, test, time_taken = None ): - if time_taken is not None: - time_taken = time.time() - self._start_time - - str_console = self._consolestream.get_and_clear() - - unittest.TestResult.stopTest(self, test) - if self._error: - info = _TestInfo.create_error(test, time_taken, self._error, console=str_console ) - log.error( "Error: %s" % test.id() ) - elif self._failure: - info = _TestInfo.create_failure(test, time_taken, self._failure, console=str_console ) - log.error( "Fail: %s" % test.id() ) - else: - info 
= _TestInfo.create_success(test, time_taken, ) - log.debug( "OK: %s" % test.id() ) - self._tests.append(info) - - def addError(self, test, err): - log.warn( "Error: %s" % test.id() ) - unittest.TestResult.addError(self, test, err) - self._error = err - - def addFailure(self, test, err): - log.warn( "Failure: %s" % test.id() ) - unittest.TestResult.addFailure(self, test, err) - self._failure = err - - def print_report(self, stream, time_taken, out, err): - """Prints the XML report to the supplied stream. - - The time the tests took to perform as well as the captured standard - output and standard error streams must be passed in. - """ - stream.write('\n' % \ - { - "n": self._test_name, - "t": self.testsRun, - "time": time_taken, - }) - for info in self._tests: - info.print_report(stream) - stream.write(' \n' % out) - stream.write(' \n' % err) - stream.write('\n') - - -class XmlTestRunner(object): - """A test runner that stores results in XML format compatible with JUnit. - - XmlTestRunner(stream=None) -> XML test runner - - The XML file is written to the supplied stream. If stream is None, the - results are stored in a file called TEST-..xml in the - current working directory (if not overridden with the path property), - where and are the module and class name of the test class. - """ - def __init__(self, stream=None ): - self._stream = stream - - @staticmethod - def get_test_class_name_from_testobj( obj_test ): - class_ = obj_test.__class__ - classname = class_.__module__ + "." 
+ class_.__name__ - return classname - - - def run(self, test, result=None ): - """Run the given test case or test suite.""" - classname = self.get_test_class_name_from_testobj( test ) - assert not self._stream is None - stream = self._stream - - # TODO: Python 2.5: Use the with statement - old_stdout = sys.stdout - old_stderr = sys.stderr - sys.stdout = bufwrapper( old_stdout ) - sys.stderr = bufwrapper( old_stderr ) - - if result is None: - result = _XmlTestResult( classname, consolestream = sys.stdout ) - else: - log.debug("Using provided XML test result object.") - - start_time = time.time() - - try: - test(result) - try: - out_s = sys.stdout.getvalue() - except AttributeError: - out_s = "" - try: - err_s = sys.stderr.getvalue() - except AttributeError: - err_s = "" - finally: - sys.stdout = old_stdout - sys.stderr = old_stderr - - time_taken = time.time() - start_time - result.print_report(stream, time_taken, out_s, err_s) - if self._stream is None: - stream.close() - - return result diff --git a/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunnertest.py b/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunnertest.py deleted file mode 100644 index 2ad90da..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/utils/xmlrunnertest.py +++ /dev/null @@ -1,144 +0,0 @@ -import unittest -import sys -import re -from cStringIO import StringIO -from pyjenkinsci.utils.xmlrunner import XmlTestRunner - -class XmlTestRunnerTest(unittest.TestCase): - def setUp(self): - self._stream = StringIO() - - def _try_test_run(self, test_class, expected): - """Run the test suite against the supplied test class and compare the - XML result against the expected XML string. Fail if the expected - string doesn't match the actual string. All time attribute in the - expected string should have the value "0.000". All error and failure - messages are reduced to "Foobar". 
- """ - runner = XmlTestRunner(self._stream) - runner.run(unittest.makeSuite(test_class)) - - got = self._stream.getvalue() - # Replace all time="X.YYY" attributes by time="0.000" to enable a - # simple string comparison. - got = re.sub(r'time="\d+\.\d+"', 'time="0.000"', got) - # Likewise, replace all failure and error messages by a simple "Foobar" - # string. - got = re.sub(r'(?s).*?', r'Foobar', got) - got = re.sub(r'(?s).*?', r'Foobar', got) - - self.assertEqual(expected, got) - - def test_no_tests(self): - """Regression test: Check whether a test run without any tests - matches a previous run.""" - class TestTest(unittest.TestCase): - pass - self._try_test_run(TestTest, """ - - - -""") - - def test_success(self): - """Regression test: Check whether a test run with a successful test - matches a previous run.""" - class TestTest(unittest.TestCase): - def test_foo(self): - pass - self._try_test_run(TestTest, """ - - - - -""") - - def test_failure(self): - """Regression test: Check whether a test run with a failing test - matches a previous run.""" - class TestTest(unittest.TestCase): - def test_foo(self): - self.assert_(False) - self._try_test_run(TestTest, """ - - Foobar - - - - -""") - - def test_error(self): - """Regression test: Check whether a test run with a erroneous test - matches a previous run.""" - class TestTest(unittest.TestCase): - def test_foo(self): - raise IndexError() - self._try_test_run(TestTest, """ - - Foobar - - - - -""") - - def test_stdout_capture(self): - """Regression test: Check whether a test run with output to stdout - matches a previous run.""" - class TestTest(unittest.TestCase): - def test_foo(self): - print "Test" - self._try_test_run(TestTest, """ - - - - -""") - - def test_stderr_capture(self): - """Regression test: Check whether a test run with output to stderr - matches a previous run.""" - class TestTest(unittest.TestCase): - def test_foo(self): - print >>sys.stderr, "Test" - self._try_test_run(TestTest, """ - - - - -""") - 
- class NullStream(object): - """A file-like object that discards everything written to it.""" - def write(self, buffer): - pass - - def test_unittests_changing_stdout(self): - """Check whether the XmlTestRunner recovers gracefully from unit tests - that change stdout, but don't change it back properly. - """ - class TestTest(unittest.TestCase): - def test_foo(self): - sys.stdout = XmlTestRunnerTest.NullStream() - - runner = XmlTestRunner(self._stream) - runner.run(unittest.makeSuite(TestTest)) - - def test_unittests_changing_stderr(self): - """Check whether the XmlTestRunner recovers gracefully from unit tests - that change stderr, but don't change it back properly. - """ - class TestTest(unittest.TestCase): - def test_foo(self): - sys.stderr = XmlTestRunnerTest.NullStream() - - runner = XmlTestRunner(self._stream) - runner.run(unittest.makeSuite(TestTest)) - - -if __name__ == "__main__": - suite = unittest.makeSuite(XmlTestRunnerTest) - unittest.TextTestRunner().run(suite) diff --git a/pyjenkinsci_egg/src/pyjenkinsci/view.py b/pyjenkinsci_egg/src/pyjenkinsci/view.py deleted file mode 100644 index 18fd863..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci/view.py +++ /dev/null @@ -1,61 +0,0 @@ -from pyjenkinsci.jenkinsobject import jenkinsobject -from pyjenkinsci.job import job - -class view( jenkinsobject ): - - def __init__(self, url, name, jenkins_obj): - self.name = name - self.jenkins_obj = jenkins_obj - jenkinsobject.__init__(self, url) - - def __str__(self): - return self.name - - def __getitem__(self, str_job_id ): - assert isinstance( str_job_id, str ) - api_url = self.python_api_url( self.get_job_url( str_job_id ) ) - return job( api_url, str_job_id, self.jenkins_obj ) - - def keys(self): - return self.get_job_dict().keys() - - def iteritems(self): - for name, url in self.get_job_dict().iteritems(): - api_url = self.python_api_url( url ) - yield name, job( api_url, name, self.jenkins_obj ) - - def values(self): - return [ a[1] for a in self.iteritems() ] - - 
def items(self): - return [ a for a in self.iteritems() ] - - def _get_jobs( self ): - if not self._data.has_key( "jobs" ): - pass - else: - for viewdict in self._data["jobs"]: - yield viewdict["name"], viewdict["url"] - - def get_job_dict(self): - return dict( self._get_jobs() ) - - def __len__(self): - return len( self.get_job_dict().keys() ) - - def get_job_url( self, str_job_name ): - try: - job_dict = self.get_job_dict() - return job_dict[ str_job_name ] - except KeyError, ke: - all_views = ", ".join( job_dict.keys() ) - raise KeyError("Job %s is not known - available: %s" % ( str_job_name, all_views ) ) - - def get_jenkins_obj(self): - return self.jenkins_obj - - def id(self): - """ - Calculate an ID for this object. - """ - return "%s.%s" % ( self.className, self.name ) \ No newline at end of file diff --git a/pyjenkinsci_egg/src/pyjenkinsci_tests/__init__.py b/pyjenkinsci_egg/src/pyjenkinsci_tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyjenkinsci_egg/src/pyjenkinsci_tests/config.py b/pyjenkinsci_egg/src/pyjenkinsci_tests/config.py deleted file mode 100644 index 6d04682..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci_tests/config.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -#Disable HTTP PROXY -CLEAR_PROXY = os.environ.get("CLEAR_PROXY","") -if len( CLEAR_PROXY ) > 0: - del os.environ["HTTP_PROXY"] - -JENKINS_BASE = os.environ.get( "JENKINS_BASE", "http://localhost:8080/jenkins" ) -HTTP_PROXY = os.environ.get( "HTTP_PROXY", "" ) -BUILD_NAME_TEST1 = "test1" - -if __name__ == "__main__": - print( "Jenkins base: %s" % JENKINS_BASE ) - print( "Http Proxy: %s" %HTTP_PROXY ) \ No newline at end of file diff --git a/pyjenkinsci_egg/src/pyjenkinsci_tests/test_api.py b/pyjenkinsci_egg/src/pyjenkinsci_tests/test_api.py deleted file mode 100644 index 98791b1..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci_tests/test_api.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Important: For this test to work we need at least one Jenkins server -You 
need to configure the JENKINS_BASE environment variable -And you need to enure that this Jenkins has at least one job called "test1". -Make sure that sucsessful builds of test one archive an artifact called "test1.txt" - it can be anything. -""" -import unittest -import logging - -from pyjenkinsci.build import build -from pyjenkinsci.result_set import result_set -from pyjenkinsci.result import result -from pyjenkinsci import api -from pyjenkinsci_tests.config import JENKINS_BASE, BUILD_NAME_TEST1 - -if __name__ == "__main__": - logging.basicConfig() - -log = logging.getLogger(__name__) - -class test_api( unittest.TestCase ): - """ - Perform a number of basic queries. - """ - - def setUp(self): - pass - - def test_get_latest_build_results(self): - lb = api.get_latest_build(JENKINS_BASE, BUILD_NAME_TEST1) - assert isinstance(lb, build) - rs = lb.get_resultset() - assert isinstance( rs, result_set ) - assert len(rs) > 0 - - for id, res in rs.items(): - assert isinstance( res, result ), "Expected result-set object, got %s" % repr(res) - - -if __name__ == "__main__": - unittest.main() diff --git a/pyjenkinsci_egg/src/pyjenkinsci_tests/test_query.py b/pyjenkinsci_egg/src/pyjenkinsci_tests/test_query.py deleted file mode 100644 index 202beb1..0000000 --- a/pyjenkinsci_egg/src/pyjenkinsci_tests/test_query.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Important: For this test to work we need at least one Jenkins server -You need to configure the JENKINS_BASE environment variable -And you need to enure that this Jenkins has at least one job called "test1". -Make sure that sucsessful builds of test one archive an artifact called "test1.txt" - it can be anything. 
-""" -import unittest -import logging - -from pyjenkinsci.jenkins import jenkins -from pyjenkinsci.artifact import artifact -from pyjenkinsci.build import build -from pyjenkinsci_tests.config import HTTP_PROXY, JENKINS_BASE - -if __name__ == "__main__": - logging.basicConfig() - -log = logging.getLogger(__name__) - -class test_query( unittest.TestCase ): - """ - Perform a number of basic queries. - """ - - def setUp(self): - log.warn("Connecting to %s via proxy: %s" % (JENKINS_BASE, HTTP_PROXY) ) - self.jenkins = jenkins( JENKINS_BASE ) - - def testListJobs(self): - """ - Test that we can get a list of jobs - """ - job_ids = self.jenkins.keys() - assert "test1" in job_ids - - def testListBuilds(self): - """ - """ - test1 = self.jenkins["test1"] - builds = [a for a in test1.get_build_ids() ] - assert len(builds) > 0 - newest_build = test1[ builds[-1] ] - assert isinstance( newest_build, build ) - - def testGetLatestArtifact(self): - test1 = self.jenkins["test1"] - builds = [a for a in test1.get_build_ids() ] - assert len(builds) > 0 - newest_build = test1[ builds[0] ] - assert isinstance( newest_build, build ) - artifact_dict = newest_build.get_artifact_dict() - assert "test1.txt" in artifact_dict.keys() - test_artifact = artifact_dict[ "test1.txt" ] - assert isinstance( test_artifact, artifact ) - - -if __name__ == "__main__": - unittest.main() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..03fb136 --- /dev/null +++ b/setup.py @@ -0,0 +1,19 @@ +from setuptools import setup, find_packages + +GLOBAL_ENTRY_POINTS = { + "console_scripts":[ "jenkins_invoke=pyhudson.command_line.hudson_invoke:main", + "meta_test=pyhudson.command_line.meta_test:main", ] } + +setup(name='pyjenkinsci', + version='0.0.35', + description='A Python API for accessing resources a Hudson or Jenkins continuous-integration server.', + author='Salim Fadhley', + author_email='sal@stodge.org', + #install_requires = [ 'elementtree>=1.2-20040618' ], + #tests = "tests", + 
package_dir = {'':'src'}, + packages=find_packages('src'), + zip_safe=True, + include_package_data = False, + entry_points = GLOBAL_ENTRY_POINTS, + ) diff --git a/setup_dev.bat b/setup_dev.bat deleted file mode 100644 index 95fa360..0000000 --- a/setup_dev.bat +++ /dev/null @@ -1,3 +0,0 @@ -cd /D %~dp0 -cd pyjenkinsci_egg -python setup.py develop -m