--- /dev/null
+<?xml version='1.0' encoding='UTF-8'?>
+<project>
+ <actions/>
+ <description></description>
+ <keepDependencies>false</keepDependencies>
+ <properties/>
+ <scm class="hudson.scm.NullSCM"/>
+ <canRoam>true</canRoam>
+ <disabled>false</disabled>
+ <blockBuildWhenDownstreamBuilding>false</blockBuildWhenDownstreamBuilding>
+ <blockBuildWhenUpstreamBuilding>false</blockBuildWhenUpstreamBuilding>
+ <triggers class="vector"/>
+ <concurrentBuild>false</concurrentBuild>
+ <builders/>
+ <publishers/>
+ <buildWrappers/>
+</project>
--- /dev/null
+from jenkinsapi.view import View
+from jenkinsapi.jenkins import Jenkins
+J = Jenkins('http://localhost:8080')
+print J.items()
+j= J['foo']
+j = J.get_job("foo")
+b = j.get_last_build()
+print b
+mjn = b.get_master_job_name()
+print(mjn)
+
+EMPTY_JOB_CONFIG = '''\
+<?xml version='1.0' encoding='UTF-8'?>
+<project>
+ <actions/>
+ <description></description>
+ <keepDependencies>false</keepDependencies>
+ <properties/>
+ <scm class="hudson.scm.NullSCM"/>
+ <canRoam>true</canRoam>
+ <disabled>false</disabled>
+ <blockBuildWhenDownstreamBuilding>false</blockBuildWhenDownstreamBuilding>
+ <blockBuildWhenUpstreamBuilding>false</blockBuildWhenUpstreamBuilding>
+ <triggers class="vector"/>
+ <concurrentBuild>false</concurrentBuild>
+ <builders/>
+ <publishers/>
+ <buildWrappers/>
+</project>
+'''
+
+new_job = J.create_job(name='foo_job', config=EMPTY_JOB_CONFIG)
\ No newline at end of file
JENKINS_API = r"api/python/"
LOAD_TIMEOUT = 30
-LOAD_ATTEMPTS = 5
\ No newline at end of file
-class ArtifactsMissing(Exception):
+class JenkinsAPIException(Exception):
+ """
+ Base class for all errors
+ """
+
+class ArtifactsMissing(JenkinsAPIException):
"""
Cannot find a build with all of the required artifacts.
"""
-class UnknownJob( KeyError ):
+class UnknownJob( KeyError, JenkinsAPIException):
"""
Jenkins does not recognize the job requested.
"""
-class ArtifactBroken(Exception):
+class ArtifactBroken(JenkinsAPIException):
"""
An artifact is broken, wrong
"""
-class TimeOut( Exception ):
+class TimeOut( JenkinsAPIException ):
"""
Some jobs have taken too long to complete.
"""
-class WillNotBuild(Exception):
+class WillNotBuild(JenkinsAPIException):
"""
Cannot trigger a new build.
"""
-class NoBuildData(Exception):
+class NoBuildData(JenkinsAPIException):
"""
A job has no build data.
"""
-class NoResults(Exception):
+class NoResults(JenkinsAPIException):
"""
A build did not publish any results.
"""
A build did not publish any results because it failed
"""
-class BadURL(ValueError):
+class BadURL(ValueError,JenkinsAPIException):
"""
A URL appears to be broken
"""
-class NotFound(Exception):
+class NotFound(JenkinsAPIException):
"""
Resource cannot be found
"""
-class NotAuthorized(Exception):
+class NotAuthorized(JenkinsAPIException):
"""Not Authorized to access resource"""
# Usually thrown when we get a 403 returned
-class NotSupportSCM(Exception):
+class NotSupportSCM(JenkinsAPIException):
"""
It's a SCM that does not supported by current version of jenkinsapi
"""
-class NotConfiguredSCM(Exception):
+class NotConfiguredSCM(JenkinsAPIException):
"""
It's a job that doesn't have configured SCM
"""
-class NotInQueue(Exception):
+class NotInQueue(JenkinsAPIException):
"""
It's a job that is not in the queue
"""
from jenkinsapi.fingerprint import Fingerprint
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.utils.requester import Requester
-from jenkinsapi.exceptions import UnknownJob, NotAuthorized
+from jenkinsapi.exceptions import UnknownJob, NotAuthorized, JenkinsAPIException
try:
import json
return Jenkins(self.baseurl, username=self.username, password=self.password, requester=self.requester)
def get_base_server_url(self):
- return self.baseurl[:-(len(config.JENKINS_API))]
-
- def get_krb_opener(self):
- if not mkkrbopener:
- raise NotImplementedError('JenkinsAPI was installed without Kerberos support.')
- return mkkrbopener(self.baseurl)
+ return self.baseurl[:-(len(config.JENKINS_API))]
def validate_fingerprint(self, id):
obj_fingerprint = Fingerprint(self.baseurl, id, jenkins_obj=self)
def get_jenkins_obj(self):
return self
+ def get_create_url(self):
+ # This only ever needs to work on the base object
+ return '%s/createItem' % self.baseurl
+
def get_jobs(self):
"""
Fetch all the build-names on this Jenkins server.
:param config: configuration of new job, xml
:return: new Job obj
"""
- headers = {'Content-Type': 'text/xml'}
+ try:
+ job = self[jobname]
+ raise JenkinsAPIException('Job %s already exists!' % jobname)
+ except KeyError:
+ pass
params = {'name': jobname}
- self.requester.hit_url(self.baseurl, data=config, params=params, headers=headers)
- newjk = self._clone()
- return newjk.get_job(jobname)
+ self.requester.post_xml_and_confirm_status(self.get_create_url(), data=config, params=params)
+ self.poll()
+ return self[jobname]
def copy_job(self, jobname, newjobname):
"""
:param newjobname: name of new job, str
:return: new Job obj
"""
- qs = urllib.urlencode({'name': newjobname,
- 'mode': 'copy',
- 'from': jobname})
- copy_job_url = urlparse.urljoin(self.baseurl, "createItem?%s" % qs)
- self.post_data(copy_job_url, '')
- newjk = self._clone()
- return newjk.get_job(newjobname)
+ params = { 'name': newjobname,
+ 'mode': 'copy',
+ 'from': jobname}
+
+ self.requester.post_and_confirm_status(
+ self.get_create_url(),
+ params=params,
+ data='')
+ self.poll()
+ return self[jobname]
def delete_job(self, jobname):
"""
:param jobname: name of a exist job, str
:return: new jenkins_obj
"""
- delete_job_url = urlparse.urljoin(self._clone().get_job(jobname).baseurl, "doDelete" )
- self.post_data(delete_job_url, '')
- newjk = self._clone()
- return newjk
+ delete_job_url = self[jobname].get_delete_url()
+ response = self.requester.post_and_confirm_status(
+ delete_job_url,
+ data='some random bytes...'
+ )
+ self.poll()
+ return self
def rename_job(self, jobname, newjobname):
"""
:param newjobname: name of new job, str
:return: new Job obj
"""
- qs = urllib.urlencode({'newName': newjobname})
- rename_job_url = urlparse.urljoin(self._clone().get_job(jobname).baseurl, "doRename?%s" % qs)
- self.post_data(rename_job_url, '')
- newjk = self._clone()
- return newjk.get_job(newjobname)
+ params = {'newName': newjobname}
+ rename_job_url = self[jobname].get_rename_url()
+ response = self.requester.post_and_confirm_status(
+ rename_job_url, params=params, data='')
+ self.poll()
+ return self[newjobname]
def iterkeys(self):
for info in self._data["jobs"]:
return View(str_view_url , str_view_name, jenkins_obj=self)
def delete_view_by_url(self, str_url):
- url = "%s/doDelete" %str_url
- self.post_data(url, '')
- self.poll()
+ url = "%s/doDelete" % str_url
+ response = self.requester.post_xml_and_confirm_status(self.url, params=params, data='')
+ self._poll
return self
def create_view(self, str_view_name, person=None):
result = self.hit_url(viewExistsCheck_url)
log.debug('result=%s' % result)
# Jenkins returns "<div/>" if view does not exist
- if len(result) > len('<div/>'):
+ if len(result) > len('<div/>'):
log.error('A view "%s" already exists' % (str_view_name))
return None
else:
result = self.hit_url(viewExistsCheck_url)
log.debug('result=%s' % result)
# Jenkins returns "<div/>" if view does not exist
- if len(result) == len('<div/>'):
+ if len(result) == len('<div/>'):
log.error('A view the name "%s" does not exist' % (str_view_name))
- return False
+ return False
else:
self.delete_view_by_url(urlparse.urljoin(url, 'view/%s' % str_view_name))
# We changed Jenkins config - need to update ourself
import logging
import pprint
from jenkinsapi import config
-from jenkinsapi.utils.retry import retry_function
log = logging.getLogger(__name__)
"""
This appears to be the base object that all other jenkins objects are inherited from
"""
- RETRY_ATTEMPTS = 5
+ RETRY_ATTEMPTS = 1
def __repr__(self):
return """<%s.%s %s>""" % (self.__class__.__module__,
"""
Initialize a jenkins connection
"""
- self.baseurl = baseurl
+ self.baseurl = self.strip_trailing_slash(baseurl)
if poll:
try:
self.poll()
return False
return True
+ @classmethod
+ def strip_trailing_slash(cls, url):
+ while url.endswith('/'):
+ url = url[:-1]
+ return url
+
def poll(self):
self._data = self._poll()
def _poll(self):
url = self.python_api_url(self.baseurl)
+
requester = self.get_jenkins_obj().requester
- content = retry_function(self.RETRY_ATTEMPTS , requester.hit_url, url)
+ response = requester.get_url(url)
try:
- return eval(content)
+ return eval(response.text)
except SyntaxError:
log.exception('Inappropriate content found at %s' % url)
+ raise
+ except TypeError:
+ raise
@classmethod
def python_api_url(cls, url):
fmt="%s%s"
else:
fmt = "%s/%s"
- return fmt % (url, config.JENKINS_API)
\ No newline at end of file
+ return fmt % (url, config.JENKINS_API)
from jenkinsapi.build import Build
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi import exceptions
+from jenkinsapi.mutable_jenkins_thing import MutableJenkinsThing
from exceptions import NoBuildData, NotFound, NotInQueue
log = logging.getLogger(__name__)
-class Job(JenkinsBase):
+class Job(JenkinsBase, MutableJenkinsThing):
"""
Represents a jenkins job
A job can hold N builds which are the actual execution environments
None : lambda element_tree: []
}
JenkinsBase.__init__( self, url )
-
+
def __str__(self):
return self._data["name"]
raise exceptions.NotSupportSCM("SCM class \"%s\" not supported by API, job \"%s\"" % (scm_class, self.name))
if scm == 'NullSCM':
raise exceptions.NotConfiguredSCM("SCM does not configured, job \"%s\"" % self.name)
- return scm
+ return scm
def get_scm_url(self):
"""
Get list of project SCM urls
- For some SCM's jenkins allow to configure and use number of SCM url's
+ For some SCM's jenkins allow to configure and use number of SCM url's
: return: list of SCM urls
"""
element_tree = self._get_config_element_tree()
def modify_scm_branch(self, new_branch, old_branch=None):
"""
Modify SCM ("Source Code Management") branch name for configured job.
- :param new_branch : new repository branch name to set.
- If job has multiple branches configured and "old_branch"
+ :param new_branch : new repository branch name to set.
+ If job has multiple branches configured and "old_branch"
not provided - method will allways modify first url.
- :param old_branch (optional): exact value of branch name to be replaced.
+ :param old_branch (optional): exact value of branch name to be replaced.
For some SCM's jenkins allow set multiple branches per job
this parameter intended to indicate which branch need to be modified
"""
def modify_scm_url(self, new_source_url, old_source_url=None):
"""
Modify SCM ("Source Code Management") url for configured job.
- :param new_source_url : new repository url to set.
- If job has multiple repositories configured and "old_source_url"
+ :param new_source_url : new repository url to set.
+ If job has multiple repositories configured and "old_source_url"
not provided - method will allways modify first url.
:param old_source_url (optional): for some SCM's jenkins allow set multiple repositories per job
this parameter intended to indicate which repository need to be modified
self.update_config(ET.tostring(element_tree))
else:
for scm_url in scm_url_list:
- if scm_url.text == old_source_url:
+ if scm_url.text == old_source_url:
scm_url.text = new_source_url
self.update_config(ET.tostring(element_tree))
--- /dev/null
+import urlparse
+
+class MutableJenkinsThing(object):
+ """
+ """
+ def get_delete_url(self):
+ return '%s/doDelete' % self.baseurl
+
+ def get_rename_url(self):
+ return '%s/doRename' % self.baseurl
--- /dev/null
+import logging
+from jenkinsapi.node import Node
+from jenkinsapi.jenkinsbase import JenkinsBase
+
+log = logging.getLogger(__name__)
+
+class Nodes(JenkinsBase):
+ """
+ Class to hold information on a collection of nodes
+ """
+
+ def __init__(self, baseurl, jenkins_obj):
+ """
+ Handy access to all of the nodes on your Jenkins server
+ """
+ self.jenkins = jenkins_obj
+ JenkinsBase.__init__(self, baseurl)
+
+ def __str__(self):
+ return 'Nodes @ %s' % self.baseurl
+
+ def iteritems(self):
+ for item in self._data['computer']:
+ nodename = item['displayName']
+ if nodename == 'master':
+ nodeurl = '%s/(%s)' % (self.baseurl, nodename)
+ else:
+ nodeurl = '%s/%s' % (self.baseurl, nodename)
+ yield item['displayName'], Node(nodeurl, nodename, self.jenkins)
+
+ def __getitem__(self, nodename):
+ for k, v in self.iteritems():
+ if k == nodename:
+ return v
import StringIO
import requests
+from jenkinsapi.exceptions import JenkinsAPIException
+
class Requester(object):
- """
- A class which carries out HTTP requests. You can replace this class with one of your own implementation if you require
- some other way to access Jenkins.
+ """
+ A class which carries out HTTP requests. You can replace this class with one of your own implementation if you require
+ some other way to access Jenkins.
+
+ This default class can handle simple authentication only.
+ """
+
+ STATUS_OK = 200
+
+ def __init__(self, username=None, password=None):
+ if username:
+ assert password, 'Cannot set a username without a password!'
+
+ self.username = username
+ self.password = password
+
+ def get_request_dict(self, url, params, data, headers):
+ requestKwargs = {}
+ if self.username:
+ requestKwargs['auth'] = (self.username, self.password)
+
+ if params:
+ assert isinstance(
+ params, dict), 'Params must be a dict, got %s' % repr(params)
+ requestKwargs['params'] = params
+
+ if headers:
+ assert isinstance(
+ headers, dict), 'headers must be a dict, got %s' % repr(headers)
+ requestKwargs['headers'] = headers
- This default class can handle simple authentication only.
- """
+ if not data==None:
+ # It may seem odd, but some Jenkins operations require posting
+ # an empty string.
+ requestKwargs['data'] = data
+ return requestKwargs
- def __init__(self, username=None, password=None):
- if username:
- assert password, 'Cannot set a username without a password!'
+ def get_url(self, url, params=None, headers=None):
+ requestKwargs = self.get_request_dict(url, params, None, headers)
+ return requests.get(url, **requestKwargs)
- self.username = None
- self.password = None
+ def post_url(self, url, params=None, data=None, headers=None):
+ requestKwargs = self.get_request_dict(url, params, data, headers)
+ return requests.post(url, **requestKwargs)
- def hit_url(self, url, params=None, data=None, headers=None):
- requestKwargs = {}
- if self.username:
- requestKwargs['auth'] = (self.username, self.password)
+ def post_xml_and_confirm_status(self, url, params=None, data=None):
+ headers = {'Content-Type': 'text/xml'}
+ return self.post_and_confirm_status(url, params, data, headers)
- if params:
- assert isinstance(params, dict), 'Params must be a dict, got %s' % repr(params)
- requestKwargs['params'] = params
+ def post_and_confirm_status(self, url, params=None, data=None, headers=None):
+ assert isinstance(data, str)
- if headers:
- assert isinstance(headers, dict), 'headers must be a dict, got %s' % repr(headers)
- requestKwargs['headers'] = headers
+ if not headers:
+ headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- if data:
- requestKwargs['data'] = data
- response = requests.post(url, **requestKwargs)
- else:
- response = requests.get(url, **requestKwargs)
+ response = self.post_url(url, params, data, headers)
+ if not response.status_code == self.STATUS_OK:
+ raise JenkinsAPIException('Operation failed. url={0}, data={1}, headers={2}, status={3}, text={4}'.format(
+ response.url, data, headers, response.status_code, response.text.encode('UTF-8')))
+ return response
- import ipdb; ipdb.set_trace()
- return response.text
+ def get_and_confirm_status(self, url, params=None, headers=None):
+ response = self.get_url(url, params, headers)
+ if not response.status_code == self.STATUS_OK:
+ raise JenkinsAPIException('Operation failed. url={0}, headers={1}, status={2}, text={3}'.format(
+ response.url, headers, response.status_code, response.text.encode('UTF-8')))
+ return response
-import logging
-import time
-import urllib2
-
-log = logging.getLogger( __name__ )
-
-IGNORE_EXCEPTIONS = [ AttributeError, KeyboardInterrupt ]
-
-DEFAULT_SLEEP_TIME = 1
-
-def retry_function( tries, fn, *args, **kwargs ):
- """
- Retry function - calls an unreliable function n times before giving up, if tries is exceeded
- and it still fails the most recent exception is raised.
- """
- assert isinstance( tries, int ), "Tries should be a non-zero positive integer"
- assert tries > 0, "Tries should be a non-zero positive integer"
- for attempt in range(0, tries):
- attemptno = attempt + 1
- if attemptno == tries:
- log.warn( "Last chance: #%i of %i" % ( attemptno, tries ) )
- elif tries > attempt > 0:
- log.warn( "Attempt #%i of %i" % ( attemptno, tries ) )
- try:
- result = fn( *args, **kwargs )
- if attempt > 0:
- log.info( "Result obtained after attempt %i" % attemptno )
- return result
- except urllib2.HTTPError, e:
- if e.code == 404:
- raise
- log.exception(e)
- except Exception, e:
- if type(e) in IGNORE_EXCEPTIONS:
- # Immediatly raise in some cases.
- raise
- try:
- fn_name = fn.__name__
- except AttributeError:
- fn_name = "Anonymous Function"
- log.exception(e)
- if attemptno == tries:
- log.error( "%s failed at attempt %i, give up." % ( fn_name , attemptno ) )
- raise
- log.warn( "%s failed at attempt %i, trying again." % ( fn_name , attemptno ) )
- time.sleep( DEFAULT_SLEEP_TIME )
+# DELETE ME!
'''
System tests for `jenkinsapi.jenkins` module.
'''
+import unittest
+from jenkinsapi_tests.test_utils.random_strings import random_string
from jenkinsapi_tests.systests.base import BaseSystemTest, EMPTY_JOB_CONFIG
class JobTests(BaseSystemTest):
def test_create_job(self):
- self.jenkins.create_job('whatever', EMPTY_JOB_CONFIG)
- self.assertJobIsPresent('whatever')
+ job_name = 'create_%s' % random_string()
+ self.jenkins.create_job(job_name, EMPTY_JOB_CONFIG)
+ self.assertJobIsPresent(job_name)
def test_get_jobs_list(self):
- self._create_job('job1')
- self._create_job('job2')
+ job1_name = 'first_%s' % random_string()
+ job2_name = 'second_%s' % random_string()
+
+ self._create_job(job1_name)
+ self._create_job(job2_name)
job_list = self.jenkins.get_jobs_list()
- self.assertEqual(['job1', 'job2'], job_list)
+ self.assertEqual([job1_name, job2_name], job_list)
def test_delete_job(self):
- self._create_job('job_to_delete')
- self.jenkins.delete_job('job_to_delete')
- self.assertJobIsAbsent('job_to_delete')
+ job1_name = 'delete_me_%s' % random_string()
+
+ self._create_job(job1_name)
+ self.jenkins.delete_job(job1_name)
+ self.assertJobIsAbsent(job1_name)
def test_rename_job(self):
- self._create_job('job_to_rename')
- self.jenkins.rename_job('job_to_rename', 'renamed_job')
- self.assertJobIsAbsent('job_to_rename')
- self.assertJobIsPresent('renamed_job')
+ job1_name = 'A__%s' % random_string()
+ job2_name = 'B__%s' % random_string()
+
+ self._create_job(job1_name)
+ self.jenkins.rename_job(job1_name, job2_name)
+ self.assertJobIsAbsent(job1_name)
+ self.assertJobIsPresent(job2_name)
def test_copy_job(self):
- self._create_job('template_job')
- self.jenkins.copy_job('template_job', 'copied_job')
- self.assertJobIsPresent('template_job')
- self.assertJobIsPresent('copied_job')
+ template_job_name = 'TPL%s' % random_string()
+ copied_job_name = 'CPY%s' % random_string()
+
+ self._create_job(template_job_name)
+ self.jenkins.copy_job(template_job_name, copied_job_name)
+ self.assertJobIsPresent(template_job_name)
+ self.assertJobIsPresent(copied_job_name)
class NodeTests(BaseSystemTest):
+ """
+ """
+
+ # def test_get_node_dict(self):
+ # self.assertEqual(self.jenkins.get_node_dict(), {
+ # 'master': 'http://localhost:8080/computer/master/api/python/'})
- def test_get_node_dict(self):
- self.assertEqual(self.jenkins.get_node_dict(), {
- 'master': 'http://localhost:8080/computer/master/api/python/'})
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import random
+import string
+
+def random_string(length=10):
+ return ''.join( random.choice(string.ascii_lowercase) for i in range(length) )
+
+if __name__ == '__main__':
+ print random_string()
--- /dev/null
+import SimpleHTTPServer
+import SocketServer
+import logging
+import cgi
+
+PORT = 8000
+
+class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+
+ def do_GET(self):
+ logging.error(self.headers)
+ SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
+
+ def do_POST(self):
+ logging.error(self.headers)
+ form = cgi.FieldStorage(
+ fp=self.rfile,
+ headers=self.headers,
+ environ={'REQUEST_METHOD':'POST',
+ 'CONTENT_TYPE':self.headers['Content-Type'],
+ })
+ for item in form.list:
+ logging.error(item)
+ SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
+
+Handler = ServerHandler
+
+httpd = SocketServer.TCPServer(("", PORT), Handler)
+
+print "serving at port", PORT
+httpd.serve_forever()
from jenkinsapi.jenkins import Jenkins
+
class TestJenkins(unittest.TestCase):
- DATA = {}
+ DATA = {}
+
+ @mock.patch.object(Jenkins, '_poll')
+ def setUp(self, _poll):
+ _poll.return_value = self.DATA
+ self.J = Jenkins('http://localhost:8080',
+ username='foouser', password='foopassword')
+
+ @mock.patch.object(Jenkins, '_poll')
+ def test_clone(self, _poll):
+ _poll.return_value = self.DATA
+ JJ = self.J._clone()
+ self.assertNotEquals(id(JJ), id(self.J))
+ self.assertEquals(JJ, self.J)
+
+ def test_stored_passwords(self):
+ self.assertEquals(self.J.requester.password, 'foopassword')
+ self.assertEquals(self.J.requester.username, 'foouser')
+
+
+class TestJenkinsURLs(unittest.TestCase):
- @mock.patch.object(Jenkins, '_poll')
- def setUp(self, _poll):
- _poll.return_value = self.DATA
- self.J = Jenkins('http://localhost:8080', username='foouser', password='foopassword')
+ @mock.patch.object(Jenkins, '_poll')
+ def testNoSlash(self, _poll):
+ _poll.return_value = {}
+ J = Jenkins('http://localhost:8080',
+ username='foouser', password='foopassword')
+ self.assertEquals(
+ J.get_create_url(), 'http://localhost:8080/createItem')
- def testClone(self):
- JJ = self.J._clone()
- self.assertNotEquals(id(JJ), id(self.J))
- self.assertEquals(JJ, self.J)
+ @mock.patch.object(Jenkins, '_poll')
+ def testWithSlash(self, _poll):
+ _poll.return_value = {}
+ J = Jenkins('http://localhost:8080/',
+ username='foouser', password='foopassword')
+ self.assertEquals(
+ J.get_create_url(), 'http://localhost:8080/createItem')
if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ unittest.main()
# def __init__( self, url, name, jenkins_obj ):
self.J = mock.MagicMock() # Jenkins object
- self.j = Job('http://', 'foo', self.J)
+ self.j = Job('http://halob:8080/job/foo/', 'foo', self.J)
def testRepr(self):
# Can we produce a repr string for this object
with self.assertRaises(AttributeError):
self.j.id()
self.assertEquals(self.j.name, 'foo')
+
+ def test_special_urls(self):
+ self.assertEquals(self.j.baseurl, 'http://halob:8080/job/foo')
+
+ self.assertEquals(self.j.get_delete_url(), 'http://halob:8080/job/foo/doDelete')
+
+ self.assertEquals(self.j.get_rename_url(), 'http://halob:8080/job/foo/doRename')
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import mock
+import unittest
+import datetime
+
+from jenkinsapi.jenkins import Jenkins
+from jenkinsapi.nodes import Nodes
+from jenkinsapi.node import Node
+
+
+class TestNode(unittest.TestCase):
+
+ DATA0 = {'assignedLabels': [{}],
+ 'description': None,
+ 'jobs': [],
+ 'mode': 'NORMAL',
+ 'nodeDescription': 'the master Jenkins node',
+ 'nodeName': '',
+ 'numExecutors': 2,
+ 'overallLoad': {},
+ 'primaryView': {'name': 'All', 'url': 'http://halob:8080/'},
+ 'quietingDown': False,
+ 'slaveAgentPort': 0,
+ 'unlabeledLoad': {},
+ 'useCrumbs': False,
+ 'useSecurity': False,
+ 'views': [{'name': 'All', 'url': 'http://halob:8080/'},
+ {'name': 'FodFanFo', 'url': 'http://halob:8080/view/FodFanFo/'}]}
+
+
+ DATA1 = {'busyExecutors': 0,
+ 'computer': [{'actions': [],
+ 'displayName': 'master',
+ 'executors': [{}, {}],
+ 'icon': 'computer.png',
+ 'idle': True,
+ 'jnlpAgent': False,
+ 'launchSupported': True,
+ 'loadStatistics': {},
+ 'manualLaunchAllowed': True,
+ 'monitorData': {'hudson.node_monitors.ArchitectureMonitor': 'Linux (amd64)',
+ 'hudson.node_monitors.ClockMonitor': {'diff': 0},
+ 'hudson.node_monitors.DiskSpaceMonitor': {'path': '/var/lib/jenkins',
+ 'size': 671924924416},
+ 'hudson.node_monitors.ResponseTimeMonitor': {'average': 0},
+ 'hudson.node_monitors.SwapSpaceMonitor': {'availablePhysicalMemory': 3174686720,
+ 'availableSwapSpace': 17163087872,
+ 'totalPhysicalMemory': 16810180608,
+ 'totalSwapSpace': 17163087872},
+ 'hudson.node_monitors.TemporarySpaceMonitor': {'path': '/tmp',
+ 'size': 671924924416}},
+ 'numExecutors': 2,
+ 'offline': False,
+ 'offlineCause': None,
+ 'oneOffExecutors': [],
+ 'temporarilyOffline': False},
+ {'actions': [],
+ 'displayName': 'bobnit',
+ 'executors': [{}],
+ 'icon': 'computer-x.png',
+ 'idle': True,
+ 'jnlpAgent': False,
+ 'launchSupported': True,
+ 'loadStatistics': {},
+ 'manualLaunchAllowed': True,
+ 'monitorData': {'hudson.node_monitors.ArchitectureMonitor': 'Linux (amd64)',
+ 'hudson.node_monitors.ClockMonitor': {'diff': 4261},
+ 'hudson.node_monitors.DiskSpaceMonitor': {'path': '/home/sal/jenkins',
+ 'size': 169784860672},
+ 'hudson.node_monitors.ResponseTimeMonitor': {'average': 29},
+ 'hudson.node_monitors.SwapSpaceMonitor': {'availablePhysicalMemory': 4570710016,
+ 'availableSwapSpace': 12195983360,
+ 'totalPhysicalMemory': 8374497280,
+ 'totalSwapSpace': 12195983360},
+ 'hudson.node_monitors.TemporarySpaceMonitor': {'path': '/tmp',
+ 'size': 249737277440}},
+ 'numExecutors': 1,
+ 'offline': True,
+ 'offlineCause': {},
+ 'oneOffExecutors': [],
+ 'temporarilyOffline': False},
+ {'actions': [],
+ 'displayName': 'halob',
+ 'executors': [{}],
+ 'icon': 'computer-x.png',
+ 'idle': True,
+ 'jnlpAgent': True,
+ 'launchSupported': False,
+ 'loadStatistics': {},
+ 'manualLaunchAllowed': True,
+ 'monitorData': {'hudson.node_monitors.ArchitectureMonitor': None,
+ 'hudson.node_monitors.ClockMonitor': None,
+ 'hudson.node_monitors.DiskSpaceMonitor': None,
+ 'hudson.node_monitors.ResponseTimeMonitor': None,
+ 'hudson.node_monitors.SwapSpaceMonitor': None,
+ 'hudson.node_monitors.TemporarySpaceMonitor': None},
+ 'numExecutors': 1,
+ 'offline': True,
+ 'offlineCause': None,
+ 'oneOffExecutors': [],
+ 'temporarilyOffline': False}],
+ 'displayName': 'nodes',
+ 'totalExecutors': 2}
+
+ DATA2 = {'actions': [],
+ 'displayName': 'master',
+ 'executors': [{}, {}],
+ 'icon': 'computer.png',
+ 'idle': True,
+ 'jnlpAgent': False,
+ 'launchSupported': True,
+ 'loadStatistics': {},
+ 'manualLaunchAllowed': True,
+ 'monitorData': {'hudson.node_monitors.ArchitectureMonitor': 'Linux (amd64)',
+ 'hudson.node_monitors.ClockMonitor': {'diff': 0},
+ 'hudson.node_monitors.DiskSpaceMonitor': {'path': '/var/lib/jenkins',
+ 'size': 671942561792},
+ 'hudson.node_monitors.ResponseTimeMonitor': {'average': 0},
+ 'hudson.node_monitors.SwapSpaceMonitor': {'availablePhysicalMemory': 2989916160,
+ 'availableSwapSpace': 17163087872,
+ 'totalPhysicalMemory': 16810180608,
+ 'totalSwapSpace': 17163087872},
+ 'hudson.node_monitors.TemporarySpaceMonitor': {'path': '/tmp',
+ 'size': 671942561792}},
+ 'numExecutors': 2,
+ 'offline': False,
+ 'offlineCause': None,
+ 'oneOffExecutors': [],
+ 'temporarilyOffline': False}
+
+ DATA3= { 'actions': [],
+ 'displayName': 'halob',
+ 'executors': [{}],
+ 'icon': 'computer-x.png',
+ 'idle': True,
+ 'jnlpAgent': True,
+ 'launchSupported': False,
+ 'loadStatistics': {},
+ 'manualLaunchAllowed': True,
+ 'monitorData': {'hudson.node_monitors.ArchitectureMonitor': None,
+ 'hudson.node_monitors.ClockMonitor': None,
+ 'hudson.node_monitors.DiskSpaceMonitor': None,
+ 'hudson.node_monitors.ResponseTimeMonitor': None,
+ 'hudson.node_monitors.SwapSpaceMonitor': None,
+ 'hudson.node_monitors.TemporarySpaceMonitor': None},
+ 'numExecutors': 1,
+ 'offline': True,
+ 'offlineCause': None,
+ 'oneOffExecutors': [],
+ 'temporarilyOffline': False}
+
+
+ @mock.patch.object(Jenkins, '_poll')
+ @mock.patch.object(Nodes, '_poll')
+ def setUp(self, _poll_nodes, _poll_jenkins):
+ _poll_jenkins.return_value = self.DATA0
+ _poll_nodes.return_value = self.DATA1
+
+ # def __init__(self, baseurl, nodename, jenkins_obj):
+
+ self.J = Jenkins('http://localhost:8080')
+ self.ns = self.J.get_nodes()
+ #self.ns = Nodes('http://localhost:8080/computer', 'bobnit', self.J)
+
+ def testRepr(self):
+ # Can we produce a repr string for this object
+ repr(self.ns)
+
+ def testCheckURL(self):
+ self.assertEquals(self.ns.baseurl, 'http://localhost:8080/computer')
+
+ @mock.patch.object(Node, '_poll')
+ def testGetMasterNode(self, _poll_node):
+ _poll_node.return_value = self.DATA2
+ mn = self.ns['master']
+ self.assertIsInstance(mn, Node)
+
+ @mock.patch.object(Node, '_poll')
+ def testGetNonMasterNode(self, _poll_node):
+ _poll_node.return_value = self.DATA3
+ mn = self.ns['halob']
+ self.assertIsInstance(mn, Node)
+
+if __name__ == '__main__':
+ unittest.main()