Cleanup
authorRamon van Alteren <ramon@vanalteren.nl>
Wed, 4 Jan 2012 14:08:03 +0000 (15:08 +0100)
committerRamon van Alteren <ramon@vanalteren.nl>
Wed, 4 Jan 2012 14:08:03 +0000 (15:08 +0100)
pyjenkinsci/command_line/jenkins_invoke.py
pyjenkinsci/utils/urlopener.py

index 3f9afd0..e2b812c 100644 (file)
@@ -1,63 +1,62 @@
-import os\r
-import sys\r
-import logging\r
-import optparse\r
-import jenkins\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-class jenkins_invoke(object):\r
-\r
-    @classmethod\r
-    def mkparser(cls):\r
-        parser = optparse.OptionParser()\r
-        DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )\r
-        parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."\r
-        parser.add_option("-J", "--jenkinsbase", dest="baseurl",\r
-                          help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,\r
-                          type="str",\r
-                          default=DEFAULT_BASEURL, )\r
-        parser.add_option("-b", "--block", dest="block",\r
-                          help="Block until each of the jobs is complete." ,\r
-                          action="store_true",\r
-                          default=False )\r
-        parser.add_option("-t", "--token", dest="token",\r
-                          help="Optional security token." ,\r
-                          default=None )\r
-        return parser\r
-\r
-    @classmethod\r
-    def main(cls):\r
-        parser = cls.mkparser()\r
-        options, args = parser.parse_args()\r
-        try:\r
-            assert len( args ) > 0, "Need to specify at least one job name"\r
-        except AssertionError, e:\r
-            log.critical( e[0] )\r
-            parser.print_help()\r
-            sys.exit(1)\r
-        invoker = cls( options, args )\r
-        invoker()\r
-\r
-    def __init__( self, options, jobs ):\r
-        self.options = options\r
-        self.jobs = jobs\r
-\r
-    def __call__(self):\r
-        for job in self.jobs:\r
-            self.invokejob( job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token )\r
-\r
-    def invokejob(self, jobname, block, baseurl, token ):\r
-        assert type(block) == bool\r
-        assert type(baseurl) == str\r
-        assert type(jobname) == str\r
-        assert token is None or isinstance( token, str )\r
-        jenkinsserver = jenkins( baseurl )\r
-        job = jenkinsserver[ jobname ]\r
-        job.invoke( securitytoken=token, block=block )\r
-\r
-\r
-def main(  ):\r
-    logging.basicConfig()\r
-    logging.getLogger("").setLevel( logging.INFO )\r
-    jenkins_invoke.main()\r
+import os
+import sys
+import logging
+import optparse
+from pyjenkinsci import jenkins
+
+log = logging.getLogger(__name__)
+
+class jenkins_invoke(object):
+    @classmethod
+    def mkparser(cls):
+        parser = optparse.OptionParser()
+        DEFAULT_BASEURL=os.environ.get( "JENKINS_URL", "http://localhost/jenkins" )
+        parser.help_text = "Execute a number of jenkins jobs on the server of your choice. Optionally block until the jobs are complete."
+        parser.add_option("-J", "--jenkinsbase", dest="baseurl",
+                          help="Base URL for the Jenkins server, default is %s" % DEFAULT_BASEURL,
+                          type="str",
+                          default=DEFAULT_BASEURL, )
+        parser.add_option("-b", "--block", dest="block",
+                          help="Block until each of the jobs is complete." ,
+                          action="store_true",
+                          default=False )
+        parser.add_option("-t", "--token", dest="token",
+                          help="Optional security token." ,
+                          default=None )
+        return parser
+
+    @classmethod
+    def main(cls):
+        parser = cls.mkparser()
+        options, args = parser.parse_args()
+        try:
+            assert len(args) > 0, "Need to specify at least one job name"
+        except AssertionError, e:
+            log.critical(e[0])
+            parser.print_help()
+            sys.exit(1)
+        invoker = cls(options, args)
+        invoker()
+
+    def __init__(self, options, jobs):
+        self.options = options
+        self.jobs = jobs
+
+    def __call__(self):
+        for job in self.jobs:
+            self.invokejob(job, block=self.options.block, baseurl=self.options.baseurl, token=self.options.token)
+
+    def invokejob(self, jobname, block, baseurl, token ):
+        assert type(block) == bool
+        assert type(baseurl) == str
+        assert type(jobname) == str
+        assert token is None or isinstance(token, str)
+        jenkinsserver = jenkins.Jenkins( baseurl )
+        job = jenkinsserver[jobname]
+        job.invoke(securitytoken=token, block=block)
+
+
+def main(  ):
+    logging.basicConfig()
+    logging.getLogger("").setLevel(logging.INFO)
+    jenkins_invoke.main()
\ No newline at end of file
index e6b676f..9d58807 100644 (file)
@@ -1,79 +1,78 @@
-import urllib2\r
-import urlparse\r
-import base64\r
-\r
-import logging\r
-\r
-log = logging.getLogger( __name__ )\r
-\r
-class PreemptiveBasicAuthHandler(urllib2.BaseHandler):\r
-\r
-        def __init__(self, password_mgr=None):\r
-                if password_mgr is None:\r
-                        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()\r
-                self.passwd = password_mgr\r
-                self.add_password = self.passwd.add_password\r
-\r
-        def http_request(self,req):\r
-                uri = req.get_full_url()\r
-                user, pw = self.passwd.find_user_password(None,uri)\r
-                log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))\r
-                if pw is None: return req\r
-                raw = "%s:%s" % (user, pw)\r
-                auth = 'Basic %s' % base64.b64encode(raw).strip()\r
-                req.add_unredirected_header('Authorization', auth)\r
-                return req\r
-\r
-def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):\r
-    handlers = []\r
-    for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):\r
-        handlers.append(handler)\r
-    for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):\r
-        handlers.append(handler)\r
-    opener = urllib2.build_opener(*handlers)\r
-    return opener.open\r
-\r
-def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):\r
-    """\r
-    Get a basic authentification handler for jenkins\r
-    :param jenkinsuser: jenkins username, str\r
-    :param jenkinspass: jenkins password, str\r
-    :param jenkinsurl: jenkins base url, str\r
-    :return: a list of handlers\r
-    """\r
-    for param in jenkinsuser, jenkinspass, jenkinsurl:\r
-        if param is None:\r
-            return []\r
-    assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)\r
-    assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass)\r
-#    hostname = urlparse.urlsplit(jenkinsurl).hostname\r
-    handler = PreemptiveBasicAuthHandler()\r
-    handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)\r
-    log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))\r
-    return [ handler ]\r
-\r
-def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):\r
-    """\r
-    Get a configured handler for a proxy\r
-\r
-    :param proxyhost: proxy hostname, str\r
-    :param proxyport: proxy port, int\r
-    :param proxyuser: proxy username, str\r
-    :param proxypass: proxy password, str\r
-    :return: list of handlers\r
-    """\r
-    for param in proxyhost, proxyport, proxyuser, proxypass:\r
-        if param is None:\r
-            return []\r
-    assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )\r
-    assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass )\r
-    assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )\r
-\r
-    proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),\r
-                   'https': 'http://%s:%i/' % (proxyhost, proxyport) }\r
-\r
-    proxy_handler = urllib2.ProxyHandler( proxy_spec )\r
-    proxy_auth_handler = urllib2.HTTPBasicAuthHandler()\r
-    proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )\r
-    return [proxy_handler, proxy_auth_handler]\r
-\r
+import urllib2
+import base64
+
+import logging
+
+log = logging.getLogger( __name__ )
+
+class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
+
+        def __init__(self, password_mgr=None):
+                if password_mgr is None:
+                        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+                self.passwd = password_mgr
+                self.add_password = self.passwd.add_password
+
+        def http_request(self,req):
+                uri = req.get_full_url()
+                user, pw = self.passwd.find_user_password(None,uri)
+                log.debug('ADDING REQUEST HEADER for uri (%s): %s:%s' % (uri,user,pw))
+                if pw is None: return req
+                raw = "%s:%s" % (user, pw)
+                auth = 'Basic %s' % base64.b64encode(raw).strip()
+                req.add_unredirected_header('Authorization', auth)
+                return req
+
+def mkurlopener( jenkinsuser, jenkinspass, jenkinsurl, proxyhost, proxyport, proxyuser, proxypass ):
+    handlers = []
+    for handler in get_jenkins_auth_handler(jenkinsuser=jenkinsuser, jenkinspass=jenkinspass, jenkinsurl=jenkinsurl):
+        handlers.append(handler)
+    for handler in get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+        handlers.append(handler)
+    opener = urllib2.build_opener(*handlers)
+    return opener.open
+
+def get_jenkins_auth_handler(jenkinsuser, jenkinspass, jenkinsurl):
+    """
+    Get a basic authentification handler for jenkins
+    :param jenkinsuser: jenkins username, str
+    :param jenkinspass: jenkins password, str
+    :param jenkinsurl: jenkins base url, str
+    :return: a list of handlers
+    """
+    for param in jenkinsuser, jenkinspass, jenkinsurl:
+        if param is None:
+            return []
+    assert type(jenkinsuser) == str, "Jenkins username should be a string, got %s" % repr(jenkinsuser)
+    assert type(jenkinspass) == str, "Jenkins password should be a string, git %s" % repr(jenkinspass)
+#    hostname = urlparse.urlsplit(jenkinsurl).hostname
+    handler = PreemptiveBasicAuthHandler()
+    handler.add_password(None, jenkinsurl, jenkinsuser, jenkinspass)
+    log.debug('Adding BasicAuthHandler: url:%s, user:%s,' % (jenkinsurl, jenkinsuser))
+    return [ handler ]
+
+def get_proxy_handler(proxyhost, proxyport, proxyuser, proxypass):
+    """
+    Get a configured handler for a proxy
+
+    :param proxyhost: proxy hostname, str
+    :param proxyport: proxy port, int
+    :param proxyuser: proxy username, str
+    :param proxypass: proxy password, str
+    :return: list of handlers
+    """
+    for param in proxyhost, proxyport, proxyuser, proxypass:
+        if param is None:
+            return []
+    assert type( proxyport ) == int, "Proxy port should be an int, got %s" % repr( proxyport )
+    assert type( proxypass ) == str, "Proxy password should be a sting, got %s" % repr( proxypass )
+    assert type( proxyuser ) == str, "Proxy username should be a string, got %s" % repr( proxyuser )
+
+    proxy_spec = { 'http': 'http://%s:%i/' % (proxyhost, proxyport),
+                   'https': 'http://%s:%i/' % (proxyhost, proxyport) }
+
+    proxy_handler = urllib2.ProxyHandler( proxy_spec )
+    proxy_auth_handler = urllib2.HTTPBasicAuthHandler()
+    proxy_auth_handler.add_password( None, proxyhost, proxyuser, proxypass )
+    return [proxy_handler, proxy_auth_handler]
+