From e1088eec89fed852467b38986208da3f5357fe5c Mon Sep 17 00:00:00 2001 From: Ramon van Alteren Date: Wed, 4 Jan 2012 15:08:03 +0100 Subject: [PATCH] Cleanup --- pyjenkinsci/command_line/jenkins_invoke.py | 125 ++++++++++++----------- pyjenkinsci/utils/urlopener.py | 157 ++++++++++++++--------------- 2 files changed, 140 insertions(+), 142 deletions(-) diff --git a/pyjenkinsci/command_line/jenkins_invoke.py b/pyjenkinsci/command_line/jenkins_invoke.py index 3f9afd0..e2b812c 100644 --- a/pyjenkinsci/command_line/jenkins_invoke.py +++ b/pyjenkinsci/command_line/jenkins_invoke.py @@ -1,63 +1,62 @@ -import os -import sys -import logging -import optparse -import jenkins - -log = logging.getLogger(__name__) - -class jenkins_invoke(object): - - @classmethod - def mkparser(cls): - parser = optparse.OptionParser() - DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" ) - parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete." - parser.add_option("-J", "--jenkinsbase", dest="baseurl", - help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL, - type="str", - default=DEFAULT_BASEURL, ) - parser.add_option("-b", "--block", dest="block", - help="Block until each of the jobs is complete." , - action="store_true", - default=False ) - parser.add_option("-t", "--token", dest="token", - help="Optional security token." , - default=None ) - return parser - - @classmethod - def main(cls): - parser = cls.mkparser() - options, args = parser.parse_args() - try: - assert len( args ) > 0, "Need to specify at least one job name" - except AssertionError, e: - log.critical( e[0] ) - parser.print_help() - sys.exit(1) - invoker = cls( options, args ) - invoker() - - def __init__( self, options, jobs ): - self.options = options - self.jobs = jobs - - def __call__(self): - for job in self.jobs: - self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token ) - - def invokejob(self, jobname, block, baseurl, token ): - assert type(block) == bool - assert type(baseurl) == str - assert type(jobname) == str - assert token is None or isinstance( token, str ) - jenkinsserver = jenkins( baseurl ) - job = jenkinsserver[ jobname ] - job.invoke( securitytoken=token, block=block ) - - -def main( ): - logging.basicConfig() - logging.getLogger("").setLevel( logging.INFO ) - jenkins_invoke.main() +import os +import sys +import logging +import optparse +from pyjenkinsci import jenkins + +log = logging.getLogger(__name__) + +class jenkins_invoke(object): + @classmethod + def mkparser(cls): + parser = optparse.OptionParser() + DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" ) + parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete." + parser.add_option("-J", "--jenkinsbase", dest="baseurl", + help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL, + type="str", + default=DEFAULT_BASEURL, ) + parser.add_option("-b", "--block", dest="block", + help="Block until each of the jobs is complete." , + action="store_true", + default=False ) + parser.add_option("-t", "--token", dest="token", + help="Optional security token." , + default=None ) + return parser + + @classmethod + def main(cls): + parser = cls.mkparser() + options, args = parser.parse_args() + try: + assert len(args) > 0, "Need to specify at least one job name" + except AssertionError, e: + log.critical(e[0]) + parser.print_help() + sys.exit(1) + invoker = cls(options, args) + invoker() + + def __init__(self, options, jobs): + self.options = options + self.jobs = jobs + + def __call__(self): + for job in self.jobs: + self.invokejob(job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token) + + def invokejob(self, jobname, block, baseurl, token ): + assert type(block) == bool + assert type(baseurl) == str + assert type(jobname) == str + assert token is None or isinstance(token, str) + jenkinsserver = jenkins.Jenkins( baseurl ) + job = jenkinsserver[jobname] + job.invoke(securitytoken=token, block=block) + + +def main( ): + logging.basicConfig() + logging.getLogger("").setLevel(logging.INFO) + jenkins_invoke.main() \ No newline at end of file diff --git a/pyjenkinsci/utils/urlopener.py b/pyjenkinsci/utils/urlopener.py index e6b676f..9d58807 100644 --- a/pyjenkinsci/utils/urlopener.py +++ b/pyjenkinsci/utils/urlopener.py @@ -1,79 +1,78 @@ -import urllib2 -import urlparse -import base64 - -import logging - -log = logging.getLogger( __name__ ) - -class PreemptiveBasicAuthHandler(urllib2.BaseHandler): - - def __init__(self, password_mgr=None): - if password_mgr is None: - password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() - self.passwd = password_mgr - self.add_password = self.passwd.add_password - - def http_request(self,req): - uri = req.get_full_url() - user, pw = self.passwd.find_user_password(None,uri) - log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw)) - if pw is None: return req - raw = "%s:%s" % (user, pw) - auth = 'Basic %s' % base64.b64encode(raw).strip() - req.add_unredirected_header('Authorization', auth) - return req - -def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ): - handlers = [] - for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl): - handlers.append(handler) - for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass): - handlers.append(handler) - opener = urllib2.build_opener(*handlers) - return opener.open - -def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl): - """ - Get a basic authentification handler for jenkins - :param jenkinsuser: jenkins username, str - :param jenkinspass: jenkins password, str - :param jenkinsurl: jenkins base url, str - :return: a list of handlers - """ - for param in jenkinsuser, jenkinspass, jenkinsurl: - if param is None: - return [] - assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser) - assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass) -# hostname = urlparse.urlsplit(jenkinsurl).hostname - handler = PreemptiveBasicAuthHandler() - handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass) - log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser)) - return [ handler ] - -def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass): - """ - Get a configured handler for a proxy - - :param proxyhost: proxy hostname, str - :param proxyport: proxy port, int - :param proxyuser: proxy username, str - :param proxypass: proxy password, str - :return: list of handlers - """ - for param in proxyhost, proxyport, proxyuser, proxypass: - if param is None: - return [] - assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport ) - assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass ) - assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser ) - - proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport), - 'https': 'http://%s:%i/' % (proxyhost, proxyport) } - - proxy_handler = urllib2.ProxyHandler( proxy_spec ) - proxy_auth_handler = urllib2.HTTPBasicAuthHandler() - proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass ) - return [proxy_handler, proxy_auth_handler] - +import urllib2 +import base64 + +import logging + +log = logging.getLogger( __name__ ) + +class PreemptiveBasicAuthHandler(urllib2.BaseHandler): + + def __init__(self, password_mgr=None): + if password_mgr is None: + password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + self.passwd = password_mgr + self.add_password = self.passwd.add_password + + def http_request(self,req): + uri = req.get_full_url() + user, pw = self.passwd.find_user_password(None,uri) + log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw)) + if pw is None: return req + raw = "%s:%s" % (user, pw) + auth = 'Basic %s' % base64.b64encode(raw).strip() + req.add_unredirected_header('Authorization', auth) + return req + +def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ): + handlers = [] + for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl): + handlers.append(handler) + for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass): + handlers.append(handler) + opener = urllib2.build_opener(*handlers) + return opener.open + +def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl): + """ + Get a basic authentification handler for jenkins + :param jenkinsuser: jenkins username, str + :param jenkinspass: jenkins password, str + :param jenkinsurl: jenkins base url, str + :return: a list of handlers + """ + for param in jenkinsuser, jenkinspass, jenkinsurl: + if param is None: + return [] + assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser) + assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass) +# hostname = urlparse.urlsplit(jenkinsurl).hostname + handler = PreemptiveBasicAuthHandler() + handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass) + log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser)) + return [ handler ] + +def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass): + """ + Get a configured handler for a proxy + + :param proxyhost: proxy hostname, str + :param proxyport: proxy port, int + :param proxyuser: proxy username, str + :param proxypass: proxy password, str + :return: list of handlers + """ + for param in proxyhost, proxyport, proxyuser, proxypass: + if param is None: + return [] + assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport ) + assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass ) + assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser ) + + proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport), + 'https': 'http://%s:%i/' % (proxyhost, proxyport) } + + proxy_handler = urllib2.ProxyHandler( proxy_spec ) + proxy_auth_handler = urllib2.HTTPBasicAuthHandler() + proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass ) + return [proxy_handler, proxy_auth_handler] + -- 2.7.4