--- /dev/null
+.svn
+.project
+.pydevproject
--- /dev/null
@REM Build the pyjenkinsci egg distribution.
@REM %~dp0 expands to this script's directory; /D also switches drive letter.
cd /D %~dp0
cd pyjenkinsci_egg
python setup.py bdist_egg
\ No newline at end of file
--- /dev/null
from setuptools import setup, find_packages

# Console-script entry points.
# BUG FIX: these previously pointed at "pyhudson.command_line.hudson_invoke"
# and "pyhudson.command_line.meta_test", but the modules shipped by this
# distribution live under pyjenkinsci.command_line -- the generated scripts
# would have failed to import.
GLOBAL_ENTRY_POINTS = {
    "console_scripts": [
        "jenkins_invoke=pyjenkinsci.command_line.jenkins_invoke:main",
        "meta_test=pyjenkinsci.command_line.meta_test:main",
    ] }

setup(name='pyjenkinsci',
      version='0.0.35',
      description='A Python API for accessing resources on a Hudson or Jenkins continuous-integration server.',
      author='Salim Fadhley',
      author_email='sal@stodge.org',
      package_dir = {'':'src'},
      packages=find_packages('src'),
      zip_safe=True,
      include_package_data = False,
      entry_points = GLOBAL_ENTRY_POINTS,
      )
--- /dev/null
+from pyjenkinsci.jenkins import jenkins\r
+from pyjenkinsci.artifact import artifact\r
+from pyjenkinsci.exceptions import ArtifactsMissing, TimeOut, BadURL\r
+from pyjenkinsci import constants\r
+from urllib2 import urlparse\r
+\r
+import os\r
+import time\r
+import logging\r
+\r
+log = logging.getLogger(__name__)\r
+\r
def get_latest_test_results( jenkinsurl, jobname ):
    """
    Convenience wrapper: return the result set published by the most
    recent build of the named job on the given Jenkins server.
    """
    return get_latest_build( jenkinsurl, jobname ).get_resultset()
+\r
def get_latest_build( jenkinsurl, jobname ):
    """
    A convenience function to fetch down the very latest build of a jenkins job.
    Note: the returned build may still be running.
    """
    jenkinsci = jenkins( jenkinsurl )
    job = jenkinsci[ jobname ]
    return job.get_last_build()
+\r
def get_latest_complete_build( jenkinsurl, jobname ):
    """
    A convenience function to fetch down the very latest completed build of a
    jenkins job, regardless of its result.
    """
    jenkinsci = jenkins( jenkinsurl )
    job = jenkinsci[ jobname ]
    return job.get_last_completed_build()
+\r
def get_artifacts( jenkinsurl, jobid=None, build_no=None, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None ):
    """
    Find all the artifacts for a build of a job.

    :param jenkinsurl: base URL of the Jenkins server
    :param jobid: name of the job
    :param build_no: explicit build number; when falsy the last good build is used
    :param proxyhost/proxyport/proxyuser/proxypass: optional proxy settings
    :return: dict mapping artifact filename -> artifact object
    """
    jenkinsci = jenkins( jenkinsurl, proxyhost, proxyport, proxyuser, proxypass )
    job = jenkinsci[ jobid ]
    if build_no:
        build = job.get_build( build_no )
    else:
        build = job.get_last_good_build()
    artifacts = dict( (af.filename, af) for af in build.get_artifacts() )
    # BUG FIX: the original logged build_no, which is None on the default
    # path; report the job and the build that was actually resolved.
    log.info("Found %i artifacts in '%s' #%s" % ( len( artifacts ), jobid, build.buildno ) )
    return artifacts
+\r
def search_artifacts(jenkinsurl, jobid, artifact_ids=None, same_build=True, build_search_limit=None):
    """
    Search the entire history of a jenkins job for a list of artifact names.

    Walks builds from newest to oldest and returns a dict mapping
    artifact_id -> artifact for the first build containing all of them.

    :param artifact_ids: artifact filenames to find; None or empty returns []
    :param same_build: must be True -- all artifacts from one build
    :param build_search_limit: reserved, currently unused
    :raises ArtifactsMissing: when no single build contains every artifact
    """
    # BUG FIX: the None test must come first -- len(None) raises TypeError.
    if artifact_ids is None or len( artifact_ids ) == 0:
        return []
    assert same_build, "same_build==False not supported yet"
    jenkinsci = jenkins( jenkinsurl )
    job = jenkinsci[ jobid ]
    # BUG FIX: pre-seed so a job with zero builds raises ArtifactsMissing
    # instead of NameError.
    missing_artifacts = set( artifact_ids )
    for build_id in job.get_build_ids():
        build = job.get_build( build_id )
        artifacts = build.get_artifact_dict()
        if set( artifact_ids ).issubset( set( artifacts.keys() ) ):
            return dict( ( a, artifacts[a] ) for a in artifact_ids )
        missing_artifacts = set( artifact_ids ) - set( artifacts.keys() )
        log.debug("Artifacts %s missing from %s #%i" % ( ", ".join( missing_artifacts ), jobid, build_id ))
    raise ArtifactsMissing( missing_artifacts )
+\r
def grab_artifact( jenkinsurl, jobid, artifactid, targetdir ):
    """
    Find the latest good copy of the named artifact and save it into
    targetdir, creating that directory when necessary.
    """
    all_artifacts = get_artifacts( jenkinsurl, jobid )
    the_artifact = all_artifacts[ artifactid ]
    if not os.path.exists( targetdir ):
        os.makedirs( targetdir )
    the_artifact.savetodir( targetdir )
+\r
def block_until_complete( jenkinsurl, jobs, maxwait=12000, interval=30, raise_on_timeout=True ):
    """
    Wait until none of the named jobs are queued or running.

    :param jobs: iterable of job names
    :param maxwait: maximum total seconds to wait
    :param interval: seconds between polls
    :param raise_on_timeout: raise TimeOut on expiry instead of returning
    :raises TimeOut: when jobs are still active after maxwait seconds
    """
    assert maxwait > 0
    assert maxwait > interval
    assert interval > 0

    obj_jenkins = jenkins( jenkinsurl )
    obj_jobs = [ obj_jenkins[ jid ] for jid in jobs ]
    for time_left in xrange( maxwait, 0, -interval ):
        still_running = [ j for j in obj_jobs if j.is_queued_or_running() ]
        if not still_running:
            return
        str_still_running = ", ".join( '"%s"' % str(a) for a in still_running )
        log.warn( "Waiting for jobs %s to complete. Will wait another %is" % ( str_still_running, time_left ) )
        time.sleep( interval )
    # BUG FIX: re-check after the final sleep so jobs that finished during
    # the last interval do not trigger a spurious TimeOut.
    still_running = [ j for j in obj_jobs if j.is_queued_or_running() ]
    if still_running and raise_on_timeout:
        str_still_running = ", ".join( '"%s"' % str(a) for a in still_running )
        raise TimeOut( "Waited too long for these jobs to complete: %s" % str_still_running )
+\r
def get_view_from_url( url ):
    """
    Factory method: split a view URL into server URL plus view name and
    return the corresponding view object.
    """
    matched = constants.RE_SPLIT_VIEW_URL.search( url )
    if matched is None:
        raise BadURL("Cannot parse URL %s" % url )
    jenkinsurl, view_name = matched.groups()
    return jenkins( jenkinsurl ).get_view( view_name )
+\r
def install_artifacts( artifacts, dirstruct, installdir, basestaticurl ):
    """
    Install artifacts into a directory structure.

    dirstruct maps a relative directory to a list of artifact names.  Names
    present in `artifacts` are fetched from Jenkins; anything else is assumed
    to be a static file under basestaticurl.

    :return: list of absolute paths that were installed
    """
    assert basestaticurl.endswith("/"), "Basestaticurl should end with /"
    installed = []
    for reldir, artifactnames in dirstruct.items():
        destdir = os.path.join( installdir, reldir )
        if os.path.exists( destdir ):
            assert os.path.isdir( destdir )
        else:
            log.warn( "Making install directory %s" % destdir )
            os.makedirs( destdir )
        for artifactname in artifactnames:
            destpath = os.path.abspath( os.path.join( destdir, artifactname ) )
            if artifactname in artifacts:
                # Produced by Jenkins: download through the API object.
                theartifact = artifacts[ artifactname ]
            else:
                # Probably a static file: fetch from the static collection.
                staticurl = urlparse.urljoin( basestaticurl, artifactname )
                theartifact = artifact( artifactname, staticurl )
            theartifact.save( destpath )
            installed.append( destpath )
    return installed
