make it possible to create views again
authorsalimfadhley <sal@stodge.org>
Sun, 16 Jun 2013 15:09:49 +0000 (16:09 +0100)
committersalimfadhley <sal@stodge.org>
Sun, 16 Jun 2013 15:09:49 +0000 (16:09 +0100)
jenkinsapi/jenkins.py
jenkinsapi/views.py [new file with mode: 0644]
jenkinsapi_tests/systests/test_views.py [new file with mode: 0644]

index a50af7b..3cb6ab1 100644 (file)
@@ -1,34 +1,21 @@
-import time
+import json
 import urllib
-import urllib2
 import logging
 import urlparse
-import requests
-import StringIO
-import cookielib
 
 from jenkinsapi import config
 from jenkinsapi.job import Job
-from jenkinsapi.nodes import Nodes
 from jenkinsapi.node import Node
-from jenkinsapi.queue import Queue
 from jenkinsapi.view import View
+from jenkinsapi.nodes import Nodes
+from jenkinsapi.views import Views
+from jenkinsapi.queue import Queue
 from jenkinsapi.fingerprint import Fingerprint
 from jenkinsapi.jenkinsbase import JenkinsBase
 from jenkinsapi.utils.requester import Requester
-from jenkinsapi.exceptions import UnknownJob, NotAuthorized, JenkinsAPIException
+from jenkinsapi.exceptions import UnknownJob, JenkinsAPIException
 
-try:
-    import json
-except ImportError:
-    import simplejson as json
 
-try:
-    # Kerberos is now an extras_require - please see
-    # http://pythonhosted.org/distribute/setuptools.html#declaring-extras-optional-features-with-their-own-dependencies
-    from utils.urlopener_kerberos import mkkrbopener
-except ImportError:
-    mkkrbopener = None
 
 log = logging.getLogger(__name__)
 
@@ -62,14 +49,9 @@ class Jenkins(JenkinsBase):
         obj_fingerprint.validate()
         log.info("Jenkins says %s is valid" % id)
 
-    def reload(self):
-        '''Try and reload the configuration from disk'''
-        try:
-            self.requester.get_url("%(baseurl)s/reload" % self.__dict__)
-        except urllib2.HTTPError, e:
-            if e.code == 403:
-                raise NotAuthorized("You are not authorized to reload this server")
-            raise
+    # def reload(self):
+    #     '''Try and reload the configuration from disk'''
+    #     self.requester.get_url("%(baseurl)s/reload" % self.__dict__)
 
     def get_artifact_data(self, id):
         obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
@@ -184,7 +166,7 @@ class Jenkins(JenkinsBase):
         :return: new jenkins_obj
         """
         delete_job_url = self[jobname].get_delete_url()
-        response = self.requester.post_and_confirm_status(
+        self.requester.post_and_confirm_status(
             delete_job_url,
             data='some random bytes...'
         )
@@ -200,7 +182,7 @@ class Jenkins(JenkinsBase):
         """
         params = {'newName': newjobname}
         rename_job_url = self[jobname].get_rename_url()
