-import time
+import json
import urllib
-import urllib2
import logging
import urlparse
-import requests
-import StringIO
-import cookielib
from jenkinsapi import config
from jenkinsapi.job import Job
-from jenkinsapi.nodes import Nodes
from jenkinsapi.node import Node
-from jenkinsapi.queue import Queue
from jenkinsapi.view import View
+from jenkinsapi.nodes import Nodes
+from jenkinsapi.views import Views
+from jenkinsapi.queue import Queue
from jenkinsapi.fingerprint import Fingerprint
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.utils.requester import Requester
-from jenkinsapi.exceptions import UnknownJob, NotAuthorized, JenkinsAPIException
+from jenkinsapi.exceptions import UnknownJob, JenkinsAPIException
-try:
- import json
-except ImportError:
- import simplejson as json
-try:
- # Kerberos is now an extras_require - please see
- # http://pythonhosted.org/distribute/setuptools.html#declaring-extras-optional-features-with-their-own-dependencies
- from utils.urlopener_kerberos import mkkrbopener
-except ImportError:
- mkkrbopener = None
log = logging.getLogger(__name__)
obj_fingerprint.validate()
log.info("Jenkins says %s is valid" % id)
- def reload(self):
- '''Try and reload the configuration from disk'''
- try:
- self.requester.get_url("%(baseurl)s/reload" % self.__dict__)
- except urllib2.HTTPError, e:
- if e.code == 403:
- raise NotAuthorized("You are not authorized to reload this server")
- raise
+ # def reload(self):
+ # '''Try and reload the configuration from disk'''
+ # self.requester.get_url("%(baseurl)s/reload" % self.__dict__)
def get_artifact_data(self, id):
obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
:return: new jenkins_obj
"""
delete_job_url = self[jobname].get_delete_url()
- response = self.requester.post_and_confirm_status(
+ self.requester.post_and_confirm_status(
delete_job_url,
data='some random bytes...'
)
"""
params = {'newName': newjobname}
rename_job_url = self[jobname].get_rename_url()
- response = self.requester.post_and_confirm_status(
+ self.requester.post_and_confirm_status(
rename_job_url, params=params, data='')
self.poll()
return self[newjobname]
def __str__(self):
return "Jenkins server at %s" % self.baseurl
- def _get_views(self):
- log.debug('_get_views: self._data.has_key[views] %s' %
- self._data.has_key('views'))
- if not self._data.has_key("views"):
- pass
- else:
- for viewdict in self._data["views"]:
- yield viewdict["name"], viewdict["url"]
-
- def get_view_dict(self):
- return dict(self._get_views())
-
- def get_view_url(self, str_view_name):
- try:
- view_dict = self.get_view_dict()
- log.debug('view_dict=%s' % view_dict)
- return view_dict[ str_view_name ]
- except KeyError:
- #noinspection PyUnboundLocalVariable
- all_views = ", ".join(view_dict.keys())
- raise KeyError("View %s is not known - available: %s" % (str_view_name, all_views))
-
- def get_view(self, str_view_name):
- view_url = self.get_view_url(str_view_name)
- view_api_url = self.python_api_url(view_url)
- return View(view_url , str_view_name, jenkins_obj=self)
+ def views(self):
+ return Views(self)
def get_view_by_url(self, str_view_url):
#for nested view
str_view_name = str_view_url.split('/view/')[-1].replace('/', '')
return View(str_view_url , str_view_name, jenkins_obj=self)
- def delete_view_by_url(self, str_url):
- url = "%s/doDelete" % str_url
- response = self.requester.get_url(url, data='')
- self.poll()
- return self
-
def get_nodes(self):
url = self.get_nodes_url()
return Nodes(url, self)
- def create_view(self, str_view_name, person=None):
- """
- Create a view
- :param str_view_name: name of new view, str
- :param person: Person name (to create personal view), str
- :return: new View obj or None if view was not created
- """
- url = urlparse.urljoin(self.baseurl, "user/%s/my-views/" % person) if person else self.baseurl
- qs = urllib.urlencode({'value': str_view_name})
- viewExistsCheck_url = urlparse.urljoin(url, "viewExistsCheck?%s" % qs)
- log.debug('viewExistsCheck_url=%s' % viewExistsCheck_url)
- result = self.requester.get_url(viewExistsCheck_url)
- log.debug('result=%s' % result)
- # Jenkins returns "<div/>" if view does not exist
- if len(result) > len('<div/>'):
- log.error('A view "%s" already exists' % (str_view_name))
- return None
- else:
- data = {"mode":"hudson.model.ListView", "Submit": "OK"}
- data['name'] = str_view_name
- # data['json'] = data.copy()
- # log.debug('json data=%s' % data)
- # params = urllib.urlencode(data)
- try:
- createView_url = urlparse.urljoin(url, "createView")
- log.debug('createView_url=%s' % createView_url)
- result = self.requester.post_url(createView_url, data)
- except urllib2.HTTPError, e:
- log.debug("Error post_data %s" % createView_url)
- log.exception(e)
- # We changed Jenkins config - need to update ourself
- self.poll()
- new_view_obj = self.get_view(str_view_name)
- assert isinstance(new_view_obj, View)
- return new_view_obj
-
def delete_view(self, str_view_name, view=None, person=None):
"""
Delete a view
--- /dev/null
+import json
+import urllib
+from jenkinsapi.view import View
+
+class Views(object):
+ """
+ An abstraction on a Jenkins object's views
+ """
+
+ def __init__(self, jenkins):
+ self.jenkins = jenkins
+
+ def __getitem__(self, view_name):
+ for row in self.jenkins._data['views']:
+ if row['name'] == view_name:
+ return View(
+ row['url'],
+ row['name'],
+ self.jenkins)
+ raise KeyError(view_name)
+
+ def __iteritems__(self):
+ """
+ Get the names & objects for all views
+ """
+ self.jenkins.poll()
+ for row in self.jenkins._data['views']:
+ name = row['name']
+ url = row['url']
+
+ yield name, View(url, name, self.jenkins)
+
+ def __contains__(self, view_name):
+ """
+ True if view_name is the name of a defined view
+ """
+ return view_name in self.keys()
+
+ def iterkeys(self):
+ """
+ Get the names of all available views
+ """
+ for row in self.jenkins._data['views']:
+ yield row['name']
+
+ def keys(self):
+ """
+ Return a list of the names of all views
+ """
+ return list(self.iterkeys())
+
+ def delete_view_by_url(self, str_url):
+ url = "%s/doDelete" % str_url
+ response = self.requester.get_url(url, data='')
+ self.poll()
+ return self
+
+ def create(self, str_view_name):
+ """
+ Create a view
+ :param str_view_name: name of new view, str
+ :param person: Person name (to create personal view), str
+ :return: new View obj or None if view was not created
+ """
+ #url = urlparse.urljoin(self.baseurl, "user/%s/my-views/" % person) if person else self.baseurl
+ try:
+ return self[str_view_name]
+ except KeyError:
+ pass
+ url = '%s/createView' % self.jenkins.baseurl
+ headers = {'Content-Type': 'application/x-www-form-urlencoded'}
+ data = {
+ "name": str_view_name,
+ "mode": "hudson.model.ListView",
+ "Submit": "OK",
+ "json": json.dumps({"name": str_view_name, "mode": "hudson.model.ListView"})
+ }
+
+ self.jenkins.requester.post_and_confirm_status(url, data=data, headers=headers)
+ self.jenkins.poll()
+ return self[str_view_name]