--- /dev/null
+import urllib2\r
+import os\r
+import logging\r
+import cStringIO\r
+import zipfile\r
+import cPickle\r
+import datetime\r
+\r
+from pyjenkinsci import config\r
+from pyjenkinsci.utils.retry import retry_function\r
+from pyjenkinsci.exceptions import ArtifactBroken\r
+from pyjenkinsci.utils.md5hash import new_digest\r
+\r
+log = logging.getLogger( __name__ )\r
+\r
class artifact( object ):
    """
    Represents a single artifact file produced by a Jenkins/Hudson build.

    Identified by filename and download URL; when the originating build is
    known, downloads are validated against the server's md5 fingerprints.
    """

    @staticmethod
    def timedelta_to_seconds( td ):
        """Convert a datetime.timedelta to a float number of seconds."""
        secs = float( td.seconds )
        secs += td.microseconds / 1000000.0
        secs += td.days * 86400
        return secs

    def __init__( self, filename, url, build=None ):
        """
        :param filename: the artifact's file name (no path component)
        :param url: fully-qualified URL the artifact can be fetched from
        :param build: build object which produced this artifact, if known
        """
        self.filename = filename
        self.url = url
        self.build = build

    def unpickle(self, method="pickle" ):
        """
        Assume the artifact is a stream of concatenated pickles; yield each
        unpickled object in turn until EOF.
        """
        stream, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream )
        while True:
            try:
                yield cPickle.load( stream )
            except EOFError:
                break

    def logging_buffer_copy( self, input_stream, output_stream, length, chunks=10 ):
        """
        Copy `length` bytes from input_stream to output_stream, logging
        progress and instantaneous bitrate roughly `chunks` times.
        """
        chunk_points = int( length / chunks )
        last_time = datetime.datetime.now()
        for index in xrange( 0, length ):
            output_stream.write( input_stream.read(1) )
            if chunk_points > 0 and ( index % chunk_points ) == 0 and index > 0:
                now = datetime.datetime.now()
                try:
                    time_elapsed_since_last_chunk = self.timedelta_to_seconds( now - last_time )
                    inst_bitrate = ( chunk_points / time_elapsed_since_last_chunk ) / 1024.0
                except ZeroDivisionError:
                    # Clock resolution too coarse to measure; skip this report.
                    continue
                log.info( "Loaded %i of %i bytes %.2f kbit/s" % ( index, length, inst_bitrate ) )
                last_time = now

    def getstream( self ):
        """
        Fetch the artifact and return (buffer, md5 hexdigest) where buffer is
        a cStringIO positioned at offset 0.  When the artifact belongs to a
        known build, its fingerprint is validated against the server.
        """
        artifact_digest = new_digest()
        tmp_buffer = cStringIO.StringIO()

        if self.build:
            # BUG FIX: job has no "hudson" attribute -- the owning server is
            # reached through get_jenkins_obj() (see the job class).
            fn_opener = self.build.job.get_jenkins_obj().get_opener()
        else:
            fn_opener = urllib2.urlopen

        try:
            inputstream = fn_opener( self.url, )
            content_type = inputstream.info().get("content-type", "unknown")
            try:
                content_length = int( inputstream.info()["content-length"] )
                self.logging_buffer_copy( inputstream, tmp_buffer, content_length )
            except KeyError:
                # No content-length header: read to EOF without progress logging.
                log.warn("Could not get length")
                tmp_buffer.write( inputstream.read() )
        except urllib2.HTTPError:
            log.warn( "Error fetching %s" % self.url )
            raise
        tmp_buffer.seek(0)

        artifact_digest.update(tmp_buffer.getvalue())
        artifact_hexdigest = artifact_digest.hexdigest()

        artifact_size = len(tmp_buffer.getvalue())
        log.info( "Got %s, %i bytes, MD5: %s, type: %s" % ( self.filename, artifact_size, artifact_hexdigest, content_type ) )

        if self.build:
            # BUG FIX: as above, use get_jenkins_obj() rather than .hudson.
            self.build.job.get_jenkins_obj().validate_fingerprint( artifact_hexdigest )

        return tmp_buffer, artifact_hexdigest

    def openzip( self ):
        """
        Open the artifact as a zipfile.ZipFile (read-only).
        """
        zip_buffer, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream )
        return zipfile.ZipFile( zip_buffer, "r" )

    def save( self, fspath ):
        """
        Save the artifact to an explicit path. The containing directory must
        exist. A local copy whose fingerprint the server already knows is
        left untouched.
        """
        log.info( "Saving artifact @ %s to %s" % (self.url, fspath) )

        if not fspath.endswith( self.filename ):
            log.warn( "Attempt to change the filename of artifact %s on save." % self.filename )

        if os.path.exists( fspath ):
            existing_hexdigest = self.get_local_digest( fspath )
            if self.build:
                try:
                    valid = self.build.job.get_jenkins_obj().validate_fingerprint_for_build( existing_hexdigest, filename=self.filename, job=self.build.job.id(), build=self.build.id() )
                    if valid:
                        log.info( "Local copy of %s is already up to date. MD5 %s" % (self.filename, existing_hexdigest) )
                    else:
                        self.__do_download( fspath )
                except ArtifactBroken:
                    log.info("Hudson artifact could not be identified.")
            else:
                log.info("This file did not originate from Hudson, so cannot check.")
                self.__do_download( fspath )
        else:
            log.info("Local file is missing, downloading new.")
            self.__do_download( fspath )

    def get_local_digest( self, fspath ):
        """
        Compute the md5 hex digest of an existing local file.
        """
        existingfile = open( fspath, "rb" )
        try:
            existing_digest = new_digest()
            existing_digest.update( existingfile.read() )
        finally:
            # BUG FIX: the file handle was previously never closed.
            existingfile.close()
        return existing_digest.hexdigest()

    def __do_download( self, fspath ):
        """
        Unconditionally download this artifact to fspath, creating missing
        parent directories. Returns the (closed) file object written.
        """
        filedir, _ = os.path.split( fspath )
        if not os.path.exists( filedir ):
            log.warn( "Making missing directory %s" % filedir )
            os.makedirs( filedir )

        # BUG FIX: fetch the data *before* opening the target so a failed
        # download cannot truncate an existing local copy.
        tmp_buffer_downloaded, _artifact_hexdigest = retry_function( config.LOAD_ATTEMPTS , self.getstream )

        try:
            outputfile = open( fspath, "wb" )
        except IOError:
            log.critical("User %s@%s cannot open file" % ( os.environ.get("USERNAME","unknown"),os.environ.get("USERDOMAIN","unknown") ) )
            raise
        try:
            outputfile.write( tmp_buffer_downloaded.getvalue() )
        finally:
            # BUG FIX: the output file was previously never closed.
            outputfile.close()
        return outputfile

    def savetodir( self, dirpath ):
        """
        Save the artifact into an existing folder, keeping the artifact's
        default filename.
        """
        assert os.path.exists( dirpath )
        assert os.path.isdir( dirpath )
        outputfilepath = os.path.join( dirpath, self.filename )
        self.save( outputfilepath )

    def __repr__( self ):
        return """<%s.%s %s>""" % ( self.__class__.__module__,
                                    self.__class__.__name__,
                                    self.url )
--- /dev/null
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.artifact import artifact\r
+from pyjenkinsci.result_set import result_set\r
+import time\r
+import logging\r
+from pyjenkinsci import config\r
+from pyjenkinsci.exceptions import NoResults, FailedNoResults\r
+from pyjenkinsci.constants import STATUS_FAIL, STATUS_ABORTED, RESULTSTATUS_FAILURE\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class build( jenkinsobject ):
    """
    A single execution of a Jenkins job.

    Wraps the per-build API data and exposes status, duration, artifacts
    and published test results.
    """

    STR_TOTALCOUNT = "totalCount"
    STR_TPL_NOTESTS_ERR = "%s has status %s, and does not have any test results"

    def __init__( self, url, buildno, job ):
        # buildno: integer build number; job: the owning job object.
        assert type(buildno) == int
        self.buildno = buildno
        self.job = job
        jenkinsobject.__init__( self, url )

    def __str__(self):
        return self._data['fullDisplayName']

    def id(self):
        """Numeric build number as reported by the server."""
        return self._data["number"]

    def get_status(self):
        """Raw result string from the server, e.g. SUCCESS or FAILURE."""
        return self._data["result"]

    def get_duration(self):
        """Build duration as reported by the server."""
        return self._data["duration"]

    def get_artifacts( self ):
        """Yield an artifact object for every artifact of this build."""
        for info in self._data["artifacts"]:
            af_url = "%sartifact/%s" % ( self.baseurl, info["relativePath"] )
            yield artifact( info["fileName"], af_url, self )

    def get_artifact_dict(self):
        """Mapping of artifact filename -> artifact object."""
        return dict( [ ( af.filename, af ) for af in self.get_artifacts() ] )

    def is_running( self ):
        """
        Poll the server and return True while the build is executing.
        """
        self.poll()
        return self._data["building"]

    def is_good( self ):
        """
        True only when the build has finished with result SUCCESS.
        """
        if self.is_running():
            return False
        return self._data["result"] == 'SUCCESS'

    def block_until_complete(self, delay=15):
        """Sleep in `delay`-second steps until the build stops running."""
        assert isinstance( delay, int )
        waited = 0
        while self.is_running():
            log.info("Waited %is for %s #%s to complete" % ( waited, self.job.id(), self.id() ) )
            time.sleep( delay )
            waited += delay

    def get_jenkins_obj(self):
        # Delegate to the owning job's server handle.
        return self.job.get_jenkins_obj()

    def get_result_url(self):
        """
        Return the URL of the testReport API resource for this build.
        """
        return r"%stestReport/%s" % ( self._data["url"] , config.JENKINS_API )

    def get_resultset(self):
        """
        Obtain detailed test results for this build.

        :raises NoResults: nothing was published / zero tests ran
        :raises FailedNoResults: the build failed or was aborted
        """
        result_url = self.get_result_url()
        actions = self.get_actions()
        if self.STR_TOTALCOUNT not in actions:
            raise NoResults( "%s does not have any published results" % str(self) )
        buildstatus = self.get_status()
        if buildstatus in [ STATUS_FAIL, RESULTSTATUS_FAILURE, STATUS_ABORTED ]:
            raise FailedNoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )
        if actions[ self.STR_TOTALCOUNT ] == 0:
            raise NoResults( self.STR_TPL_NOTESTS_ERR % ( str(self), buildstatus ) )
        return result_set( result_url, build=self )

    def has_resultset(self):
        """
        True when a result set has been published for this build.
        """
        return self.STR_TOTALCOUNT in self.get_actions()

    def get_actions(self):
        """Merge this build's "actions" dictionaries into one dict."""
        merged = {}
        for action in self._data["actions"]:
            merged.update( action )
        return merged
--- /dev/null
+from optparse import OptionParser\r
+\r
class base( object ):
    """
    Common base for command-line tools: supplies the option parser which
    subclasses extend with their own options.
    """

    @classmethod
    def mkparser(cls):
        """Return a fresh optparse.OptionParser."""
        return OptionParser()
