+++ /dev/null
-import optparse\r
-import os\r
-import random\r
-import logging\r
-from utils.id import mk_id\r
-from utils import junitxml\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-class meta_test(object):\r
- ATTEMPTS=3\r
-\r
- @classmethod\r
- def mkParser(cls):\r
- parser = optparse.OptionParser()\r
-\r
- def __init__(self, opts=None):\r
- self.opts = opts\r
-\r
- def testFunction(self):\r
- if random.random() < 0.1:\r
- raise AssertionError("The value was too small")\r
- return 0\r
-\r
- def __call__(self):\r
- temp_dir = os.environ.get("TEMP", r"c:\temp" )\r
- output_dir = os.environ.get( "WORKSPACE", temp_dir )\r
- result_filepath = os.path.join( output_dir, "results.xml" )\r
- stream = open( result_filepath, "wb" )\r
- testsuite_name = mk_id()\r
- ju = junitxml.junitxml( stream, testsuite_name)\r
-\r
-\r
- classname = mk_id()\r
- for i in xrange(0, self.ATTEMPTS ):\r
- tr = ju.startTest( classname, mk_id() )\r
- try:\r
- tr.run( self.testFunction )\r
- except Exception, e:\r
- log.exception(e)\r
- continue\r
-\r
- ju.write()\r
-\r
-def main( ):\r
- logging.basicConfig()\r
- return meta_test()()\r
-\r
-if __name__ == "__main__":\r
- main()\r
+++ /dev/null
-from cStringIO import StringIO\r
-\r
-class bufwrapper( object ):\r
- """\r
- Basic buffer-wrapper - wraps up an output stream with a buffer.\r
- """\r
- def __init__( self, stream, buffer=None ):\r
- self.stream = stream\r
-\r
- assert hasattr( self.stream, "write" ), "%s does not support write" % repr(stream)\r
-\r
- if buffer is None:\r
- self.buf = StringIO()\r
- else:\r
- self.buf = buffer\r
-\r
- def get_and_clear( self ):\r
- """\r
- Get the contents of the buffer and clear it.\r
- """\r
- old_buffer = self.buf\r
- self.buf = StringIO()\r
- return old_buffer.getvalue()\r
-\r
- def flush( self ):\r
- for item in [ self.stream, self.buf ]:\r
- if hasattr( item, "flush" ) and callable( item.flush ):\r
- item.flush()\r
-\r
-\r
- def close(self):\r
- self.stream.close()\r
-\r
- def write(self, txt ):\r
- self.stream.write(txt)\r
- self.buf.write(txt)\r
-\r
- def getvalue(self):\r
- return self.buf.getvalue()\r
+++ /dev/null
-import datetime\r
-\r
-MICROSECONDS_PER_SECOND = 1000000.0\r
-SECONDS_PER_DAY = 86400\r
-\r
-def timedelta_to_seconds( td ):\r
- assert isinstance( td, datetime.timedelta )\r
- seconds = 0.0\r
-\r
- seconds += td.days * SECONDS_PER_DAY\r
- seconds += td.seconds\r
- seconds += td.microseconds / MICROSECONDS_PER_SECOND\r
-\r
- return seconds\r
+++ /dev/null
-"""\r
-Generate random IDs.\r
-"""\r
-import random\r
-\r
-ID_VALID = "abcdefghijklmnopqrstuvwxyz0123456789"\r
-\r
-def mk_id(length=5, prefix=""):\r
- idchars = []\r
- for count in range( 0, length ):\r
- idchars.append( random.choice( ID_VALID ) )\r
- return "%s%s" % ( prefix, "".join( idchars ) )\r
-\r
-if __name__ == "__main__":\r
- for i in range(0, 50):\r
- print repr( mk_id( i ) )\r
+++ /dev/null
-import logging\r
-import datetime\r
-import traceback\r
-import sys\r
-\r
-try:\r
- from xml.etree import ElementTree as ET\r
-except Exception, e:\r
- import elementtree.ElementTree as ET\r
-\r
-from utils.dates import timedelta_to_seconds\r
-\r
-log = logging.getLogger(__name__)\r
-\r
-class junitxml( object ):\r
-\r
- ERROR = "error"\r
- FAILURE = "failure"\r
-\r
- def __init__( self, stream, testsuite_name="test", ):\r
- """\r
- Set up a new stream\r
- """\r
- assert isinstance( testsuite_name, str )\r
-\r
- self.xml = ET.Element("testsuite")\r
- self.stream = stream\r
-\r
- self.xml.attrib["name"] = testsuite_name\r
-\r
- self.count_errors = 0\r
- self.count_tests = 0\r
- self.count_failures = 0\r
-\r
- def __repr__(self):\r
- return "<%s.%s %s>" % (self.__class__.__module__, self.__class__.__name__, str(self))\r
-\r
- def __str__(self):\r
- return "Stream: %s, Tests: %i Errors: %i, Failures %i" % ( repr( self.stream ),\r
- self.count_tests,\r
- self.count_errors,\r
- self.count_failures )\r
-\r
- @classmethod\r
- def get_error_strings( cls, e ):\r
- str_error_type = "%s.%s" % ( e.__class__.__module__, e.__class__.__name__ )\r
- str_error_args = ",".join( [repr(ee) for ee in e.args] )\r
- str_doc = str( e.__doc__ ).strip()\r
-\r
- return str_error_type, str_error_args, str_doc\r
-\r
- def write(self, xml_declaration=True, encoding="utf-8"):\r
- self.xml.attrib["errors"] = str( self.count_errors )\r
- self.xml.attrib["failures"] = str( self.count_failures )\r
- self.xml.attrib["tests"] = str( self.count_tests )\r
-\r
- ET.ElementTree( self.xml ).write( self.stream, encoding=encoding, xml_declaration=xml_declaration )\r
- log.warn( "Wrote Junit-style XML log to %s" % self.stream )\r
-\r
- def assertTrue(self, classname, testname, errmsg, fn, *args, **kwargs ):\r
- """\r
- Map the interface onto an assert like statement.\r
- Also returns the value so that we can do useful things with the result\r
- """\r
-\r
- _testname = testname.replace( ".", "_") # Dots are not permitted in names'\r
-\r
- def assert_fn( ):\r
- if callable(fn):\r
- assert fn( *args, **kwargs ), errmsg\r
- else:\r
- assert len(args) == 0 and len(kwargs) == 0, "Object being tested is not callable and cannot have arguments."\r
- assert fn, "errmsg"\r
-\r
- tr = self.startTest(classname, _testname)\r
- return tr.run( assert_fn )\r
-\r
- def startTest( self, classname, testname, ):\r
- return junitxml_transaction( self, classname, testname )\r
-\r
- def passTest( self, classname, name, test_time ):\r
- self.addPass( classname, name, test_time)\r
-\r
- def failTest(self, classname, name, test_time, error, tb, mode=FAILURE ):\r
- """\r
- Add a error\r
- """\r
- str_error, str_error_msg, str_doc = self.get_error_strings( error )\r
- enhanced_tb = "%s: %s\n\n( %s )\n\n%s" % ( repr(error), str_error_msg, str_doc, tb )\r
- tc = self.addPass( classname, name, test_time)\r
- self.convertPassToFail( tc, str_error, enhanced_tb, mode=mode )\r
-\r
-\r
- def addPass(self, classname, name, test_time=0.0, ):\r
- """\r
- Add a pass\r
- """\r
- assert isinstance( classname, str )\r
- assert isinstance( name, str )\r
- assert isinstance( test_time, (int, float) )\r
- self.count_tests += 1\r
- testcase = ET.SubElement( self.xml, "testcase" )\r
- testcase.attrib["classname"] = classname\r
- testcase.attrib["name"] = name\r
- testcase.attrib["time"] = "%.2f" % test_time\r
-\r
- return testcase\r
-\r
- def convertPassToFail( self, tc, failtype="", tb="", mode=FAILURE ):\r
- """\r
- Add a failure\r
- """\r
- assert isinstance( failtype, str )\r
- assert isinstance( tb, str ), "Traceback should be a string, got %s" % repr(tb)\r
- assert mode in [ self.FAILURE, self.ERROR ]\r
-\r
- if mode == self.FAILURE:\r
- self.count_errors += 1\r
- else:\r
- self.count_failures += 1\r
-\r
- failure = ET.SubElement( tc, mode )\r
- failure.text = tb\r
- failure.attrib["type"] = failtype\r
- return failure\r
-\r
-\r
-class junitxml_transaction( object ):\r
- def __init__(self, jxml, classname, testname ):\r
- assert isinstance( jxml, junitxml )\r
- self.jxml = jxml\r
- self.classname = classname\r
- self.testname = testname\r
- self.start_time = datetime.datetime.now()\r
-\r
- def getRuntime(self):\r
- return timedelta_to_seconds( datetime.datetime.now() - self.start_time )\r
-\r
- def run( self, fn, *args, **kwargs ):\r
- try:\r
- result = fn( *args, **kwargs )\r
- self.jxml.addPass( self.classname, self.testname, self.getRuntime() )\r
- except Exception, e:\r
- ex_type, ex_value, ex_tb = sys.exc_info()\r
-\r
- tb_formatted = traceback.format_exception( ex_type, ex_value, ex_tb )\r
- str_tb = "\n".join( tb_formatted )\r
- str_ex = "%s.%s" % ( ex_value.__class__.__module__, ex_value.__class__.__name__ )\r
- runtime = self.getRuntime()\r
-\r
- if isinstance(e, AssertionError):\r
- self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.FAILURE )\r
- else:\r
- self.jxml.failTest( self.classname, self.testname, runtime, e, str_tb, mode=self.jxml.ERROR )\r
-\r
- log.exception(e)\r
-\r
- raise e\r
- return result\r
-\r
-if __name__ == "__main__":\r
- import sys\r
- import time\r
- import random\r
-\r
- logging.basicConfig()\r
- logging.getLogger("").setLevel( logging.INFO )\r
- fod = junitxml( stream=sys.stdout )\r
-\r
- def fn_test( mode ):\r
-\r
- time.sleep( random.random( ) )\r
-\r
- if mode=="pass":\r
- return 1\r
- elif mode=="fail":\r
- assert False\r
- elif mode=="error":\r
- {}["x"]\r
-\r
- for testname in [ "pass", "fail", "error" ]:\r
- t = fod.startTest("a", testname, )\r
- try:\r
- t.run( fn_test, testname )\r
- except Exception, e:\r
- #log.exception(e)\r
- pass\r
-\r
- fod.write()\r
+++ /dev/null
-"""
-XML Test Runner for PyUnit
-"""
-
-# Written by Sebastian Rittau <srittau@jroger.in-berlin.de> and placed in
-# the Public Domain.
-from utils import bufwrapper
-
-__revision__ = "$Id: /mirror/jroger/python/stdlib/xmlrunner.py 3506 2006-07-27T09:12:39.629878Z srittau $"
-
-import sys
-import time
-import traceback
-import unittest
-import logging
-from StringIO import StringIO
-from xml.sax.saxutils import escape
-
-log = logging.getLogger()
-
-class faketest( object ):
- """
- A fake test object for when you want to inject additional results into the XML stream.
- """
- failureException = AssertionError
-
- def __init__( self, id, exc_info ):
- self._id = id
- self._exc_info = exc_info
-
- def id(self):
- return self._id
-
- def run(self, result):
- result.startTest(self)
- result.addError(self, self._exc_info )
- ok = False
- result.stopTest(self)
-
- def __call__(self, *args, **kwds):
- return self.run(*args, **kwds)
-
-
-class _TestInfo(object):
- """Information about a particular test.
- Used by _XmlTestResult."""
-
- def __init__( self, test, time, ):
- (self._class, self._method) = test.id().rsplit(".", 1)
- self._time = time
- self._error = None
- self._failure = None
- self._console = ""
-
- @staticmethod
- def create_success(test, time):
- """Create a _TestInfo instance for a successful test."""
- return _TestInfo(test, time)
-
- @staticmethod
- def create_failure(test, time, failure, console=""):
- """Create a _TestInfo instance for a failed test."""
- info = _TestInfo(test, time)
- info._failure = failure
- info.console = console
- return info
-
- @staticmethod
- def create_error(test, time, error, console="" ):
- """Create a _TestInfo instance for an erroneous test."""
- info = _TestInfo(test, time)
- info._error = error
- info.console = console
- return info
-
- def print_report(self, stream):
- """Print information about this test case in XML format to the
- supplied stream.
- """
- stream.write(' <testcase classname="%(class)s" name="%(method)s" time="%(time).4f">' % \
- {
- "class": self._class,
- "method": self._method,
- "time": self._time,
- })
- if self._failure is not None:
- self._print_error(stream, 'failure', self._failure)
- if self._error is not None:
- self._print_error(stream, 'error', self._error)
- stream.write('</testcase>\n')
-
- def _print_error(self, stream, tagname, error):
- """Print information from a failure or error to the supplied stream."""
- text = escape(str(error[1]))
- stream.write('\n')
- stream.write(' <%s type="%s">%s\n%s\n' \
- % (tagname, str(error[0]), text, self.console ))
- tb_stream = StringIO()
- traceback.print_tb(error[2], None, tb_stream)
- stream.write(escape(tb_stream.getvalue()))
- stream.write(' </%s>\n' % tagname)
- stream.write(' ')
-
-
-class _XmlTestResult(unittest.TestResult):
- """A test result class that stores result as XML.
-
- Used by XmlTestRunner.
- """
-
- test_count = 0
-
- @classmethod
- def get_test_serial( cls ):
- cls.test_count += 1
- return cls.test_count
-
- def __init__(self, classname, consolestream =None ):
- unittest.TestResult.__init__(self)
- self._test_name = classname
- self._start_time = None
- self._tests = []
- self._error = None
- self._failure = None
- self._consolestream = consolestream
-
- def startTest(self, test):
- unittest.TestResult.startTest(self, test)
-
- sn = self.get_test_serial()
-
- log.info( "Test %i: %s" % ( sn, test.id() ) )
- self._error = None
- self._failure = None
- self._start_time = time.time()
-
- def stopTest(self, test, time_taken = None ):
- if time_taken is not None:
- time_taken = time.time() - self._start_time
-
- str_console = self._consolestream.get_and_clear()
-
- unittest.TestResult.stopTest(self, test)
- if self._error:
- info = _TestInfo.create_error(test, time_taken, self._error, console=str_console )
- log.error( "Error: %s" % test.id() )
- elif self._failure:
- info = _TestInfo.create_failure(test, time_taken, self._failure, console=str_console )
- log.error( "Fail: %s" % test.id() )
- else:
- info = _TestInfo.create_success(test, time_taken, )
- log.debug( "OK: %s" % test.id() )
- self._tests.append(info)
-
- def addError(self, test, err):
- log.warn( "Error: %s" % test.id() )
- unittest.TestResult.addError(self, test, err)
- self._error = err
-
- def addFailure(self, test, err):
- log.warn( "Failure: %s" % test.id() )
- unittest.TestResult.addFailure(self, test, err)
- self._failure = err
-
- def print_report(self, stream, time_taken, out, err):
- """Prints the XML report to the supplied stream.
-
- The time the tests took to perform as well as the captured standard
- output and standard error streams must be passed in.
- """
- stream.write('<testsuite errors="%(e)d" failures="%(f)d" ' % \
- { "e": len(self.errors), "f": len(self.failures) })
- stream.write('name="%(n)s" tests="%(t)d" time="%(time).3f">\n' % \
- {
- "n": self._test_name,
- "t": self.testsRun,
- "time": time_taken,
- })
- for info in self._tests:
- info.print_report(stream)
- stream.write(' <system-out><![CDATA[%s]]></system-out>\n' % out)
- stream.write(' <system-err><![CDATA[%s]]></system-err>\n' % err)
- stream.write('</testsuite>\n')
-
-
-class XmlTestRunner(object):
- """A test runner that stores results in XML format compatible with JUnit.
-
- XmlTestRunner(stream=None) -> XML test runner
-
- The XML file is written to the supplied stream. If stream is None, the
- results are stored in a file called TEST-<module>.<class>.xml in the
- current working directory (if not overridden with the path property),
- where <module> and <class> are the module and class name of the test class.
- """
- def __init__(self, stream=None ):
- self._stream = stream
-
- @staticmethod
- def get_test_class_name_from_testobj( obj_test ):
- class_ = obj_test.__class__
- classname = class_.__module__ + "." + class_.__name__
- return classname
-
-
- def run(self, test, result=None ):
- """Run the given test case or test suite."""
- classname = self.get_test_class_name_from_testobj( test )
- assert not self._stream is None
- stream = self._stream
-
- # TODO: Python 2.5: Use the with statement
- old_stdout = sys.stdout
- old_stderr = sys.stderr
- sys.stdout = bufwrapper( old_stdout )
- sys.stderr = bufwrapper( old_stderr )
-
- if result is None:
- result = _XmlTestResult( classname, consolestream = sys.stdout )
- else:
- log.debug("Using provided XML test result object.")
-
- start_time = time.time()
-
- try:
- test(result)
- try:
- out_s = sys.stdout.getvalue()
- except AttributeError:
- out_s = ""
- try:
- err_s = sys.stderr.getvalue()
- except AttributeError:
- err_s = ""
- finally:
- sys.stdout = old_stdout
- sys.stderr = old_stderr
-
- time_taken = time.time() - start_time
- result.print_report(stream, time_taken, out_s, err_s)
- if self._stream is None:
- stream.close()
-
- return result
+++ /dev/null
-import unittest\r
-import sys\r
-import re\r
-from cStringIO import StringIO\r
-from utils.xmlrunner import XmlTestRunner\r
-\r
-class XmlTestRunnerTest(unittest.TestCase):\r
- def setUp(self):\r
- self._stream = StringIO()\r
-\r
- def _try_test_run(self, test_class, expected):\r
- """Run the test suite against the supplied test class and compare the\r
- XML result against the expected XML string. Fail if the expected\r
- string doesn't match the actual string. All time attribute in the\r
- expected string should have the value "0.000". All error and failure\r
- messages are reduced to "Foobar".\r
- """\r
- runner = XmlTestRunner(self._stream)\r
- runner.run(unittest.makeSuite(test_class))\r
-\r
- got = self._stream.getvalue()\r
- # Replace all time="X.YYY" attributes by time="0.000" to enable a\r
- # simple string comparison.\r
- got = re.sub(r'time="\d+\.\d+"', 'time="0.000"', got)\r
- # Likewise, replace all failure and error messages by a simple "Foobar"\r
- # string.\r
- got = re.sub(r'(?s)<failure (.*?)>.*?</failure>', r'<failure \1>Foobar</failure>', got)\r
- got = re.sub(r'(?s)<error (.*?)>.*?</error>', r'<error \1>Foobar</error>', got)\r
-\r
- self.assertEqual(expected, got)\r
-\r
- def test_no_tests(self):\r
- """Regression test: Check whether a test run without any tests\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- pass\r
- self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="0" time="0.000">\r
- <system-out><![CDATA[]]></system-out>\r
- <system-err><![CDATA[]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- def test_success(self):\r
- """Regression test: Check whether a test run with a successful test\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- pass\r
- self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">\r
- <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>\r
- <system-out><![CDATA[]]></system-out>\r
- <system-err><![CDATA[]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- def test_failure(self):\r
- """Regression test: Check whether a test run with a failing test\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- self.assert_(False)\r
- self._try_test_run(TestTest, """<testsuite errors="0" failures="1" name="unittest.TestSuite" tests="1" time="0.000">\r
- <testcase classname="__main__.TestTest" name="test_foo" time="0.000">\r
- <failure type="exceptions.AssertionError">Foobar</failure>\r
- </testcase>\r
- <system-out><![CDATA[]]></system-out>\r
- <system-err><![CDATA[]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- def test_error(self):\r
- """Regression test: Check whether a test run with a erroneous test\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- raise IndexError()\r
- self._try_test_run(TestTest, """<testsuite errors="1" failures="0" name="unittest.TestSuite" tests="1" time="0.000">\r
- <testcase classname="__main__.TestTest" name="test_foo" time="0.000">\r
- <error type="exceptions.IndexError">Foobar</error>\r
- </testcase>\r
- <system-out><![CDATA[]]></system-out>\r
- <system-err><![CDATA[]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- def test_stdout_capture(self):\r
- """Regression test: Check whether a test run with output to stdout\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- print "Test"\r
- self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">\r
- <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>\r
- <system-out><![CDATA[Test\r
-]]></system-out>\r
- <system-err><![CDATA[]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- def test_stderr_capture(self):\r
- """Regression test: Check whether a test run with output to stderr\r
- matches a previous run."""\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- print >>sys.stderr, "Test"\r
- self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">\r
- <testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>\r
- <system-out><![CDATA[]]></system-out>\r
- <system-err><![CDATA[Test\r
-]]></system-err>\r
-</testsuite>\r
-""")\r
-\r
- class NullStream(object):\r
- """A file-like object that discards everything written to it."""\r
- def write(self, buffer):\r
- pass\r
-\r
- def test_unittests_changing_stdout(self):\r
- """Check whether the XmlTestRunner recovers gracefully from unit tests\r
- that change stdout, but don't change it back properly.\r
- """\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- sys.stdout = XmlTestRunnerTest.NullStream()\r
-\r
- runner = XmlTestRunner(self._stream)\r
- runner.run(unittest.makeSuite(TestTest))\r
-\r
- def test_unittests_changing_stderr(self):\r
- """Check whether the XmlTestRunner recovers gracefully from unit tests\r
- that change stderr, but don't change it back properly.\r
- """\r
- class TestTest(unittest.TestCase):\r
- def test_foo(self):\r
- sys.stderr = XmlTestRunnerTest.NullStream()\r
-\r
- runner = XmlTestRunner(self._stream)\r
- runner.run(unittest.makeSuite(TestTest))\r
-\r
-\r
-if __name__ == "__main__":\r
- suite = unittest.makeSuite(XmlTestRunnerTest)\r
- unittest.TextTestRunner().run(suite)\r