-import os\r
-import sys\r
-import logging\r
-import optparse\r
-import jenkins\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-class jenkins_invoke(object):\r
-\r
- @classmethod\r
- def mkparser(cls):\r
- parser = optparse.OptionParser()\r
- DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )\r
- parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."\r
- parser.add_option("-J", "--jenkinsbase", dest="baseurl",\r
- help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,\r
- type="str",\r
- default=DEFAULT_BASEURL, )\r
- parser.add_option("-b", "--block", dest="block",\r
- help="Block until each of the jobs is complete." ,\r
- action="store_true",\r
- default=False )\r
- parser.add_option("-t", "--token", dest="token",\r
- help="Optional security token." ,\r
- default=None )\r
- return parser\r
-\r
- @classmethod\r
- def main(cls):\r
- parser = cls.mkparser()\r
- options, args = parser.parse_args()\r
- try:\r
- assert len( args ) > 0, "Need to specify at least one job name"\r
- except AssertionError, e:\r
- log.critical( e[0] )\r
- parser.print_help()\r
- sys.exit(1)\r
- invoker = cls( options, args )\r
- invoker()\r
-\r
- def __init__( self, options, jobs ):\r
- self.options = options\r
- self.jobs = jobs\r
-\r
- def __call__(self):\r
- for job in self.jobs:\r
- self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token )\r
-\r
- def invokejob(self, jobname, block, baseurl, token ):\r
- assert type(block) == bool\r
- assert type(baseurl) == str\r
- assert type(jobname) == str\r
- assert token is None or isinstance( token, str )\r
- jenkinsserver = jenkins( baseurl )\r
- job = jenkinsserver[ jobname ]\r
- job.invoke( securitytoken=token, block=block )\r
-\r
-\r
-def main( ):\r
- logging.basicConfig()\r
- logging.getLogger("").setLevel( logging.INFO )\r
- jenkins_invoke.main()\r
+import os
+import sys
+import logging
+import optparse
+from pyjenkinsci import jenkins
+
+log = logging.getLogger(__name__)
+
+class jenkins_invoke(object):
+ @classmethod
+ def mkparser(cls):
+ parser = optparse.OptionParser()
+ DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )
+ parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."
+ parser.add_option("-J", "--jenkinsbase", dest="baseurl",
+ help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,
+ type="str",
+ default=DEFAULT_BASEURL, )
+ parser.add_option("-b", "--block", dest="block",
+ help="Block until each of the jobs is complete." ,
+ action="store_true",
+ default=False )
+ parser.add_option("-t", "--token", dest="token",
+ help="Optional security token." ,
+ default=None )
+ return parser
+
+ @classmethod
+ def main(cls):
+ parser = cls.mkparser()
+ options, args = parser.parse_args()
+ try:
+ assert len(args) > 0, "Need to specify at least one job name"
+ except AssertionError, e:
+ log.critical(e[0])
+ parser.print_help()
+ sys.exit(1)
+ invoker = cls(options, args)
+ invoker()
+
+ def __init__(self, options, jobs):
+ self.options = options
+ self.jobs = jobs
+
+ def __call__(self):
+ for job in self.jobs:
+ self.invokejob(job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token)
+
+ def invokejob(self, jobname, block, baseurl, token ):
+ assert type(block) == bool
+ assert type(baseurl) == str
+ assert type(jobname) == str
+ assert token is None or isinstance(token, str)
+ jenkinsserver = jenkins.Jenkins( baseurl )
+ job = jenkinsserver[jobname]
+ job.invoke(securitytoken=token, block=block)
+
+
+def main( ):
+ logging.basicConfig()
+ logging.getLogger("").setLevel(logging.INFO)
+ jenkins_invoke.main()
\ No newline at end of file
-import urllib2\r
-import urlparse\r
-import base64\r
-\r
-import logging\r
-\r
-log = logging.getLogger( __name__ )\r
-\r
-class PreemptiveBasicAuthHandler(urllib2.BaseHandler):\r
-\r
- def __init__(self, password_mgr=None):\r
- if password_mgr is None:\r
- password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()\r
- self.passwd = password_mgr\r
- self.add_password = self.passwd.add_password\r
-\r
- def http_request(self,req):\r
- uri = req.get_full_url()\r
- user, pw = self.passwd.find_user_password(None,uri)\r
- log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))\r
- if pw is None: return req\r
- raw = "%s:%s" % (user, pw)\r
- auth = 'Basic %s' % base64.b64encode(raw).strip()\r
- req.add_unredirected_header('Authorization', auth)\r
- return req\r
-\r
-def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):\r
- handlers = []\r
- for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):\r
- handlers.append(handler)\r
- for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):\r
- handlers.append(handler)\r
- opener = urllib2.build_opener(*handlers)\r
- return opener.open\r
-\r
-def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):\r
- """\r
- Get a basic authentification handler for jenkins\r
- :param jenkinsuser: jenkins username, str\r
- :param jenkinspass: jenkins password, str\r
- :param jenkinsurl: jenkins base url, str\r
- :return: a list of handlers\r
- """\r
- for param in jenkinsuser, jenkinspass, jenkinsurl:\r
- if param is None:\r
- return []\r
- assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)\r
- assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass)\r
-# hostname = urlparse.urlsplit(jenkinsurl).hostname\r
- handler = PreemptiveBasicAuthHandler()\r
- handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)\r
- log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))\r
- return [ handler ]\r
-\r
-def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):\r
- """\r
- Get a configured handler for a proxy\r
-\r
- :param proxyhost: proxy hostname, str\r
- :param proxyport: proxy port, int\r
- :param proxyuser: proxy username, str\r
- :param proxypass: proxy password, str\r
- :return: list of handlers\r
- """\r
- for param in proxyhost, proxyport, proxyuser, proxypass:\r
- if param is None:\r
- return []\r
- assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )\r
- assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass )\r
- assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )\r
-\r
- proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),\r
- 'https': 'http://%s:%i/' % (proxyhost, proxyport) }\r
-\r
- proxy_handler = urllib2.ProxyHandler( proxy_spec )\r
- proxy_auth_handler = urllib2.HTTPBasicAuthHandler()\r
- proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )\r
- return [proxy_handler, proxy_auth_handler]\r
-\r
+import urllib2
+import base64
+
+import logging
+
+log = logging.getLogger( __name__ )
+
+class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
+
+ def __init__(self, password_mgr=None):
+ if password_mgr is None:
+ password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ self.passwd = password_mgr
+ self.add_password = self.passwd.add_password
+
+ def http_request(self,req):
+ uri = req.get_full_url()
+ user, pw = self.passwd.find_user_password(None,uri)
+ log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))
+ if pw is None: return req
+ raw = "%s:%s" % (user, pw)
+ auth = 'Basic %s' % base64.b64encode(raw).strip()
+ req.add_unredirected_header('Authorization', auth)
+ return req
+
+def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):
+ handlers = []
+ for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):
+ handlers.append(handler)
+ for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+ handlers.append(handler)
+ opener = urllib2.build_opener(*handlers)
+ return opener.open
+
+def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):
+ """
+ Get a basic authentification handler for jenkins
+ :param jenkinsuser: jenkins username, str
+ :param jenkinspass: jenkins password, str
+ :param jenkinsurl: jenkins base url, str
+ :return: a list of handlers
+ """
+ for param in jenkinsuser, jenkinspass, jenkinsurl:
+ if param is None:
+ return []
+ assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)
+ assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass)
+# hostname = urlparse.urlsplit(jenkinsurl).hostname
+ handler = PreemptiveBasicAuthHandler()
+ handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)
+ log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))
+ return [ handler ]
+
+def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+ """
+ Get a configured handler for a proxy
+
+ :param proxyhost: proxy hostname, str
+ :param proxyport: proxy port, int
+ :param proxyuser: proxy username, str
+ :param proxypass: proxy password, str
+ :return: list of handlers
+ """
+ for param in proxyhost, proxyport, proxyuser, proxypass:
+ if param is None:
+ return []
+ assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )
+ assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass )
+ assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )
+
+ proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),
+ 'https': 'http://%s:%i/' % (proxyhost, proxyport) }
+
+ proxy_handler = urllib2.ProxyHandler( proxy_spec )
+ proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
+ proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )
+ return [proxy_handler, proxy_auth_handler]
+