--- /dev/null
+import os\r
+import sys\r
+import logging\r
+\r
+from pyjenkinsci.command_line.base import base\r
+from pyjenkinsci.jenkins import jenkins\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class jenkins_invoke( base ):
    """
    Command-line tool: trigger one or more Jenkins jobs by name, optionally
    blocking until each job has finished.
    """

    @classmethod
    def mkparser(cls):
        """
        Extend the base parser with the options this command understands.
        """
        parser = base.mkparser( )
        # Fall back to a local server when JENKINS_URL is not exported.
        DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )
        parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."
        parser.add_option("-J", "--jenkinsbase", dest="baseurl",
                          help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,
                          type="str",
                          default=DEFAULT_BASEURL, )
        parser.add_option("-b", "--block", dest="block",
                          help="Block until each of the jobs is complete." ,
                          action="store_true",
                          default=False )
        parser.add_option("-t", "--token", dest="token",
                          help="Optional security token." ,
                          default=None )
        return parser

    @classmethod
    def main(cls):
        """
        Parse sys.argv, require at least one job name, then invoke each job.
        Exits with status 1 (after printing help) on bad usage.
        """
        parser = cls.mkparser()
        options, args = parser.parse_args()
        try:
            assert len( args ) > 0, "Need to specify at least one job name"
        except AssertionError, e:
            # Python 2 exceptions support indexing for their first argument.
            log.critical( e[0] )
            parser.print_help()
            sys.exit(1)
        invoker = cls( options, args )
        invoker()

    def __init__( self, options, jobs ):
        # options: parsed optparse values; jobs: list of job-name strings.
        self.options = options
        self.jobs = jobs

    def __call__(self):
        # Trigger every requested job with the shared command-line options.
        for job in self.jobs:
            self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token )

    def invokejob(self, jobname, block, baseurl, token ):
        """
        Trigger a single named job on the server; when block is True, wait
        for the triggered build to finish.
        """
        assert type(block) == bool
        assert type(baseurl) == str
        assert type(jobname) == str
        assert token is None or isinstance( token, str )
        jenkinsserver = jenkins( baseurl )
        job = jenkinsserver[ jobname ]
        job.invoke( securitytoken=token, block=block )
+\r
+\r
def main( ):
    """Console entry point: configure logging, then run jenkins_invoke."""
    logging.basicConfig()
    root_logger = logging.getLogger("")
    root_logger.setLevel( logging.INFO )
    jenkins_invoke.main()
--- /dev/null
+import optparse\r
+import os\r
+import random\r
+import logging\r
+from pyjenkinsci.utils import junitxml\r
+from pyjenkinsci.utils.id import mk_id\r
+\r
+log = logging.getLogger(__name__)\r
+\r
+class meta_test(object):\r
+ ATTEMPTS=3\r
+\r
+ @classmethod\r
+ def mkParser(cls):\r
+ parser = optparse.OptionParser()\r
+\r
+ def __init__(self, opts=None):\r
+ self.opts = opts\r
+\r
+ def testFunction(self):\r
+ if random.random() < 0.1:\r
+ raise AssertionError("The value was too small")\r
+ return 0\r
+\r
+ def __call__(self):\r
+ temp_dir = os.environ.get("TEMP", r"c:\temp" )\r
+ output_dir = os.environ.get( "WORKSPACE", temp_dir )\r
+ result_filepath = os.path.join( output_dir, "results.xml" )\r
+ stream = open( result_filepath, "wb" )\r
+ testsuite_name = mk_id()\r
+ ju = junitxml.junitxml( stream, testsuite_name)\r
+\r
+\r
+ classname = mk_id()\r
+ for i in xrange(0, self.ATTEMPTS ):\r
+ tr = ju.startTest( classname, mk_id() )\r
+ try:\r
+ tr.run( self.testFunction )\r
+ except Exception, e:\r
+ log.exception(e)\r
+ continue\r
+\r
+ ju.write()\r
+\r
def main( ):
    """Console entry point: configure logging, then run the meta test."""
    logging.basicConfig()
    runner = meta_test()
    return runner()

if __name__ == "__main__":
    main()
--- /dev/null
# Suffix appended to a resource URL to request the python-evaluable API view.
JENKINS_API = r"api/python/"
# Seconds allowed for a single load attempt -- presumably consumed by the
# retry/urlopener utilities; TODO confirm against pyjenkinsci.utils.
LOAD_TIMEOUT = 30
# Number of attempts retry_function() makes when fetching remote data.
LOAD_ATTEMPTS = 5
\ No newline at end of file
--- /dev/null
+import re\r
+\r
# Per-test / build status strings as reported by the Jenkins API.
STATUS_FAIL = "FAIL"
STATUS_ERROR = "ERROR"
STATUS_ABORTED = "ABORTED"
STATUS_REGRESSION = "REGRESSION"

STATUS_FIXED = "FIXED"
STATUS_PASSED = "PASSED"

# Overall build result strings (distinct from the statuses above).
RESULTSTATUS_FAILURE = "FAILURE"
RESULTSTATUS_FAILED = "FAILED"

# Splits a view URL into (server base URL, view name) -- see get_view_from_url.
STR_RE_SPLIT_VIEW = "(.*)/view/([^/]*)/?"
RE_SPLIT_VIEW_URL = re.compile( STR_RE_SPLIT_VIEW )
--- /dev/null
class ArtifactsMissing(Exception):
    """
    Cannot find a build with all of the required artifacts.
    """

class UnknownJob( KeyError ):
    """
    Hudson does not recognize the job requested.
    """

class ArtifactBroken(Exception):
    """
    An artifact is broken or wrong: its fingerprint could not be validated
    against the server.
    """

class TimeOut( Exception ):
    """
    Some jobs have taken too long to complete.
    """

class WillNotBuild(Exception):
    """
    Cannot trigger a new build.
    """

class NoBuildData(Exception):
    """
    A job has no build data.
    """

class NoResults(Exception):
    """
    A build did not publish any results.
    """

class FailedNoResults(NoResults):
    """
    A build did not publish any results because it failed.
    """

class BadURL(ValueError):
    """
    A URL appears to be broken.
    """
--- /dev/null
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.exceptions import ArtifactBroken\r
+\r
+import urllib2\r
+import re\r
+\r
+import logging\r
+\r
+log = logging.getLogger( __name__ )\r
+\r
class fingerprint( jenkinsobject ):
    """
    Represents a jenkins fingerprint record: the md5 of a single artifact
    file plus the builds which produced or used it.
    """
    RE_MD5 = re.compile("^([0-9a-z]{32})$")

    def __init__( self, baseurl, id, jenkins_obj ):
        """
        :param baseurl: base URL of the Jenkins server
        :param id: md5 hex digest identifying the artifact
        :param jenkins_obj: owning jenkins object (supplies the URL opener)
        """
        # BUG FIX: logging.basicConfig() was called here; configuring the
        # root logger is the application's responsibility, not a library's.
        self.jenkins_obj = jenkins_obj
        assert self.RE_MD5.search( id ), "%s does not look like a valid id" % id
        url = "%s/fingerprint/%s/" % ( baseurl, id )
        jenkinsobject.__init__( self, url, poll=False )
        self.id = id

    def get_jenkins_obj(self):
        # Accessor used by jenkinsobject.get_data to obtain the URL opener.
        return self.jenkins_obj

    def __str__(self):
        return self.id

    def valid(self):
        """
        True when the server recognises this fingerprint; False on HTTP error.
        """
        try:
            self.poll()
        except urllib2.HTTPError:
            return False
        return True

    def validate_for_build(self, filename, job, build):
        """
        True when this fingerprint belongs to `filename` as produced by, or
        used in, the given job/build.
        """
        if not self.valid():
            log.info("Unknown to jenkins.")
            return False
        if not self._data["original"] is None:
            if self._data["original"]["name"] == job:
                if self._data["original"]["number"] == build:
                    return True
        if self._data["fileName"] != filename:
            log.info("Filename from jenkins (%s) did not match provided (%s)" % ( self._data["fileName"], filename ) )
            return False
        for usage_item in self._data["usage"]:
            if usage_item["name"] == job:
                # "ranges" lists the build-number intervals the file was used in.
                # (local renamed from `range`, which shadowed the builtin)
                for build_range in usage_item["ranges"]["ranges"]:
                    if build_range["start"] <= build <= build_range["end"]:
                        log.info("This artifact was generated by %s between build %i and %i" % ( job, build_range["start"], build_range["end"] ) )
                        return True
        return False

    def validate(self):
        """
        Raise ArtifactBroken unless the fingerprint checks out on the server.
        """
        try:
            assert self.valid()
        except AssertionError:
            raise ArtifactBroken( "Artifact %s seems to be broken, check %s" % ( self.id, self.baseurl ) )
        except urllib2.HTTPError:
            raise ArtifactBroken( "Unable to validate artifact id %s using %s" % ( self.id, self.baseurl ) )
        return True

    def get_info( self ):
        """
        Returns a tuple of build-name, build# and artifact filename for a good build.
        """
        self.poll()
        return self._data["original"]["name"], self._data["original"]["number"], self._data["fileName"]
