From 35ceb6319a55145755ded33169899c4134b5a2d2 Mon Sep 17 00:00:00 2001
From: Ramon van Alteren
Date: Thu, 5 Jan 2012 12:44:35 +0100
Subject: [PATCH] First stab at refactoring artifact

Removed the weird streaming code and replaced it with a call to
urllib.urlretrieve, which does chunking all by itself.

Made local md5 hash generation saner.

Use the Fingerprint object directly for validation, without going through
the Jenkins API reference.
---
 pyjenkinsci/artifact.py | 173 ++++++++++--------------------------------------
 pyjenkinsci/jenkins.py  |  18 +++--
 2 files changed, 48 insertions(+), 143 deletions(-)

diff --git a/pyjenkinsci/artifact.py b/pyjenkinsci/artifact.py
index 9d8383f..2b53ecd 100644
--- a/pyjenkinsci/artifact.py
+++ b/pyjenkinsci/artifact.py
@@ -1,174 +1,69 @@
-import urllib2
+from __future__ import with_statement
+import urllib
 import os
 import logging
-import cStringIO
-import zipfile
-import cPickle
-import datetime
 import hashlib

-from pyjenkinsci import config
-from pyjenkinsci.utils.retry import retry_function
 from pyjenkinsci.exceptions import ArtifactBroken
+from pyjenkinsci.fingerprint import Fingerprint

 log = logging.getLogger( __name__ )

 class Artifact(object):

-    @staticmethod
-    def timedelta_to_seconds( td ):
-        secs = float( td.seconds )
-        secs += td.microseconds / 1000000.0
-        secs += td.days * 86400
-        return secs
-
     def __init__( self, filename, url, build=None ):
         self.filename = filename
         self.url = url
         self.build = build

-    def unpickle(self, method="pickle" ):
-        """
-        Assume that the object is a pickled stream.
-        """
-        stream, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream )
-
-        while True:
-            try:
-                yield cPickle.load( stream )
-            except EOFError:
-                break
-
-    def logging_buffer_copy( self, input_stream, output_stream, length, chunks=10 ):
-
-        chunk_points = int( length / chunks )
-
-        start_time = datetime.datetime.now()
-        last_time = datetime.datetime.now()
-
-        for index in xrange( 0, length ):
-            output_stream.write( input_stream.read(1) )
-
-            if chunk_points > 0:
-                if ( index % chunk_points ) == 0 and ( index > 0 ):
-                    now = datetime.datetime.now()
-
-                    try:
-                        time_elapsed_since_start = self.timedelta_to_seconds( now - start_time )
-                        # avg_bitrate = ( index / time_elapsed_since_start ) / 1024.0
-                        time_elapsed_since_last_chunk = self.timedelta_to_seconds( now - last_time )
-                        inst_bitrate = ( chunk_points / time_elapsed_since_last_chunk ) / 1024.0
-                    except ZeroDivisionError, _:
-                        continue
-
-                    log.info( "Loaded %i of %i bytes %.2f kbit/s" % ( index, length, inst_bitrate ) )
-                    last_time = now
-
-    def getstream( self ):
-        """
-        Get the artifact as a stream
-        """
-        artifact_digest = hashlib.md5()
-        tmp_buffer = cStringIO.StringIO()
-
-        if self.build:
-            fn_opener = self.build.job.jenkins.get_opener()
-        else:
-            fn_opener = urllib2.urlopen
-
-        try:
-            inputstream = fn_opener( self.url, )
-            content_type = inputstream.info().get("content-type", "unknown")
-
-            try:
-                content_length = int( inputstream.info()["content-length"] )
-                self.logging_buffer_copy( inputstream, tmp_buffer, content_length )
-            except KeyError, ke:
-                # Could not get length.
- log.warn("Could not get length") - tmp_buffer.write( inputstream.read() ) - - except urllib2.HTTPError: - log.warn( "Error fetching %s" % self.url ) - raise - tmp_buffer.seek(0) - - artifact_digest.update(tmp_buffer.getvalue()) - artifact_hexdigest = artifact_digest.hexdigest() - - artifact_size = len(tmp_buffer.getvalue()) - log.info( "Got %s, %i bytes, MD5: %s, type: %s" % ( self.filename, artifact_size, artifact_hexdigest, content_type ) ) - - if self.build: - self.build.job.jenkins.validate_fingerprint( artifact_hexdigest ) - - return tmp_buffer, artifact_hexdigest - - def openzip( self ): - """ - Open the artifact as a zipfile. - """ - buffer, _ = retry_function( config.LOAD_ATTEMPTS , self.getstream ) - zf = zipfile.ZipFile( buffer, "r" ) - return zf - def save( self, fspath ): """ Save the artifact to an explicit path. The containing directory must exist. Returns a reference to the file which has just been writen to. - """ + :param fspath: full pathname including the filename, str + :return: filepath + """ log.info( "Saving artifact @ %s to %s" % (self.url, fspath) ) - if not fspath.endswith( self.filename ): log.warn( "Attempt to change the filename of artifact %s on save." % self.filename ) - - if os.path.exists( fspath ): - existing_hexdigest = self.get_local_digest( fspath ) + if os.path.exists(fspath): if self.build: try: - valid = self.build.job.jenkins.validate_fingerprint_for_build( existing_hexdigest, filename=self.filename, job=self.build.job.id(), build=self.build.id() ) - - if valid: - log.info( "Local copy of %s is already up to date. MD5 %s" % (self.filename, existing_hexdigest) ) - else: - self.__do_download( fspath ) - except ArtifactBroken, ab: #@UnusedVariable + if self._verify_download(fspath): + log.info( "Local copy of %s is already up to date." 
+                        return fspath
+                except ArtifactBroken:
                     log.info("Jenkins artifact could not be identified.")
             else:
                 log.info("This file did not originate from Jenkins, so cannot check.")
-            self.__do_download( fspath )
         else:
             log.info("Local file is missing, downloading new.")
-            self.__do_download( fspath )
-
-    def get_local_digest( self, fspath ):
-        tmp_buffer_existing = cStringIO.StringIO()
-        existingfile = open( fspath, "rb" )
-        tmp_buffer_existing.write( existingfile.read() )
-        existing_digest = hashlib.md5()
-        existing_digest.update(tmp_buffer_existing.getvalue())
-        existing_hexdigest = existing_digest.hexdigest()
-        return existing_hexdigest
-
-    def __do_download( self, fspath ):
-
-        filedir, _ = os.path.split( fspath )
-        if not os.path.exists( filedir ):
-            log.warn( "Making missing directory %s" % filedir )
-            os.makedirs( filedir )
-
+        filename = self._do_download(fspath)
+        if self.build:
+            try:
+                self._verify_download(filename)
+            except ArtifactBroken:
+                log.warning("Fingerprint of the downloaded artifact could not be verified.")
+        return filename
+
+    def _do_download(self, fspath):
+        filename, headers = urllib.urlretrieve(self.url, filename=fspath)
+        return filename
+
+    def _verify_download(self, fspath):
+        local_md5 = self._md5sum(fspath)
+        fp = Fingerprint(self.build.job.jenkins.baseurl, local_md5, self.build.job.jenkins)
+        return fp.validate_for_build(fspath, self.build.job, self.build)
+
+    def _md5sum(self, fspath, chunksize=2**20):
+        md5 = hashlib.md5()
         try:
-            outputfile = open( fspath, "wb" )
-        except IOError, ioe:
-            log.critical("User %s@%s cannot open file" % ( os.environ.get("USERNAME","unknown"),os.environ.get("USERDOMAIN","unknown") ) )
+            with open(fspath, 'rb') as f:
+                for chunk in iter(lambda: f.read(chunksize), ''):
+                    md5.update(chunk)
+        except:
             raise
-
-        tmp_buffer_downloaded, artifact_hexdigest = retry_function( config.LOAD_ATTEMPTS , self.getstream )
-
-        outputfile.write( tmp_buffer_downloaded.getvalue() )
-        return outputfile
-
+        return md5.hexdigest()

     def savetodir( self, dirpath ):
         """
diff --git a/pyjenkinsci/jenkins.py b/pyjenkinsci/jenkins.py
index 1c34ef9..73e05f0 100644
--- a/pyjenkinsci/jenkins.py
+++ b/pyjenkinsci/jenkins.py
@@ -80,6 +80,14 @@ class Jenkins(JenkinsBase):
         for info in self._data["jobs"]:
             yield info["name"], Job( info["url"], info["name"], jenkins_obj=self)

+    def get_job(self, jobname):
+        """
+        Get a job by name
+        :param jobname: name of the job, str
+        :return: Job obj
+        """
+        return self[jobname]
+
     def iteritems(self):
         return self.get_jobs()

@@ -116,14 +124,16 @@ class Jenkins(JenkinsBase):
         view_api_url = self.python_api_url( view_url )
         return View(view_api_url , str_view_name, jenkins_obj=self)

-    def __getitem__( self, buildname ):
+    def __getitem__(self, jobname):
         """
-        Get a build
+        Get a job by name
+        :param jobname: name of job, str
+        :return: Job obj
         """
         for name, job in self.get_jobs():
-            if name == buildname:
+            if name == jobname:
                 return job
-        raise UnknownJob(buildname)
+        raise UnknownJob(jobname)

     def get_node_dict(self):
         """Get registered slave nodes on this instance"""
--
2.7.4
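
A quick usage sketch for reviewers, showing how the refactored pieces are
meant to fit together. The server URL, job name, and artifact filename are
made up, and the build/artifact lookups (get_last_good_build,
get_artifact_dict) assume the existing Job and Build APIs are unchanged:

from pyjenkinsci.jenkins import Jenkins

# All names below are hypothetical, for illustration only.
jenkins = Jenkins("http://localhost:8080")
job = jenkins.get_job("my-job")       # new helper, equivalent to jenkins["my-job"]
build = job.get_last_good_build()     # assumed existing Job API
artifact = build.get_artifact_dict()["output.zip"]  # assumed existing Build API

# save() now downloads via urllib.urlretrieve and verifies the local file's
# md5 against the Jenkins fingerprint database. An existing, up-to-date
# local copy skips the download entirely. Note the containing directory must
# already exist: unlike the old __do_download, _do_download does not create
# missing directories.
local_path = artifact.save("/tmp/artifacts/output.zip")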
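
The download and hashing helpers are also easy to sanity-check in isolation.
A minimal standalone sketch of the same download-then-hash flow (the URL and
paths are placeholders):

from __future__ import with_statement
import hashlib
import urllib

def md5sum(fspath, chunksize=2**20):
    # Hash in 1 MiB chunks so a large artifact never has to fit in memory,
    # unlike the old get_local_digest, which read the whole file at once.
    md5 = hashlib.md5()
    with open(fspath, 'rb') as f:
        for chunk in iter(lambda: f.read(chunksize), ''):
            md5.update(chunk)
    return md5.hexdigest()

# urlretrieve streams the response to disk in chunks internally and returns
# the local filename together with the response headers.
filename, headers = urllib.urlretrieve(
    "http://localhost:8080/job/my-job/1/artifact/output.zip",
    filename="/tmp/output.zip")
print md5sum(filename)

Jenkins fingerprints are hexadecimal md5 strings, which is why _md5sum
returns hexdigest() rather than the raw digest() bytes.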