-        response = self.requester.post_and_confirm_status(
+        self.requester.post_and_confirm_status(
             rename_job_url, params=params, data='')
         self.poll()
         return self[newjobname]
@@ -227,84 +209,18 @@ class Jenkins(JenkinsBase):
     def __str__(self):
         return "Jenkins server at %s" % self.baseurl
 
-    def _get_views(self):
-        log.debug('_get_views: self._data.has_key[views] %s' %
-                self._data.has_key('views'))
-        if not self._data.has_key("views"):
-            pass
-        else:
-            for viewdict in self._data["views"]:
-                yield viewdict["name"], viewdict["url"]
-
-    def get_view_dict(self):
-        return dict(self._get_views())
-
-    def get_view_url(self, str_view_name):
-        try:
-            view_dict = self.get_view_dict()
-            log.debug('view_dict=%s' % view_dict)
-            return view_dict[ str_view_name ]
-        except KeyError:
-            #noinspection PyUnboundLocalVariable
-            all_views = ", ".join(view_dict.keys())
-            raise KeyError("View %s is not known - available: %s" % (str_view_name, all_views))
-
-    def get_view(self, str_view_name):
-        view_url = self.get_view_url(str_view_name)
-        view_api_url = self.python_api_url(view_url)
-        return View(view_url , str_view_name, jenkins_obj=self)
+    def views(self):
+        return Views(self)
 
     def get_view_by_url(self, str_view_url):
         #for nested view
         str_view_name = str_view_url.split('/view/')[-1].replace('/', '')
         return View(str_view_url , str_view_name, jenkins_obj=self)
 
-    def delete_view_by_url(self, str_url):
-        url = "%s/doDelete" % str_url
-        response = self.requester.get_url(url, data='')
-        self.poll()
-        return self
-
     def get_nodes(self):
         url = self.get_nodes_url()
         return Nodes(url, self)
 
-    def create_view(self, str_view_name, person=None):
-        """
-        Create a view
-        :param str_view_name: name of new view, str
-        :param person: Person name (to create personal view), str
-        :return: new View obj or None if view was not created
-        """
-        url = urlparse.urljoin(self.baseurl, "user/%s/my-views/" % person) if person else self.baseurl
-        qs = urllib.urlencode({'value': str_view_name})
-        viewExistsCheck_url = urlparse.urljoin(url, "viewExistsCheck?%s" % qs)
-        log.debug('viewExistsCheck_url=%s' % viewExistsCheck_url)
-        result = self.requester.get_url(viewExistsCheck_url)
-        log.debug('result=%s' % result)
-        # Jenkins returns "<div/>" if view does not exist
-        if len(result) > len('<div/>'):
-            log.error('A view "%s" already exists' % (str_view_name))
-            return None
-        else:
-            data = {"mode":"hudson.model.ListView", "Submit": "OK"}
-            data['name'] = str_view_name
-            # data['json'] = data.copy()
-            # log.debug('json data=%s' % data)
-            # params = urllib.urlencode(data)
-            try:
-                createView_url = urlparse.urljoin(url, "createView")
-                log.debug('createView_url=%s' % createView_url)
-                result = self.requester.post_url(createView_url, data)
-            except urllib2.HTTPError, e:
-                log.debug("Error post_data %s" % createView_url)
-                log.exception(e)
-            # We changed Jenkins config - need to update ourself
-            self.poll()
-            new_view_obj = self.get_view(str_view_name)
-            assert isinstance(new_view_obj, View)
-            return new_view_obj
-
     def delete_view(self, str_view_name, view=None, person=None):
         """
         Delete a view
diff --git a/jenkinsapi/views.py b/jenkinsapi/views.py
new file mode 100644 (file)
index 0000000..cd255fc
--- /dev/null
@@ -0,0 +1,81 @@
+import json
+import urllib
+from jenkinsapi.view import View
+
+class Views(object):
+    """
+    An abstraction on a Jenkins object's views
+    """
+
+    def __init__(self, jenkins):
+        self.jenkins = jenkins
+
+    def __getitem__(self, view_name):
+        for row in self.jenkins._data['views']:
+            if row['name'] == view_name:
+                return View(
+                    row['url'],
+                    row['name'],
+                    self.jenkins)
+        raise KeyError(view_name)
+
+    def __iteritems__(self):
+        """
+        Get the names & objects for all views
+        """
+        self.jenkins.poll()
+        for row in self.jenkins._data['views']:
+            name = row['name']
+            url = row['url']
+
+            yield name, View(url, name, self.jenkins)
+
+    def __contains__(self, view_name):
+        """
+        True if view_name is the name of a defined view
+        """
+        return view_name in self.keys()
+
+    def iterkeys(self):
+        """
+        Get the names of all available views
+        """
+        for row in self.jenkins._data['views']:
+            yield row['name']
+
+    def keys(self):
+        """
+        Return a list of the names of all views
+        """
+        return list(self.iterkeys())
+
+    def delete_view_by_url(self, str_url):
+        url = "%s/doDelete" % str_url
+        response = self.requester.get_url(url, data='')
+        self.poll()
+        return self
+
+    def create(self, str_view_name):
+        """
+        Create a view
+        :param str_view_name: name of new view, str
+        :param person: Person name (to create personal view), str
+        :return: new View obj or None if view was not created
+        """
+        #url = urlparse.urljoin(self.baseurl, "user/%s/my-views/" % person) if person else self.baseurl
+        try:
+            return self[str_view_name]
+        except KeyError:
+            pass
+        url = '%s/createView' % self.jenkins.baseurl
+        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
+        data = {
+            "name": str_view_name,
+            "mode": "hudson.model.ListView",
+            "Submit": "OK",
+            "json": json.dumps({"name": str_view_name, "mode": "hudson.model.ListView"})
+        }
+
+        self.jenkins.requester.post_and_confirm_status(url, data=data, headers=headers)
+        self.jenkins.poll()
+        return self[str_view_name]
diff --git a/jenkinsapi_tests/systests/test_views.py b/jenkinsapi_tests/systests/test_views.py
new file mode 100644 (file)
index 0000000..dc7690b
--- /dev/null
@@ -0,0 +1,24 @@
+'''
+System tests for `jenkinsapi.jenkins` module.
+'''
+import logging
+import unittest
+from jenkinsapi.view import View
+from jenkinsapi_tests.systests.base import BaseSystemTest
+from jenkinsapi_tests.test_utils.random_strings import random_string
+
+log = logging.getLogger(__name__)
+
+
+class TestViews(BaseSystemTest):
+    def test_make_views(self):
+        self._create_job()
+        view_name = random_string()
+        self.assertNotIn(view_name, self.jenkins.views())
+        v = self.jenkins.views().create(view_name)
+        self.assertIn(view_name, self.jenkins.views())
+        self.assertIsInstance(v, View)
+
+if __name__ == '__main__':
+    logging.basicConfig()
+    unittest.main()