+\r
+\r
+if __name__ == "__main__":\r
+ ff = fingerprint( "http://localhost:8080/hudson/", "0f37cbb6545b8778bc0700d90be66bf3" )\r
+ print repr(ff)\r
+ print ff.baseurl\r
+ print ff.valid()\r
+ print ff.get_info( )\r
--- /dev/null
+from pyjenkinsci.exceptions import UnknownJob\r
+from pyjenkinsci.fingerprint import fingerprint\r
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.job import job\r
+from pyjenkinsci.utils.urlopener import mkurlopener\r
+from pyjenkinsci.view import view\r
+import logging\r
+import time\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class jenkins( jenkinsobject ):
    """
    Top-level handle on a Jenkins/Hudson server.

    Behaves as a read-only mapping of job name -> job object, and acts as
    factory for fingerprint and view objects.
    """
    def __init__(self, baseurl, proxyhost=None, proxyport=None, proxyuser=None, proxypass=None):
        """Connect to the server at baseurl, optionally through a proxy."""
        self.proxyhost = proxyhost
        self.proxyport = proxyport
        self.proxyuser = proxyuser
        self.proxypass = proxypass
        jenkinsobject.__init__( self, baseurl )

    def get_proxy_auth(self):
        """Proxy settings as a (host, port, user, password) tuple."""
        return ( self.proxyhost, self.proxyport, self.proxyuser, self.proxypass )

    def get_opener( self ):
        """URL-opener callable configured with this server's proxy settings."""
        proxy_auth = self.get_proxy_auth()
        return mkurlopener( *proxy_auth )

    def validate_fingerprint( self, id ):
        """Raise ArtifactBroken unless the server recognises the md5 `id`."""
        obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self)
        obj_fingerprint.validate()
        log.info("Jenkins says %s is valid" % id)

    def get_artifact_data(self, id):
        """Validate fingerprint `id`, then return its (job, build#, filename)."""
        obj_fingerprint = fingerprint(self.baseurl, id, jenkins_obj=self)
        obj_fingerprint.validate()
        return obj_fingerprint.get_info()

    def validate_fingerprint_for_build(self, digest, filename, job, build ):
        """True when `digest` matches `filename` produced by the job/build."""
        obj_fingerprint = fingerprint( self.baseurl, digest, jenkins_obj=self )
        return obj_fingerprint.validate_for_build( filename, job, build )

    def get_jenkins_obj(self):
        # The server object is its own Jenkins handle.
        return self

    def get_jobs(self):
        """
        Yield (name, job) pairs for every job on this server.
        """
        for info in self._data["jobs"]:
            yield info["name"], job( info["url"], info["name"], jenkins_obj=self)

    def iteritems(self):
        return self.get_jobs()

    def iterkeys(self):
        # Yield the name of every job on the server.
        for info in self._data["jobs"]:
            yield info["name"]

    def keys(self):
        return list( self.iterkeys() )

    def __str__(self):
        return "Jenkins server at %s" % self.baseurl

    def _get_views( self ):
        # Servers with no "views" entry simply yield nothing.
        for viewdict in self._data.get( "views", [] ):
            yield viewdict["name"], viewdict["url"]

    def get_view_dict(self):
        """Mapping of view name -> view URL."""
        return dict( self._get_views() )

    def get_view_url( self, str_view_name ):
        """
        URL of the named view; raises KeyError listing the alternatives
        when the view is unknown.
        """
        view_dict = self.get_view_dict()
        try:
            return view_dict[ str_view_name ]
        except KeyError:
            all_views = ", ".join( view_dict.keys() )
            raise KeyError("View %s is not known - available: %s" % ( str_view_name, all_views ) )

    def get_view(self, str_view_name ):
        """Build a view object for the named view."""
        view_api_url = self.python_api_url( self.get_view_url( str_view_name ) )
        return view(view_api_url , str_view_name, jenkins_obj=self)

    def __getitem__( self, buildname ):
        """
        Look a job up by name; raises UnknownJob when no job matches.
        """
        for name, job_obj in self.get_jobs():
            if name == buildname:
                return job_obj
        raise UnknownJob(buildname)
--- /dev/null
+import urllib2\r
+import logging\r
+import pprint\r
+from pyjenkinsci import config\r
+from pyjenkinsci.utils.retry import retry_function\r
+\r
+log = logging.getLogger( __name__ )\r
+\r
+class jenkinsobject( object ):\r
+ """\r
+ This appears to be the base object that all other jenkins objects are inherited from\r
+ """\r
+ RETRY_ATTEMPTS = 5\r
+\r
+ def __repr__( self ):\r
+ return """<%s.%s %s>""" % ( self.__class__.__module__,\r
+ self.__class__.__name__,\r
+ str( self ) )\r
+\r
+ def print_data(self):\r
+ pprint.pprint( self._data )\r
+\r
+ def __str__(self):\r
+ raise NotImplemented\r
+\r
+ def __init__( self, baseurl, poll=True ):\r
+ """\r
+ Initialize a jenkins connection\r
+ """\r
+ self.baseurl = baseurl\r
+ if poll:\r
+ try:\r
+ self.poll()\r
+ except urllib2.HTTPError, hte:\r
+ log.exception(hte)\r
+ log.warn( "Failed to conenct to %s" % baseurl )\r
+ raise\r
+\r
+ def poll(self):\r
+ self._data = self._poll()\r
+\r
+ def _poll(self):\r
+ url = self.python_api_url( self.baseurl )\r
+ return retry_function( self.RETRY_ATTEMPTS , self.get_data, url )\r
+\r
+ @classmethod\r
+ def python_api_url( cls, url ):\r
+ if url.endswith( config.JENKINS_API ):\r
+ return url\r
+ else:\r
+ if url.endswith( r"/" ):\r
+ fmt="%s%s"\r
+ else:\r
+ fmt = "%s/%s"\r
+ return fmt % (url, config.JENKINS_API)\r
+\r
+ def get_data( self, url ):\r
+ """\r
+ Find out how to connect, and then grab the data.\r
+ """\r
+ fn_urlopen = self.getHudsonObject().get_opener()\r
+ try:\r
+ stream = fn_urlopen( url )\r
+ result = eval( stream.read() )\r
+ except urllib2.HTTPError, e:\r
+ log.warn( "Error reading %s" % url )\r
+ raise\r
+ return result\r
--- /dev/null
+import logging\r
+import urlparse\r
+import urllib2\r
+import time\r
+\r
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.build import build\r
+from pyjenkinsci.exceptions import NoBuildData\r
+\r
+log = logging.getLogger(__name__)\r
+\r
+class job( jenkinsobject ):\r
+ """\r
+ Represents a jenkins job\r
+ A job can hold N builds which are the actual execution environments\r
+ """\r
+ def __init__( self, url, name, jenkins_obj ):\r
+ self.name = name\r
+ self.jenkins = jenkins_obj\r
+ jenkinsobject.__init__( self, url )\r
+\r
+ def id( self ):\r
+ return self._data["name"]\r
+\r
+ def __str__(self):\r
+ return self._data["name"]\r
+\r
+ def get_jenkins_obj(self):\r
+ return self.jenkins\r
+\r
+ def get_build_triggerurl( self, token=None ):\r
+ if token is None:\r
+ extra = "build"\r
+ else:\r
+ assert isinstance(token, str ), "token if provided should be a string."\r
+ extra = "build?token=%s" % token\r
+ buildurl = urlparse.urljoin( self.baseurl, extra )\r
+ return buildurl\r
+\r
+ def hit_url(self, url ):\r
+ fn_urlopen = self.get_jenkins_obj().get_opener()\r
+ try:\r
+ stream = fn_urlopen( url )\r
+ html_result = stream.read()\r
+ except urllib2.HTTPError, e:\r
+ log.debug( "Error reading %s" % url )\r
+ raise\r
+ return html_result\r
+\r
+ def invoke( self, securitytoken=None, block=False, skip_if_running=False, invoke_pre_check_delay=3, invoke_block_delay=15 ):\r
+ assert isinstance( invoke_pre_check_delay, (int, float) )\r
+ assert isinstance( invoke_block_delay, (int, float) )\r
+ assert isinstance( block, bool )\r
+ assert isinstance( skip_if_running, bool )\r
+ skip_build = False\r
+ if self.is_queued():\r
+ log.warn( "Will not request new build because %s is already queued" % self.id() )\r
+ skip_build = True\r
+ elif self.is_running():\r
+ if skip_if_running:\r
+ log.warn( "Will not request new build because %s is already running" % self.id() )\r
+ skip_build = True\r
+ else:\r
+ log.warn("Will re-schedule %s even though it is already running" % self.id() )\r
+ original_build_no = self.get_last_buildnumber()\r
+ if skip_build:\r
+ pass\r
+ else:\r
+ log.info( "Attempting to start %s on %s" % ( self.id(), repr(self.get_jenkins_obj()) ) )\r
+ url = self.get_build_triggerurl( securitytoken )\r
+ html_result = self.hit_url( url )\r
+ assert len( html_result ) > 0\r
+ if invoke_pre_check_delay > 0:\r
+ log.info("Waiting for %is to allow Hudson to catch up" % invoke_pre_check_delay )\r
+ time.sleep( invoke_pre_check_delay )\r
+ if block:\r
+ total_wait = 0\r
+ while self.is_queued():\r
+ log.info( "Waited %is for %s to begin..." % ( total_wait, self.id() ) )\r
+ time.sleep( invoke_block_delay )\r
+ total_wait += invoke_block_delay\r
+ if self.is_running():\r
+ running_build = self.get_last_build()\r
+ running_build.block_until_complete( delay=invoke_pre_check_delay )\r
+ assert running_build.is_good()\r
+ else:\r
+ assert self.get_last_buildnumber() > original_build_no, "Job does not appear to have run."\r
+ else:\r
+ if self.is_queued():\r
+ log.info( "%s has been queued." % self.id() )\r
+ elif self.is_running():\r
+ log.info( "%s is running." % self.id() )\r
+ elif original_build_no < self.get_last_buildnumber():\r
+ log.info( "%s has completed." % self.id() )\r
+ else:\r
+ raise AssertionError("The job did not schedule.")\r
+\r
+ def _buildid_for_type(self, buildtype):\r
+ """Gets a buildid for a given type of build"""\r
+ KNOWNBUILDTYPES=["lastSuccessfulBuild", "lastBuild", "lastCompletedBuild"]\r
+ assert buildtype in KNOWNBUILDTYPES\r
+ buildid = self._data[buildtype]["number"]\r
+ assert type(buildid) == int, "Build ID should be an integer, got %s" % repr( buildid )\r
+ return buildid\r
+\r
+ def get_last_good_buildnumber( self ):\r
+ """\r
+ Get the numerical ID of the last good build.\r
+ """\r
+ return self._buildid_for_type(buildtype="lastSuccessfulBuild")\r
+\r
+ def get_last_buildnumber( self ):\r
+ """\r
+ Get the numerical ID of the last build.\r
+ """\r
+ return self._buildid_for_type(buildtype="lastBuild")\r
+\r
+ def get_last_completed_buildnumber( self ):\r
+ """\r
+ Get the numerical ID of the last complete build.\r
+ """\r
+ return self._buildid_for_type(buildtype="lastCompletedBuild")\r
+\r
+ def get_build_dict(self):\r
+ if not self._data.has_key( "builds" ):\r
+ raise NoBuildData( repr(self) )\r
+ return dict( ( a["number"], a["url"] ) for a in self._data["builds"] )\r
+\r
+ def get_build_ids(self):\r
+ """\r
+ Return a sorted list of all good builds as ints.\r
+ """\r
+ return reversed( sorted( self.get_build_dict().keys() ) )\r
+\r
+ def get_last_good_build( self ):\r
+ """\r
+ Get the last good build\r
+ """\r
+ bn = self.get_last_good_buildnumber()\r
+ return self.get_build( bn )\r
+\r
+ def get_last_build( self ):\r
+ """\r
+ Get the last good build\r
+ """\r
+ bn = self.get_last_buildnumber()\r
+ return self.get_build( bn )\r
+\r
+ def get_last_completed_build( self ):\r
+ """\r
+ Get the last build regardless of status\r
+ """\r
+ bn = self.get_last_completed_buildnumber()\r
+ return self.get_build( bn )\r
+\r
+ def get_build( self, buildnumber ):\r
+ assert type(buildnumber) == int\r
+ url = self.get_build_dict()[ buildnumber ]\r
+ return build( url, buildnumber, job=self )\r
+\r
    def __getitem__( self, buildnumber ):
        """Dictionary-style access: job[n] is equivalent to job.get_build(n)."""
        return self.get_build(buildnumber)
