--- /dev/null
+"""
+A lower-level implementation of copying a job in Jenkins
+"""
+
+import requests
+from jenkinsapi.jenkins import Jenkins
+from pkg_resources import resource_string
+from jenkinsapi_tests.test_utils.random_strings import random_string
+
+J = Jenkins('http://localhost:8080')
+jobName = random_string()
+jobName2 = '%s_2' % jobName
+
+url = 'http://localhost:8080/createItem?from=%s&name=%s&mode=copy' % (
+ jobName, jobName2)
+
+xml = resource_string('examples', 'addjob.xml')
+j = J.create_job(jobname=jobName, config=xml)
+
+
+h = {'Content-Type': 'application/x-www-form-urlencoded'}
+response = requests.post(url, data='dysjsjsjs', headers=h)
+print response.text.encode('UTF-8')
--- /dev/null
+'''
+System tests for `jenkinsapi.jenkins` module.
+'''
+import time
+import logging
+import unittest
+from jenkinsapi_tests.systests.base import BaseSystemTest
+
+log = logging.getLogger(__name__)
+
+JOB_CONFIGS = {
+'A':"""<?xml version='1.0' encoding='UTF-8'?>
+<project>
+ <actions/>
+ <description></description>
+ <keepDependencies>false</keepDependencies>
+ <properties/>
+ <scm class="hudson.scm.NullSCM"/>
+ <canRoam>true</canRoam>
+ <disabled>false</disabled>
+ <blockBuildWhenDownstreamBuilding>false</blockBuildWhenDownstreamBuilding>
+ <blockBuildWhenUpstreamBuilding>false</blockBuildWhenUpstreamBuilding>
+ <triggers class="vector"/>
+ <concurrentBuild>false</concurrentBuild>
+ <builders/>
+ <publishers>
+ <hudson.tasks.BuildTrigger>
+ <childProjects>B</childProjects>
+ <threshold>
+ <name>SUCCESS</name>
+ <ordinal>0</ordinal>
+ <color>BLUE</color>
+ </threshold>
+ </hudson.tasks.BuildTrigger>
+ </publishers>
+ <buildWrappers/>
+</project>""",
+
+'B':"""<?xml version='1.0' encoding='UTF-8'?>
+<project>
+ <actions/>
+ <description></description>
+ <keepDependencies>false</keepDependencies>
+ <properties/>
+ <scm class="hudson.scm.NullSCM"/>
+ <canRoam>true</canRoam>
+ <disabled>false</disabled>
+ <blockBuildWhenDownstreamBuilding>false</blockBuildWhenDownstreamBuilding>
+ <blockBuildWhenUpstreamBuilding>false</blockBuildWhenUpstreamBuilding>
+ <triggers class="vector"/>
+ <concurrentBuild>false</concurrentBuild>
+ <builders/>
+ <publishers>
+ <hudson.tasks.BuildTrigger>
+ <childProjects>C</childProjects>
+ <threshold>
+ <name>SUCCESS</name>
+ <ordinal>0</ordinal>
+ <color>BLUE</color>
+ </threshold>
+ </hudson.tasks.BuildTrigger>
+ </publishers>
+ <buildWrappers/>
+</project>""",
+
+'C':"""<?xml version='1.0' encoding='UTF-8'?>
+<project>
+ <actions/>
+ <description></description>
+ <keepDependencies>false</keepDependencies>
+ <properties/>
+ <scm class="hudson.scm.NullSCM"/>
+ <canRoam>true</canRoam>
+ <disabled>false</disabled>
+ <blockBuildWhenDownstreamBuilding>false</blockBuildWhenDownstreamBuilding>
+ <blockBuildWhenUpstreamBuilding>false</blockBuildWhenUpstreamBuilding>
+ <triggers class="vector"/>
+ <concurrentBuild>false</concurrentBuild>
+ <builders/>
+ <publishers/>
+ <buildWrappers/>
+</project>"""
+
+}
+
+class TestParameterizedBuilds(BaseSystemTest):
+ def test_invoke_job_parameterized(self):
+ for job_name, job_config in JOB_CONFIGS.items():
+ self.jenkins.create_job(job_name, job_config)
+
+ self.jenkins['A'].invoke()
+
+
+ for _ in range(0,10):
+ if not self.jenkins['C'].get_last_completed_buildnumber() > 0:
+ log.info('Waiting for the third test to complete')
+ time.sleep(2)
+ else:
+ break
+ else:
+ self.fail('Jenkins took too long to run these jobs')
+
+ self.assertTrue(self.jenkins['C'].get_upstream_jobs(), self.jenkins['B'])
+ self.assertTrue(self.jenkins['B'].get_upstream_jobs(), self.jenkins['A'])
+
+ self.assertTrue(self.jenkins['A'].get_downstream_jobs(), self.jenkins['B'])
+ self.assertTrue(self.jenkins['B'].get_downstream_jobs(), self.jenkins['C'])
+
+if __name__ == '__main__':
+ logging.basicConfig()
+ unittest.main()
'''
System tests for `jenkinsapi.jenkins` module.
'''
-import os
import time
-import shutil
-import random
-import tempfile
import unittest
from jenkinsapi_tests.test_utils.random_strings import random_string
from jenkinsapi_tests.systests.base import BaseSystemTest
class TestParameterizedBuilds(BaseSystemTest):
def test_invoke_job_parameterized(self):
-
param_B = random_string()
job_name = 'create_%s' % random_string()
artifacts = b.get_artifact_dict()
self.assertIsInstance(artifacts, dict)
-
artB = artifacts['b.txt']
-
self.assertTrue(artB.get_data().strip(), param_B)
- # TODO: Actually verify the download
-
-
if __name__ == '__main__':
unittest.main()
+++ /dev/null
-import SimpleHTTPServer
-import SocketServer
-import logging
-import cgi
-
-PORT = 8000
-
-class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
-
- def do_GET(self):
- logging.error(self.headers)
- SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
-
- def do_POST(self):
- logging.error(self.headers)
- form = cgi.FieldStorage(
- fp=self.rfile,
- headers=self.headers,
- environ={'REQUEST_METHOD':'POST',
- 'CONTENT_TYPE':self.headers['Content-Type'],
- })
- for item in form.list:
- logging.error(item)
- SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
-
-Handler = ServerHandler
-
-httpd = SocketServer.TCPServer(("", PORT), Handler)
-
-print "serving at port", PORT
-httpd.serve_forever()