+\r
    def is_queued_or_running(self):
        """True if the job is waiting in the queue or currently building."""
        return self.is_queued() or self.is_running()
+\r
    def is_queued(self):
        """Re-poll the server, then report whether this job is in the build queue."""
        self.poll()
        return self._data["inQueue"]
+\r
    def is_running(self):
        """
        Re-poll the server, then report whether the most recent build is
        still running. A job with no build history cannot be running.
        """
        self.poll()
        try:
            return self.get_last_build().is_running()
        except NoBuildData:
            # Never-built jobs raise NoBuildData; treat that as "not running".
            log.info("No build info available for %s, assuming not running." % str(self) )
            return False
--- /dev/null
class result( object ):
    """
    A single test result.

    Built from the keyword arguments supplied by the result-set parser;
    __str__/id() expect at least ``className``, ``name`` and ``status``.
    """

    def __init__(self, **kwargs ):
        """Adopt every supplied keyword argument as an instance attribute."""
        self.__dict__.update( kwargs )

    def __str__(self):
        return "%s %s %s" % ( self.className, self.name, self.status )

    def __repr__(self):
        return "<%s.%s %s>" % ( self.__class__.__module__,
                                self.__class__.__name__,
                                str( self ) )

    def id(self):
        """
        Calculate an ID for this object.
        """
        return "%s.%s" % ( self.className, self.name )
--- /dev/null
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.result import result\r
+\r
class result_set( jenkinsobject ):
    """
    The set of test results attached to a completed build.

    Behaves like a read-only mapping of result-id -> result, including
    results nested inside aggregated child reports.
    """

    def __init__(self, url, build ):
        """
        Remember the owning build, then let jenkinsobject fetch the data.
        """
        self.build = build
        jenkinsobject.__init__( self, url )

    def getHudsonObject(self):
        # Reach the top-level server object via the owning build's job.
        return self.build.job.get_jenkins_obj()

    def __str__(self):
        return "Test Result for %s" % str( self.build )

    def iteritems(self):
        """
        Yield (id, result) pairs for every test case, first from the top
        level suites, then from any aggregated child reports.
        """
        for suite in self._data.get( "suites", [] ):
            for case in suite["cases"]:
                current = result( **case )
                yield current.id(), current

        for report_set in self._data.get( "childReports", [] ):
            for suite in report_set["result"]["suites"]:
                for case in suite["cases"]:
                    current = result( **case )
                    yield current.id(), current

    def keys(self):
        return [ key for key, _ in self.iteritems() ]

    def items(self):
        return list( self.iteritems() )

    def __len__(self):
        return sum( 1 for _ in self.iteritems() )
--- /dev/null
+from cStringIO import StringIO\r
+\r
class bufwrapper( object ):
    """
    Tee-style stream wrapper: everything written is forwarded to the
    wrapped stream AND copied into an in-memory buffer.
    """

    def __init__( self, stream, buffer=None ):
        assert hasattr( stream, "write" ), "%s does not support write" % repr(stream)
        self.stream = stream
        # Use a fresh in-memory buffer unless the caller supplied one.
        self.buf = StringIO() if buffer is None else buffer

    def write(self, txt ):
        # Duplicate the payload to both destinations.
        self.stream.write(txt)
        self.buf.write(txt)

    def getvalue(self):
        return self.buf.getvalue()

    def get_and_clear( self ):
        """
        Return everything buffered so far and start a fresh buffer.
        """
        drained, self.buf = self.buf, StringIO()
        return drained.getvalue()

    def flush( self ):
        for target in ( self.stream, self.buf ):
            flush_method = getattr( target, "flush", None )
            if callable( flush_method ):
                flush_method()

    def close(self):
        # Only the wrapped stream is closed; the buffer stays readable.
        self.stream.close()
--- /dev/null
+import datetime\r
+\r
+MICROSECONDS_PER_SECOND = 1000000.0\r
+SECONDS_PER_DAY = 86400\r
+\r
def timedelta_to_seconds( td ):
    """
    Return the total duration of *td* in seconds, as a float.

    Raises AssertionError if *td* is not a datetime.timedelta.
    """
    assert isinstance( td, datetime.timedelta )
    # timedelta.total_seconds() (Python >= 2.7) performs exactly the
    # days/seconds/microseconds arithmetic this function used to do by hand.
    return td.total_seconds()
--- /dev/null
+"""\r
+Generate random IDs.\r
+"""\r
+import random\r
+\r
ID_VALID = "abcdefghijklmnopqrstuvwxyz0123456789"

def mk_id(length=5, prefix=""):
    """
    Return *prefix* followed by *length* random characters drawn (with
    replacement) from ID_VALID.
    """
    suffix = "".join( random.choice( ID_VALID ) for _ in range( length ) )
    return "%s%s" % ( prefix, suffix )
+\r
if __name__ == "__main__":
    # Demo: print 50 sample IDs of increasing length (0..49 characters).
    # NOTE: Python 2 print statement.
    for i in range(0, 50):
        print repr( mk_id( i ) )
--- /dev/null
+import logging\r
+import datetime\r
+import traceback\r
+import sys\r
+\r
+try:\r
+ from xml.etree import ElementTree as ET\r
+except Exception, e:\r
+ import elementtree.ElementTree as ET\r
+\r
+from pyjenkinsci.utils.dates import timedelta_to_seconds\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class junitxml( object ):
    """
    Incrementally builds a JUnit-style XML test report.

    Test cases are appended with addPass()/failTest() (or via the
    startTest()/assertTrue() conveniences) and the finished document is
    serialised to the wrapped stream with write().
    """

    # XML tag names for the two non-pass outcomes.
    ERROR = "error"
    FAILURE = "failure"

    def __init__( self, stream, testsuite_name="test", ):
        """
        Set up a new report that will eventually be written to *stream*.
        """
        assert isinstance( testsuite_name, str )

        self.xml = ET.Element("testsuite")
        self.stream = stream

        self.xml.attrib["name"] = testsuite_name

        self.count_errors = 0
        self.count_tests = 0
        self.count_failures = 0

    def __repr__(self):
        return "<%s.%s %s>" % (self.__class__.__module__, self.__class__.__name__, str(self))

    def __str__(self):
        return "Stream: %s, Tests: %i Errors: %i, Failures %i" % ( repr( self.stream ),
                                                                   self.count_tests,
                                                                   self.count_errors,
                                                                   self.count_failures )

    @classmethod
    def get_error_strings( cls, e ):
        """Return (dotted exception type, repr'd args, docstring) for *e*."""
        str_error_type = "%s.%s" % ( e.__class__.__module__, e.__class__.__name__ )
        str_error_args = ",".join( [repr(ee) for ee in e.args] )
        str_doc = str( e.__doc__ ).strip()

        return str_error_type, str_error_args, str_doc

    def write(self, xml_declaration=True, encoding="utf-8"):
        """Serialise the report to self.stream, updating the summary counters first."""
        self.xml.attrib["errors"] = str( self.count_errors )
        self.xml.attrib["failures"] = str( self.count_failures )
        self.xml.attrib["tests"] = str( self.count_tests )

        ET.ElementTree( self.xml ).write( self.stream, encoding=encoding, xml_declaration=xml_declaration )
        log.warn( "Wrote Junit-style XML log to %s" % self.stream )

    def assertTrue(self, classname, testname, errmsg, fn, *args, **kwargs ):
        """
        Map the interface onto an assert like statement.
        Also returns the value so that we can do useful things with the result
        """

        _testname = testname.replace( ".", "_") # Dots are not permitted in names

        def assert_fn( ):
            if callable(fn):
                assert fn( *args, **kwargs ), errmsg
            else:
                assert len(args) == 0 and len(kwargs) == 0, "Object being tested is not callable and cannot have arguments."
                # BUG FIX: the original asserted with the literal string
                # "errmsg" instead of the caller's message.
                assert fn, errmsg

        tr = self.startTest(classname, _testname)
        return tr.run( assert_fn )

    def startTest( self, classname, testname, ):
        """Begin a timed transaction that reports its outcome into this document."""
        return junitxml_transaction( self, classname, testname )

    def passTest( self, classname, name, test_time ):
        """Record a passing test (thin alias for addPass)."""
        self.addPass( classname, name, test_time)

    def failTest(self, classname, name, test_time, error, tb, mode=FAILURE ):
        """
        Record a failed or erroring test from an exception and its traceback.
        """
        str_error, str_error_msg, str_doc = self.get_error_strings( error )
        enhanced_tb = "%s: %s\n\n( %s )\n\n%s" % ( repr(error), str_error_msg, str_doc, tb )
        tc = self.addPass( classname, name, test_time)
        self.convertPassToFail( tc, str_error, enhanced_tb, mode=mode )

    def addPass(self, classname, name, test_time=0.0, ):
        """
        Append a passing <testcase> element and return it.
        """
        assert isinstance( classname, str )
        assert isinstance( name, str )
        assert isinstance( test_time, (int, float) )
        self.count_tests += 1
        testcase = ET.SubElement( self.xml, "testcase" )
        testcase.attrib["classname"] = classname
        testcase.attrib["name"] = name
        testcase.attrib["time"] = "%.2f" % test_time

        return testcase

    def convertPassToFail( self, tc, failtype="", tb="", mode=FAILURE ):
        """
        Demote a previously-added <testcase> element to a failure or error.
        """
        assert isinstance( failtype, str )
        assert isinstance( tb, str ), "Traceback should be a string, got %s" % repr(tb)
        assert mode in [ self.FAILURE, self.ERROR ]

        # BUG FIX: the counters were swapped -- a FAILURE incremented
        # count_errors and an ERROR incremented count_failures, so write()
        # reported the two totals backwards.
        if mode == self.FAILURE:
            self.count_failures += 1
        else:
            self.count_errors += 1

        failure = ET.SubElement( tc, mode )
        failure.text = tb
        failure.attrib["type"] = failtype
        return failure
+\r
+\r
class junitxml_transaction( object ):
    """
    Times a single test and reports its outcome to the owning junitxml.

    AssertionError is recorded as a failure, any other exception as an
    error; in both cases the exception propagates to the caller.
    """
    def __init__(self, jxml, classname, testname ):
        assert isinstance( jxml, junitxml )
        self.jxml = jxml
        self.classname = classname
        self.testname = testname
        self.start_time = datetime.datetime.now()

    def getRuntime(self):
        """Seconds elapsed since this transaction started."""
        return timedelta_to_seconds( datetime.datetime.now() - self.start_time )

    def run( self, fn, *args, **kwargs ):
        """
        Invoke fn(*args, **kwargs), recording a pass, failure or error on
        the owning report. Returns fn's result on success.
        """
        try:
            result = fn( *args, **kwargs )
            self.jxml.addPass( self.classname, self.testname, self.getRuntime() )
        except Exception as e:   # `as` form works on Python 2.6+ and 3.x
            ex_type, ex_value, ex_tb = sys.exc_info()

            tb_formatted = traceback.format_exception( ex_type, ex_value, ex_tb )
            str_tb = "\n".join( tb_formatted )
            runtime = self.getRuntime()

            if isinstance(e, AssertionError):
                self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.FAILURE )
            else:
                self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.ERROR )

            log.exception(e)

            # BUG FIX: bare `raise` preserves the original traceback; the
            # original `raise e` re-raised with a truncated stack.
            raise
        return result
+\r
if __name__ == "__main__":
    # Self-demonstration: run one passing, one failing and one erroring
    # fake test and dump the resulting XML document to stdout.
    import sys
    import time
    import random

    logging.basicConfig()
    logging.getLogger("").setLevel( logging.INFO )
    fod = junitxml( stream=sys.stdout )

    def fn_test( mode ):
        # Sleep a random fraction of a second so runtimes look realistic.
        time.sleep( random.random( ) )

        if mode=="pass":
            return 1
        elif mode=="fail":
            assert False
        elif mode=="error":
            # Deliberate KeyError to exercise the "error" reporting path.
            {}["x"]

    for testname in [ "pass", "fail", "error" ]:
        t = fod.startTest("a", testname, )
        try:
            t.run( fn_test, testname )
        except Exception, e:
            #log.exception(e)
            pass

    fod.write()
--- /dev/null
+try:\r
+ import hashlib\r
+except ImportError:\r
+ import md5\r
+\r
+\r
def new_digest():
    """
    Return a new MD5 digest object, preferring hashlib (Python >= 2.5)
    and falling back to the legacy md5 module.
    """
    # BUG FIX: the original tested `if hashlib:`, which raises NameError
    # when the module-level `import hashlib` failed (the name is simply
    # unbound, not falsy). Probe the name instead.
    try:
        return hashlib.md5()
    except NameError:
        return md5.new()
+\r
if __name__ == "__main__":
    # Smoke test: digest of "123" (Python 2 str literal / print statement).
    x = new_digest()
    x.update("123")
    print repr( x.digest() )
--- /dev/null
+import logging\r
+import time\r
+\r
+log = logging.getLogger( __name__ )\r
+\r
+IGNORE_EXCEPTIONS = [ AttributeError, KeyboardInterrupt ]\r
+\r
+DEFAULT_SLEEP_TIME = 1\r
+\r
def retry_function( tries, fn, *args, **kwargs ):
    """
    Retry function - calls an unreliable function up to *tries* times before
    giving up; if every attempt fails the most recent exception is raised.

    Exception types listed in IGNORE_EXCEPTIONS are never retried and
    propagate immediately with their original traceback.
    """
    assert isinstance( tries, int ), "Tries should be a non-zero positive integer"
    assert tries > 0, "Tries should be a non-zero positive integer"
    last_exception = None
    for attempt in range(tries):
        attemptno = attempt + 1
        if attemptno == tries:
            log.warn( "Last chance: #%i of %i" % ( attemptno, tries ) )
        elif attempt > 0:
            log.warn( "Attempt #%i of %i" % ( attemptno, tries ) )
        try:
            result = fn( *args, **kwargs )
            if attempt > 0:
                log.info( "Result obtained after attempt %i" % attemptno )
            return result
        except Exception as e:   # `as` form works on Python 2.6+ and 3.x
            if type(e) in IGNORE_EXCEPTIONS:
                # Immediately re-raise non-retryable exceptions; a bare
                # raise preserves the original traceback.
                raise
            # BUG FIX: the original did `raise e` after the loop, which
            # relies on the except-variable outliving its block (removed in
            # Python 3) and truncates the traceback; capture it explicitly.
            last_exception = e
            try:
                fn_name = fn.__name__
            except AttributeError:
                fn_name = "Anonymous Function"
            log.exception(e)
            log.warn( "%s failed at attempt %i, trying again." % ( fn_name , attemptno ) )
            time.sleep( DEFAULT_SLEEP_TIME )
    # All attempts exhausted -- surface the most recent failure.
    raise last_exception
+\r
if __name__ == "__main__":
    # Demonstration: a function that always raises exhausts its 3 tries
    # and the final exception surfaces to the caller.
    def broken_function( a ):
        return {}[a]

    logging.basicConfig()

    try:
        retry_function( 3, broken_function, "x" )
    except Exception, e:
        print repr(e)
--- /dev/null
+import urllib2\r
+\r
+import logging\r
+\r
+log = logging.getLogger( __name__ )\r
+\r
DEFAULT_PROXYPORT = 80
# NOTE(review): hard-coded fallback credentials checked into source control.
# These should come from configuration or the environment -- confirm & remove.
DEFAULT_PROXY_PASS = "Research123"
DEFAULT_PROXY_USER = "wsa_oblicqs_dev"

def mkurlopener( proxyhost, proxyport, proxyuser, proxypass ):
    """
    Return a callable that opens URLs.

    With no proxyhost this is plain urllib2.urlopen. Otherwise build an
    opener that routes http/https through the given proxy with basic
    authentication; a None port, user or password falls back to the
    module-level defaults above.
    """
    if not proxyhost:
        return urllib2.urlopen

    if proxyport is None:
        proxyport = DEFAULT_PROXYPORT

    if proxypass is None:
        proxypass = DEFAULT_PROXY_PASS

    if proxyuser is None:
        proxyuser = DEFAULT_PROXY_USER

    # isinstance() instead of `type(x) ==` comparisons; also fixes the
    # "sting" typo in the password assertion message.
    assert isinstance( proxyport, int ), "Proxy port should be an int, got %s" % repr( proxyport )
    assert isinstance( proxypass, str ), "Proxy password should be a string, got %s" % repr( proxypass )
    assert isinstance( proxyuser, str ), "Proxy username should be a string, got %s" % repr( proxyuser )

    # Same proxy endpoint serves both schemes.
    proxy_url = 'http://%s:%i/' % ( proxyhost, proxyport )
    proxy_spec = { 'http': proxy_url,
                   'https': proxy_url }

    proxy_handler = urllib2.ProxyHandler( proxy_spec )
    proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
    proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )

    opener = urllib2.build_opener(proxy_handler, proxy_auth_handler)

    return opener.open
--- /dev/null
+"""
+XML Test Runner for PyUnit
+"""
+
+# Written by Sebastian Rittau <srittau@jroger.in-berlin.de> and placed in
+# the Public Domain.
+
+__revision__ = "$Id: /mirror/jroger/python/stdlib/xmlrunner.py 3506 2006-07-27T09:12:39.629878Z srittau $"
+
+import sys
+import time
+import traceback
+import unittest
+import logging
+from StringIO import StringIO
+from xml.sax.saxutils import escape
+
+log = logging.getLogger()
+
+from pyjenkinsci.utils.bufwrapper import bufwrapper
+
class faketest( object ):
    """
    A fake test object for when you want to inject additional results into the XML stream.

    Mimics just enough of the unittest.TestCase protocol (id/run/__call__)
    for a TestResult object to record it as a single error.
    """
    failureException = AssertionError

    def __init__( self, id, exc_info ):
        # id: dotted test-id string; exc_info: (type, value, traceback) tuple.
        self._id = id
        self._exc_info = exc_info

    def id(self):
        return self._id

    def run(self, result):
        """Record this fake test on *result* as an error."""
        result.startTest(self)
        result.addError(self, self._exc_info )
        # (removed the original's dead local `ok = False` -- never read)
        result.stopTest(self)

    def __call__(self, *args, **kwds):
        return self.run(*args, **kwds)
+
+
class _TestInfo(object):
    """Information about a particular test.
    Used by _XmlTestResult."""

    def __init__( self, test, time, ):
        (self._class, self._method) = test.id().rsplit(".", 1)
        self._time = time
        self._error = None
        self._failure = None
        # BUG FIX: __init__ initialised `_console` but the factories set
        # `console` and _print_error read `self.console`; unified on one
        # attribute so a hand-built instance cannot hit AttributeError.
        self._console = ""

    @staticmethod
    def create_success(test, time):
        """Create a _TestInfo instance for a successful test."""
        return _TestInfo(test, time)

    @staticmethod
    def create_failure(test, time, failure, console=""):
        """Create a _TestInfo instance for a failed test."""
        info = _TestInfo(test, time)
        info._failure = failure
        info._console = console
        return info

    @staticmethod
    def create_error(test, time, error, console="" ):
        """Create a _TestInfo instance for an erroneous test."""
        info = _TestInfo(test, time)
        info._error = error
        info._console = console
        return info

    def print_report(self, stream):
        """Print information about this test case in XML format to the
        supplied stream.
        """
        stream.write('  <testcase classname="%(class)s" name="%(method)s" time="%(time).4f">' % \
            {
                "class": self._class,
                "method": self._method,
                "time": self._time,
            })
        if self._failure is not None:
            self._print_error(stream, 'failure', self._failure)
        if self._error is not None:
            self._print_error(stream, 'error', self._error)
        stream.write('</testcase>\n')

    def _print_error(self, stream, tagname, error):
        """Print information from a failure or error to the supplied stream."""
        text = escape(str(error[1]))
        stream.write('\n')
        stream.write('    <%s type="%s">%s\n%s\n' \
            % (tagname, str(error[0]), text, self._console ))
        tb_stream = StringIO()
        traceback.print_tb(error[2], None, tb_stream)
        stream.write(escape(tb_stream.getvalue()))
        stream.write('    </%s>\n' % tagname)
        stream.write('  ')
+
+
class _XmlTestResult(unittest.TestResult):
    """A test result class that stores result as XML.

    Used by XmlTestRunner.
    """

    # Serial counter shared by all instances; used only for log output.
    test_count = 0

    @classmethod
    def get_test_serial( cls ):
        cls.test_count += 1
        return cls.test_count

    def __init__(self, classname, consolestream =None ):
        unittest.TestResult.__init__(self)
        self._test_name = classname
        self._start_time = None
        self._tests = []
        self._error = None
        self._failure = None
        # bufwrapper-like object whose get_and_clear() yields the console
        # output produced since the previous test finished.
        self._consolestream = consolestream

    def startTest(self, test):
        unittest.TestResult.startTest(self, test)

        sn = self.get_test_serial()

        log.info( "Test %i: %s" % ( sn, test.id() ) )
        self._error = None
        self._failure = None
        self._start_time = time.time()

    def stopTest(self, test, time_taken = None ):
        """Record the outcome of *test*, computing the duration when none given."""
        # BUG FIX: the original condition was `if time_taken is not None`,
        # which only recomputed the duration when a value was already
        # supplied and otherwise left it None -- crashing print_report's
        # "%(time).4f" format for every normally-reported test.
        if time_taken is None:
            time_taken = time.time() - self._start_time

        str_console = self._consolestream.get_and_clear()

        unittest.TestResult.stopTest(self, test)
        if self._error:
            info = _TestInfo.create_error(test, time_taken, self._error, console=str_console )
            log.error( "Error: %s" % test.id() )
        elif self._failure:
            info = _TestInfo.create_failure(test, time_taken, self._failure, console=str_console )
            log.error( "Fail: %s" % test.id() )
        else:
            info = _TestInfo.create_success(test, time_taken, )
            log.debug( "OK: %s" % test.id() )
        self._tests.append(info)

    def addError(self, test, err):
        log.warn( "Error: %s" % test.id() )
        unittest.TestResult.addError(self, test, err)
        self._error = err

    def addFailure(self, test, err):
        log.warn( "Failure: %s" % test.id() )
        unittest.TestResult.addFailure(self, test, err)
        self._failure = err

    def print_report(self, stream, time_taken, out, err):
        """Prints the XML report to the supplied stream.

        The time the tests took to perform as well as the captured standard
        output and standard error streams must be passed in.
        """
        stream.write('<testsuite errors="%(e)d" failures="%(f)d" ' % \
            { "e": len(self.errors), "f": len(self.failures) })
        stream.write('name="%(n)s" tests="%(t)d" time="%(time).3f">\n' % \
            {
                "n": self._test_name,
                "t": self.testsRun,
                "time": time_taken,
            })
        for info in self._tests:
            info.print_report(stream)
        stream.write('  <system-out><![CDATA[%s]]></system-out>\n' % out)
        stream.write('  <system-err><![CDATA[%s]]></system-err>\n' % err)
        stream.write('</testsuite>\n')
+
+
class XmlTestRunner(object):
    """A test runner that stores results in XML format compatible with JUnit.

    XmlTestRunner(stream=None) -> XML test runner

    The XML report is written to the supplied stream, which is required:
    run() asserts that one was provided. (The original docstring promised
    a TEST-<module>.<class>.xml fallback file that the code never
    implemented.) While running, stdout and stderr are teed through
    bufwrapper objects so per-test console output can be captured.
    """
    def __init__(self, stream=None ):
        self._stream = stream

    @staticmethod
    def get_test_class_name_from_testobj( obj_test ):
        """Return 'module.ClassName' for the object under test."""
        class_ = obj_test.__class__
        classname = class_.__module__ + "." + class_.__name__
        return classname

    def run(self, test, result=None ):
        """Run the given test case or test suite."""
        classname = self.get_test_class_name_from_testobj( test )
        assert not self._stream is None
        stream = self._stream

        # TODO: Python 2.5: Use the with statement
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        sys.stdout = bufwrapper( old_stdout )
        sys.stderr = bufwrapper( old_stderr )

        if result is None:
            result = _XmlTestResult( classname, consolestream = sys.stdout )
        else:
            log.debug("Using provided XML test result object.")

        start_time = time.time()

        try:
            test(result)
            # Tests may replace sys.stdout/sys.stderr with objects lacking
            # getvalue(); treat that as "no captured output".
            try:
                out_s = sys.stdout.getvalue()
            except AttributeError:
                out_s = ""
            try:
                err_s = sys.stderr.getvalue()
            except AttributeError:
                err_s = ""
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

        time_taken = time.time() - start_time
        result.print_report(stream, time_taken, out_s, err_s)
        # (Removed the original `if self._stream is None: stream.close()` --
        # unreachable dead code given the assert above.)

        return result
--- /dev/null
+import unittest\r
+import sys\r
+import re\r
+from cStringIO import StringIO\r
+from pyjenkinsci.utils.xmlrunner import XmlTestRunner\r
+\r
class XmlTestRunnerTest(unittest.TestCase):
    # Regression suite for XmlTestRunner: each test defines a throwaway
    # TestCase class, runs it through the XML runner, and compares the
    # report against a golden string (times and messages normalised first).

    def setUp(self):
        # Capture the runner's XML output in memory.
        self._stream = StringIO()

    def _try_test_run(self, test_class, expected):
        """Run the test suite against the supplied test class and compare the
        XML result against the expected XML string. Fail if the expected
        string doesn't match the actual string. All time attribute in the
        expected string should have the value "0.000". All error and failure
        messages are reduced to "Foobar".
        """
        runner = XmlTestRunner(self._stream)
        runner.run(unittest.makeSuite(test_class))

        got = self._stream.getvalue()
        # Replace all time="X.YYY" attributes by time="0.000" to enable a
        # simple string comparison.
        got = re.sub(r'time="\d+\.\d+"', 'time="0.000"', got)
        # Likewise, replace all failure and error messages by a simple "Foobar"
        # string.
        got = re.sub(r'(?s)<failure (.*?)>.*?</failure>', r'<failure \1>Foobar</failure>', got)
        got = re.sub(r'(?s)<error (.*?)>.*?</error>', r'<error \1>Foobar</error>', got)

        self.assertEqual(expected, got)

    def test_no_tests(self):
        """Regression test: Check whether a test run without any tests
        matches a previous run."""
        class TestTest(unittest.TestCase):
            pass
        self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="0" time="0.000">
  <system-out><![CDATA[]]></system-out>
  <system-err><![CDATA[]]></system-err>
</testsuite>
""")

    def test_success(self):
        """Regression test: Check whether a test run with a successful test
        matches a previous run."""
        class TestTest(unittest.TestCase):
            def test_foo(self):
                pass
        self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
  <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
  <system-out><![CDATA[]]></system-out>
  <system-err><![CDATA[]]></system-err>
</testsuite>
""")

    def test_failure(self):
        """Regression test: Check whether a test run with a failing test
        matches a previous run."""
        class TestTest(unittest.TestCase):
            def test_foo(self):
                self.assert_(False)
        self._try_test_run(TestTest, """<testsuite errors="0" failures="1" name="unittest.TestSuite" tests="1" time="0.000">
  <testcase classname="__main__.TestTest" name="test_foo" time="0.000">
    <failure type="exceptions.AssertionError">Foobar</failure>
  </testcase>
  <system-out><![CDATA[]]></system-out>
  <system-err><![CDATA[]]></system-err>
</testsuite>
""")

    def test_error(self):
        """Regression test: Check whether a test run with a erroneous test
        matches a previous run."""
        class TestTest(unittest.TestCase):
            def test_foo(self):
                raise IndexError()
        self._try_test_run(TestTest, """<testsuite errors="1" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
  <testcase classname="__main__.TestTest" name="test_foo" time="0.000">
    <error type="exceptions.IndexError">Foobar</error>
  </testcase>
  <system-out><![CDATA[]]></system-out>
  <system-err><![CDATA[]]></system-err>
</testsuite>
""")

    def test_stdout_capture(self):
        """Regression test: Check whether a test run with output to stdout
        matches a previous run."""
        class TestTest(unittest.TestCase):
            def test_foo(self):
                print "Test"
        self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
  <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
  <system-out><![CDATA[Test
]]></system-out>
  <system-err><![CDATA[]]></system-err>
</testsuite>
""")

    def test_stderr_capture(self):
        """Regression test: Check whether a test run with output to stderr
        matches a previous run."""
        class TestTest(unittest.TestCase):
            def test_foo(self):
                print >>sys.stderr, "Test"
        self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
  <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
  <system-out><![CDATA[]]></system-out>
  <system-err><![CDATA[Test
]]></system-err>
</testsuite>
""")

    class NullStream(object):
        """A file-like object that discards everything written to it."""
        def write(self, buffer):
            pass

    def test_unittests_changing_stdout(self):
        """Check whether the XmlTestRunner recovers gracefully from unit tests
        that change stdout, but don't change it back properly.
        """
        class TestTest(unittest.TestCase):
            def test_foo(self):
                sys.stdout = XmlTestRunnerTest.NullStream()

        runner = XmlTestRunner(self._stream)
        runner.run(unittest.makeSuite(TestTest))

    def test_unittests_changing_stderr(self):
        """Check whether the XmlTestRunner recovers gracefully from unit tests
        that change stderr, but don't change it back properly.
        """
        class TestTest(unittest.TestCase):
            def test_foo(self):
                sys.stderr = XmlTestRunnerTest.NullStream()

        runner = XmlTestRunner(self._stream)
        runner.run(unittest.makeSuite(TestTest))
+\r
+\r
if __name__ == "__main__":
    # Run this module's own regression suite with the plain text runner.
    suite = unittest.makeSuite(XmlTestRunnerTest)
    unittest.TextTestRunner().run(suite)
--- /dev/null
+from pyjenkinsci.jenkinsobject import jenkinsobject\r
+from pyjenkinsci.job import job\r
+\r
class view( jenkinsobject ):
    """
    A named collection of jobs on the Jenkins server.

    Behaves like a read-only mapping of job-name -> job object.
    """

    def __init__(self, url, name, jenkins_obj):
        self.name = name
        self.jenkins_obj = jenkins_obj
        jenkinsobject.__init__(self, url)

    def __str__(self):
        return self.name

    def __getitem__(self, str_job_id ):
        """Dictionary-style access: view["jobname"] -> job object."""
        assert isinstance( str_job_id, str )
        api_url = self.python_api_url( self.get_job_url( str_job_id ) )
        return job( api_url, str_job_id, self.jenkins_obj )

    def keys(self):
        return self.get_job_dict().keys()

    def iteritems(self):
        """Yield (name, job object) for every job in the view."""
        for name, url in self.get_job_dict().iteritems():
            api_url = self.python_api_url( url )
            yield name, job( api_url, name, self.jenkins_obj )

    def values(self):
        return [ a[1] for a in self.iteritems() ]

    def items(self):
        return [ a for a in self.iteritems() ]

    def _get_jobs( self ):
        # Yield (name, url) pairs; servers may omit "jobs" entirely.
        # dict.has_key() is deprecated (removed in Python 3) -- use `in`.
        if "jobs" in self._data:
            for viewdict in self._data["jobs"]:
                yield viewdict["name"], viewdict["url"]

    def get_job_dict(self):
        return dict( self._get_jobs() )

    def __len__(self):
        return len( self.get_job_dict() )

    def get_job_url( self, str_job_name ):
        """
        Return the URL for the named job, raising a KeyError that lists
        the jobs which do exist when it is unknown.
        """
        job_dict = self.get_job_dict()
        try:
            return job_dict[ str_job_name ]
        except KeyError:
            all_views = ", ".join( job_dict.keys() )
            raise KeyError("Job %s is not known - available: %s" % ( str_job_name, all_views ) )

    def get_jenkins_obj(self):
        return self.jenkins_obj

    def id(self):
        """
        Calculate an ID for this object.
        """
        # NOTE(review): `self.className` is never assigned anywhere in this
        # class (the line looks copied from result.id()), so calling this
        # raises AttributeError -- confirm the intended attribute.
        return "%s.%s" % ( self.className, self.name )
\ No newline at end of file
--- /dev/null
+import os\r
+\r
# Optionally disable the HTTP proxy: set CLEAR_PROXY (to anything non-empty)
# to strip HTTP_PROXY from the environment before it is read below.
CLEAR_PROXY = os.environ.get("CLEAR_PROXY","")
if CLEAR_PROXY:
    # pop() tolerates HTTP_PROXY being absent; the original plain `del`
    # raised KeyError whenever CLEAR_PROXY was set but HTTP_PROXY was not.
    os.environ.pop("HTTP_PROXY", None)

# Base URL of the Jenkins server the integration tests talk to.
JENKINS_BASE = os.environ.get( "JENKINS_BASE", "http://localhost:8080/jenkins" )
HTTP_PROXY = os.environ.get( "HTTP_PROXY", "" )
# Name of the job the tests expect to exist on that server.
BUILD_NAME_TEST1 = "test1"
+\r
if __name__ == "__main__":
    # Smoke check: show the effective configuration values.
    print( "Jenkins base: %s" % JENKINS_BASE )
    print( "Http Proxy: %s" %HTTP_PROXY )
\ No newline at end of file
--- /dev/null
+"""\r
+Important: For this test to work we need at least one Jenkins server\r
+You need to configure the JENKINS_BASE environment variable\r
+And you need to enure that this Jenkins has at least one job called "test1".\r
+Make sure that sucsessful builds of test one archive an artifact called "test1.txt" - it can be anything.\r
+"""\r
+import unittest\r
+import logging\r
+\r
+from pyjenkinsci.build import build\r
+from pyjenkinsci.result_set import result_set\r
+from pyjenkinsci.result import result\r
+from pyjenkinsci import api\r
+from pyjenkinsci_tests.config import JENKINS_BASE, BUILD_NAME_TEST1\r
+\r
+if __name__ == "__main__":\r
+ logging.basicConfig()\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class test_api( unittest.TestCase ):
    """
    Perform a number of basic queries.
    """
    # NOTE(review): integration test -- requires a live Jenkins at
    # JENKINS_BASE with a job named BUILD_NAME_TEST1 (see module docstring).

    def setUp(self):
        pass

    def test_get_latest_build_results(self):
        # Fetch the newest build of the test job and sanity-check that its
        # result set is non-empty and contains result objects.
        lb = api.get_latest_build(JENKINS_BASE, BUILD_NAME_TEST1)
        assert isinstance(lb, build)
        rs = lb.get_resultset()
        assert isinstance( rs, result_set )
        assert len(rs) > 0

        # NOTE(review): `id` shadows the builtin here.
        for id, res in rs.items():
            assert isinstance( res, result ), "Expected result-set object, got %s" % repr(res)
+\r
+\r
+if __name__ == "__main__":\r
+ unittest.main()\r
--- /dev/null
+"""\r
+Important: For this test to work we need at least one Jenkins server\r
+You need to configure the JENKINS_BASE environment variable\r
+And you need to enure that this Jenkins has at least one job called "test1".\r
+Make sure that sucsessful builds of test one archive an artifact called "test1.txt" - it can be anything.\r
+"""\r
+import unittest\r
+import logging\r
+\r
+from pyjenkinsci.jenkins import jenkins\r
+from pyjenkinsci.artifact import artifact\r
+from pyjenkinsci.build import build\r
+from pyjenkinsci_tests.config import HTTP_PROXY, JENKINS_BASE\r
+\r
+if __name__ == "__main__":\r
+ logging.basicConfig()\r
+\r
+log = logging.getLogger(__name__)\r
+\r
class test_query( unittest.TestCase ):
    """
    Perform a number of basic queries against a live Jenkins server.

    Requires JENKINS_BASE to point at a Jenkins with a job called "test1"
    whose successful builds archive an artifact named "test1.txt".
    """

    def setUp(self):
        log.warn("Connecting to %s via proxy: %s" % (JENKINS_BASE, HTTP_PROXY) )
        self.jenkins = jenkins( JENKINS_BASE )

    def testListJobs(self):
        """
        Test that we can get a list of jobs
        """
        job_ids = self.jenkins.keys()
        assert "test1" in job_ids

    def testListBuilds(self):
        """
        Test that we can enumerate builds and fetch the newest one.
        """
        test1 = self.jenkins["test1"]
        builds = [a for a in test1.get_build_ids() ]
        assert len(builds) > 0
        # BUG FIX: get_build_ids() yields build numbers newest-first, so the
        # newest build is builds[0]; the original indexed builds[-1] (the
        # oldest), contradicting both the variable name and the matching
        # lookup in testGetLatestArtifact below.
        newest_build = test1[ builds[0] ]
        assert isinstance( newest_build, build )

    def testGetLatestArtifact(self):
        # The newest build of "test1" must have archived "test1.txt".
        test1 = self.jenkins["test1"]
        builds = [a for a in test1.get_build_ids() ]
        assert len(builds) > 0
        newest_build = test1[ builds[0] ]
        assert isinstance( newest_build, build )
        artifact_dict = newest_build.get_artifact_dict()
        assert "test1.txt" in artifact_dict.keys()
        test_artifact = artifact_dict[ "test1.txt" ]
        assert isinstance( test_artifact, artifact )
+\r
+\r
+if __name__ == "__main__":\r
+ unittest.main()\r
--- /dev/null
REM Change to the directory containing this script (handles drive changes too).
cd /D %~dp0
cd pyjenkinsci_egg
REM Install the egg in setuptools "develop" mode (-m: multi-version,
REM i.e. do not add an easy-install.pth entry).
python setup.py develop -m