6c2b2b2397487bbde39a712a24c67ea6dae75188
[profile/ivi/python.git] / Lib / test / test_support.py
1 """Supporting definitions for the Python regression tests."""
2
3 if __name__ != 'test.test_support':
4     raise ImportError('test_support must be imported from the test package')
5
6 import contextlib
7 import errno
8 import functools
9 import gc
10 import socket
11 import sys
12 import os
13 import platform
14 import shutil
15 import warnings
16 import unittest
17 import importlib
18 import UserDict
19 import re
20 import time
21 try:
22     import thread
23 except ImportError:
24     thread = None
25
26 __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
27            "verbose", "use_resources", "max_memuse", "record_original_stdout",
28            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
29            "is_resource_enabled", "requires", "find_unused_port", "bind_port",
30            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
31            "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
32            "open_urlresource", "check_warnings", "check_py3k_warnings",
33            "CleanImport", "EnvironmentVarGuard", "captured_output",
34            "captured_stdout", "TransientResource", "transient_internet",
35            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
36            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
37            "threading_cleanup", "reap_children", "cpython_only",
38            "check_impl_detail", "get_attribute", "py3k_bytes"]
39
40
41 class Error(Exception):
42     """Base class for regression test exceptions."""
43
44 class TestFailed(Error):
45     """Test failed."""
46
47 class ResourceDenied(unittest.SkipTest):
48     """Test skipped because it requested a disallowed resource.
49
50     This is raised when a test calls requires() for a resource that
51     has not been enabled.  It is used to distinguish between expected
52     and unexpected skips.
53     """
54
55 @contextlib.contextmanager
56 def _ignore_deprecated_imports(ignore=True):
57     """Context manager to suppress package and module deprecation
58     warnings when importing them.
59
60     If ignore is False, this context manager has no effect."""
61     if ignore:
62         with warnings.catch_warnings():
63             warnings.filterwarnings("ignore", ".+ (module|package)",
64                                     DeprecationWarning)
65             yield
66     else:
67         yield
68
69
70 def import_module(name, deprecated=False):
71     """Import and return the module to be tested, raising SkipTest if
72     it is not available.
73
74     If deprecated is True, any module or package deprecation messages
75     will be suppressed."""
76     with _ignore_deprecated_imports(deprecated):
77         try:
78             return importlib.import_module(name)
79         except ImportError, msg:
80             raise unittest.SkipTest(str(msg))
81
82
83 def _save_and_remove_module(name, orig_modules):
84     """Helper function to save and remove a module from sys.modules
85
86        Return value is True if the module was in sys.modules and
87        False otherwise."""
88     saved = True
89     try:
90         orig_modules[name] = sys.modules[name]
91     except KeyError:
92         saved = False
93     else:
94         del sys.modules[name]
95     return saved
96
97
98 def _save_and_block_module(name, orig_modules):
99     """Helper function to save and block a module in sys.modules
100
101        Return value is True if the module was in sys.modules and
102        False otherwise."""
103     saved = True
104     try:
105         orig_modules[name] = sys.modules[name]
106     except KeyError:
107         saved = False
108     sys.modules[name] = None
109     return saved
110
111
112 def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
113     """Imports and returns a module, deliberately bypassing the sys.modules cache
114     and importing a fresh copy of the module. Once the import is complete,
115     the sys.modules cache is restored to its original state.
116
117     Modules named in fresh are also imported anew if needed by the import.
118
119     Importing of modules named in blocked is prevented while the fresh import
120     takes place.
121
122     If deprecated is True, any module or package deprecation messages
123     will be suppressed."""
124     # NOTE: test_heapq and test_warnings include extra sanity checks to make
125     # sure that this utility function is working as expected
126     with _ignore_deprecated_imports(deprecated):
127         # Keep track of modules saved for later restoration as well
128         # as those which just need a blocking entry removed
129         orig_modules = {}
130         names_to_remove = []
131         _save_and_remove_module(name, orig_modules)
132         try:
133             for fresh_name in fresh:
134                 _save_and_remove_module(fresh_name, orig_modules)
135             for blocked_name in blocked:
136                 if not _save_and_block_module(blocked_name, orig_modules):
137                     names_to_remove.append(blocked_name)
138             fresh_module = importlib.import_module(name)
139         finally:
140             for orig_name, module in orig_modules.items():
141                 sys.modules[orig_name] = module
142             for name_to_remove in names_to_remove:
143                 del sys.modules[name_to_remove]
144         return fresh_module
145
146
147 def get_attribute(obj, name):
148     """Get an attribute, raising SkipTest if AttributeError is raised."""
149     try:
150         attribute = getattr(obj, name)
151     except AttributeError:
152         raise unittest.SkipTest("module %s has no attribute %s" % (
153             obj.__name__, name))
154     else:
155         return attribute
156
157
158 verbose = 1              # Flag set to 0 by regrtest.py
159 use_resources = None     # Flag set to [] by regrtest.py
160 max_memuse = 0           # Disable bigmem tests (they will still be run with
161                          # small sizes, to make sure they work.)
162 real_max_memuse = 0
163
164 # _original_stdout is meant to hold stdout at the time regrtest began.
165 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
166 # The point is to have some flavor of stdout the user can actually see.
167 _original_stdout = None
168 def record_original_stdout(stdout):
169     global _original_stdout
170     _original_stdout = stdout
171
172 def get_original_stdout():
173     return _original_stdout or sys.stdout
174
175 def unload(name):
176     try:
177         del sys.modules[name]
178     except KeyError:
179         pass
180
181 def unlink(filename):
182     try:
183         os.unlink(filename)
184     except OSError:
185         pass
186
187 def rmtree(path):
188     try:
189         shutil.rmtree(path)
190     except OSError, e:
191         # Unix returns ENOENT, Windows returns ESRCH.
192         if e.errno not in (errno.ENOENT, errno.ESRCH):
193             raise
194
195 def forget(modname):
196     '''"Forget" a module was ever imported by removing it from sys.modules and
197     deleting any .pyc and .pyo files.'''
198     unload(modname)
199     for dirname in sys.path:
200         unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
201         # Deleting the .pyo file cannot be within the 'try' for the .pyc since
202         # the chance exists that there is no .pyc (and thus the 'try' statement
203         # is exited) but there is a .pyo file.
204         unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
205
206 def is_resource_enabled(resource):
207     """Test whether a resource is enabled.  Known resources are set by
208     regrtest.py."""
209     return use_resources is not None and resource in use_resources
210
211 def requires(resource, msg=None):
212     """Raise ResourceDenied if the specified resource is not available.
213
214     If the caller's module is __main__ then automatically return True.  The
215     possibility of False being returned occurs when regrtest.py is executing."""
216     # see if the caller's module is __main__ - if so, treat as if
217     # the resource was set
218     if sys._getframe(1).f_globals.get("__name__") == "__main__":
219         return
220     if not is_resource_enabled(resource):
221         if msg is None:
222             msg = "Use of the `%s' resource not enabled" % resource
223         raise ResourceDenied(msg)
224
225 HOST = 'localhost'
226
227 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
228     """Returns an unused port that should be suitable for binding.  This is
229     achieved by creating a temporary socket with the same family and type as
230     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
231     the specified host address (defaults to 0.0.0.0) with the port set to 0,
232     eliciting an unused ephemeral port from the OS.  The temporary socket is
233     then closed and deleted, and the ephemeral port is returned.
234
235     Either this method or bind_port() should be used for any tests where a
236     server socket needs to be bound to a particular port for the duration of
237     the test.  Which one to use depends on whether the calling code is creating
238     a python socket, or if an unused port needs to be provided in a constructor
239     or passed to an external program (i.e. the -accept argument to openssl's
240     s_server mode).  Always prefer bind_port() over find_unused_port() where
241     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
242     socket is bound to a hard coded port, the ability to run multiple instances
243     of the test simultaneously on the same host is compromised, which makes the
244     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
245     may simply manifest as a failed test, which can be recovered from without
246     intervention in most cases, but on Windows, the entire python process can
247     completely and utterly wedge, requiring someone to log in to the buildbot
248     and manually kill the affected process.
249
250     (This is easy to reproduce on Windows, unfortunately, and can be traced to
251     the SO_REUSEADDR socket option having different semantics on Windows versus
252     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
253     listen and then accept connections on identical host/ports.  An EADDRINUSE
254     socket.error will be raised at some point (depending on the platform and
255     the order bind and listen were called on each socket).
256
257     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
258     will ever be raised when attempting to bind two identical host/ports. When
259     accept() is called on each socket, the second caller's process will steal
260     the port from the first caller, leaving them both in an awkwardly wedged
261     state where they'll no longer respond to any signals or graceful kills, and
262     must be forcibly killed via OpenProcess()/TerminateProcess().
263
264     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
265     instead of SO_REUSEADDR, which effectively affords the same semantics as
266     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
267     Source world compared to Windows ones, this is a common mistake.  A quick
268     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
269     openssl.exe is called with the 's_server' option, for example. See
270     http://bugs.python.org/issue2550 for more info.  The following site also
271     has a very thorough description about the implications of both REUSEADDR
272     and EXCLUSIVEADDRUSE on Windows:
273     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
274
275     XXX: although this approach is a vast improvement on previous attempts to
276     elicit unused ports, it rests heavily on the assumption that the ephemeral
277     port returned to us by the OS won't immediately be dished back out to some
278     other process when we close and delete our temporary socket but before our
279     calling code has a chance to bind the returned port.  We can deal with this
280     issue if/when we come across it."""
281     tempsock = socket.socket(family, socktype)
282     port = bind_port(tempsock)
283     tempsock.close()
284     del tempsock
285     return port
286
287 def bind_port(sock, host=HOST):
288     """Bind the socket to a free port and return the port number.  Relies on
289     ephemeral ports in order to ensure we are using an unbound port.  This is
290     important as many tests may be running simultaneously, especially in a
291     buildbot environment.  This method raises an exception if the sock.family
292     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
293     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
294     for TCP/IP sockets.  The only case for setting these options is testing
295     multicasting via multiple UDP sockets.
296
297     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
298     on Windows), it will be set on the socket.  This will prevent anyone else
299     from bind()'ing to our host/port for the duration of the test.
300     """
301     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
302         if hasattr(socket, 'SO_REUSEADDR'):
303             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
304                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
305                                  "socket option on TCP/IP sockets!")
306         if hasattr(socket, 'SO_REUSEPORT'):
307             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
308                 raise TestFailed("tests should never set the SO_REUSEPORT "   \
309                                  "socket option on TCP/IP sockets!")
310         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
311             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
312
313     sock.bind((host, 0))
314     port = sock.getsockname()[1]
315     return port
316
317 FUZZ = 1e-6
318
319 def fcmp(x, y): # fuzzy comparison function
320     if isinstance(x, float) or isinstance(y, float):
321         try:
322             fuzz = (abs(x) + abs(y)) * FUZZ
323             if abs(x-y) <= fuzz:
324                 return 0
325         except:
326             pass
327     elif type(x) == type(y) and isinstance(x, (tuple, list)):
328         for i in range(min(len(x), len(y))):
329             outcome = fcmp(x[i], y[i])
330             if outcome != 0:
331                 return outcome
332         return (len(x) > len(y)) - (len(x) < len(y))
333     return (x > y) - (x < y)
334
335 try:
336     unicode
337     have_unicode = True
338 except NameError:
339     have_unicode = False
340
341 is_jython = sys.platform.startswith('java')
342
343 # Filename used for testing
344 if os.name == 'java':
345     # Jython disallows @ in module names
346     TESTFN = '$test'
347 elif os.name == 'riscos':
348     TESTFN = 'testfile'
349 else:
350     TESTFN = '@test'
351     # Unicode name only used if TEST_FN_ENCODING exists for the platform.
352     if have_unicode:
353         # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
354         # TESTFN_UNICODE is a filename that can be encoded using the
355         # file system encoding, but *not* with the default (ascii) encoding
356         if isinstance('', unicode):
357             # python -U
358             # XXX perhaps unicode() should accept Unicode strings?
359             TESTFN_UNICODE = "@test-\xe0\xf2"
360         else:
361             # 2 latin characters.
362             TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
363         TESTFN_ENCODING = sys.getfilesystemencoding()
364         # TESTFN_UNENCODABLE is a filename that should *not* be
365         # able to be encoded by *either* the default or filesystem encoding.
366         # This test really only makes sense on Windows NT platforms
367         # which have special Unicode support in posixmodule.
368         if (not hasattr(sys, "getwindowsversion") or
369                 sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
370             TESTFN_UNENCODABLE = None
371         else:
372             # Japanese characters (I think - from bug 846133)
373             TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
374             try:
375                 # XXX - Note - should be using TESTFN_ENCODING here - but for
376                 # Windows, "mbcs" currently always operates as if in
377                 # errors=ignore' mode - hence we get '?' characters rather than
378                 # the exception.  'Latin1' operates as we expect - ie, fails.
379                 # See [ 850997 ] mbcs encoding ignores errors
380                 TESTFN_UNENCODABLE.encode("Latin1")
381             except UnicodeEncodeError:
382                 pass
383             else:
384                 print \
385                 'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
386                 'Unicode filename tests may not be effective' \
387                 % TESTFN_UNENCODABLE
388
389
390 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
391 # module name.
392 TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
393
394 # Save the initial cwd
395 SAVEDCWD = os.getcwd()
396
397 @contextlib.contextmanager
398 def temp_cwd(name='tempcwd', quiet=False):
399     """
400     Context manager that creates a temporary directory and set it as CWD.
401
402     The new CWD is created in the current directory and it's named *name*.
403     If *quiet* is False (default) and it's not possible to create or change
404     the CWD, an error is raised.  If it's True, only a warning is raised
405     and the original CWD is used.
406     """
407     if isinstance(name, unicode):
408         try:
409             name = name.encode(sys.getfilesystemencoding() or 'ascii')
410         except UnicodeEncodeError:
411             if not quiet:
412                 raise unittest.SkipTest('unable to encode the cwd name with '
413                                         'the filesystem encoding.')
414     saved_dir = os.getcwd()
415     is_temporary = False
416     try:
417         os.mkdir(name)
418         os.chdir(name)
419         is_temporary = True
420     except OSError:
421         if not quiet:
422             raise
423         warnings.warn('tests may fail, unable to change the CWD to ' + name,
424                       RuntimeWarning, stacklevel=3)
425     try:
426         yield os.getcwd()
427     finally:
428         os.chdir(saved_dir)
429         if is_temporary:
430             rmtree(name)
431
432
433 def findfile(file, here=__file__, subdir=None):
434     """Try to find a file on sys.path and the working directory.  If it is not
435     found the argument passed to the function is returned (this does not
436     necessarily signal failure; could still be the legitimate path)."""
437     if os.path.isabs(file):
438         return file
439     if subdir is not None:
440         file = os.path.join(subdir, file)
441     path = sys.path
442     path = [os.path.dirname(here)] + path
443     for dn in path:
444         fn = os.path.join(dn, file)
445         if os.path.exists(fn): return fn
446     return file
447
448 def sortdict(dict):
449     "Like repr(dict), but in sorted order."
450     items = dict.items()
451     items.sort()
452     reprpairs = ["%r: %r" % pair for pair in items]
453     withcommas = ", ".join(reprpairs)
454     return "{%s}" % withcommas
455
456 def make_bad_fd():
457     """
458     Create an invalid file descriptor by opening and closing a file and return
459     its fd.
460     """
461     file = open(TESTFN, "wb")
462     try:
463         return file.fileno()
464     finally:
465         file.close()
466         unlink(TESTFN)
467
468 def check_syntax_error(testcase, statement):
469     testcase.assertRaises(SyntaxError, compile, statement,
470                           '<test string>', 'exec')
471
472 def open_urlresource(url, check=None):
473     import urlparse, urllib2
474
475     filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
476
477     fn = os.path.join(os.path.dirname(__file__), "data", filename)
478
479     def check_valid_file(fn):
480         f = open(fn)
481         if check is None:
482             return f
483         elif check(f):
484             f.seek(0)
485             return f
486         f.close()
487
488     if os.path.exists(fn):
489         f = check_valid_file(fn)
490         if f is not None:
491             return f
492         unlink(fn)
493
494     # Verify the requirement before downloading the file
495     requires('urlfetch')
496
497     print >> get_original_stdout(), '\tfetching %s ...' % url
498     f = urllib2.urlopen(url, timeout=15)
499     try:
500         with open(fn, "wb") as out:
501             s = f.read()
502             while s:
503                 out.write(s)
504                 s = f.read()
505     finally:
506         f.close()
507
508     f = check_valid_file(fn)
509     if f is not None:
510         return f
511     raise TestFailed('invalid resource "%s"' % fn)
512
513
514 class WarningsRecorder(object):
515     """Convenience wrapper for the warnings list returned on
516        entry to the warnings.catch_warnings() context manager.
517     """
518     def __init__(self, warnings_list):
519         self._warnings = warnings_list
520         self._last = 0
521
522     def __getattr__(self, attr):
523         if len(self._warnings) > self._last:
524             return getattr(self._warnings[-1], attr)
525         elif attr in warnings.WarningMessage._WARNING_DETAILS:
526             return None
527         raise AttributeError("%r has no attribute %r" % (self, attr))
528
529     @property
530     def warnings(self):
531         return self._warnings[self._last:]
532
533     def reset(self):
534         self._last = len(self._warnings)
535
536
537 def _filterwarnings(filters, quiet=False):
538     """Catch the warnings, then check if all the expected
539     warnings have been raised and re-raise unexpected warnings.
540     If 'quiet' is True, only re-raise the unexpected warnings.
541     """
542     # Clear the warning registry of the calling module
543     # in order to re-raise the warnings.
544     frame = sys._getframe(2)
545     registry = frame.f_globals.get('__warningregistry__')
546     if registry:
547         registry.clear()
548     with warnings.catch_warnings(record=True) as w:
549         # Set filter "always" to record all warnings.  Because
550         # test_warnings swap the module, we need to look up in
551         # the sys.modules dictionary.
552         sys.modules['warnings'].simplefilter("always")
553         yield WarningsRecorder(w)
554     # Filter the recorded warnings
555     reraise = [warning.message for warning in w]
556     missing = []
557     for msg, cat in filters:
558         seen = False
559         for exc in reraise[:]:
560             message = str(exc)
561             # Filter out the matching messages
562             if (re.match(msg, message, re.I) and
563                 issubclass(exc.__class__, cat)):
564                 seen = True
565                 reraise.remove(exc)
566         if not seen and not quiet:
567             # This filter caught nothing
568             missing.append((msg, cat.__name__))
569     if reraise:
570         raise AssertionError("unhandled warning %r" % reraise[0])
571     if missing:
572         raise AssertionError("filter (%r, %s) did not catch any warning" %
573                              missing[0])
574
575
576 @contextlib.contextmanager
577 def check_warnings(*filters, **kwargs):
578     """Context manager to silence warnings.
579
580     Accept 2-tuples as positional arguments:
581         ("message regexp", WarningCategory)
582
583     Optional argument:
584      - if 'quiet' is True, it does not fail if a filter catches nothing
585         (default True without argument,
586          default False if some filters are defined)
587
588     Without argument, it defaults to:
589         check_warnings(("", Warning), quiet=True)
590     """
591     quiet = kwargs.get('quiet')
592     if not filters:
593         filters = (("", Warning),)
594         # Preserve backward compatibility
595         if quiet is None:
596             quiet = True
597     return _filterwarnings(filters, quiet)
598
599
600 @contextlib.contextmanager
601 def check_py3k_warnings(*filters, **kwargs):
602     """Context manager to silence py3k warnings.
603
604     Accept 2-tuples as positional arguments:
605         ("message regexp", WarningCategory)
606
607     Optional argument:
608      - if 'quiet' is True, it does not fail if a filter catches nothing
609         (default False)
610
611     Without argument, it defaults to:
612         check_py3k_warnings(("", DeprecationWarning), quiet=False)
613     """
614     if sys.py3kwarning:
615         if not filters:
616             filters = (("", DeprecationWarning),)
617     else:
618         # It should not raise any py3k warning
619         filters = ()
620     return _filterwarnings(filters, kwargs.get('quiet'))
621
622
623 class CleanImport(object):
624     """Context manager to force import to return a new module reference.
625
626     This is useful for testing module-level behaviours, such as
627     the emission of a DeprecationWarning on import.
628
629     Use like this:
630
631         with CleanImport("foo"):
632             importlib.import_module("foo") # new reference
633     """
634
635     def __init__(self, *module_names):
636         self.original_modules = sys.modules.copy()
637         for module_name in module_names:
638             if module_name in sys.modules:
639                 module = sys.modules[module_name]
640                 # It is possible that module_name is just an alias for
641                 # another module (e.g. stub for modules renamed in 3.x).
642                 # In that case, we also need delete the real module to clear
643                 # the import cache.
644                 if module.__name__ != module_name:
645                     del sys.modules[module.__name__]
646                 del sys.modules[module_name]
647
648     def __enter__(self):
649         return self
650
651     def __exit__(self, *ignore_exc):
652         sys.modules.update(self.original_modules)
653
654
655 class EnvironmentVarGuard(UserDict.DictMixin):
656
657     """Class to help protect the environment variable properly.  Can be used as
658     a context manager."""
659
660     def __init__(self):
661         self._environ = os.environ
662         self._changed = {}
663
664     def __getitem__(self, envvar):
665         return self._environ[envvar]
666
667     def __setitem__(self, envvar, value):
668         # Remember the initial value on the first access
669         if envvar not in self._changed:
670             self._changed[envvar] = self._environ.get(envvar)
671         self._environ[envvar] = value
672
673     def __delitem__(self, envvar):
674         # Remember the initial value on the first access
675         if envvar not in self._changed:
676             self._changed[envvar] = self._environ.get(envvar)
677         if envvar in self._environ:
678             del self._environ[envvar]
679
680     def keys(self):
681         return self._environ.keys()
682
683     def set(self, envvar, value):
684         self[envvar] = value
685
686     def unset(self, envvar):
687         del self[envvar]
688
689     def __enter__(self):
690         return self
691
692     def __exit__(self, *ignore_exc):
693         for (k, v) in self._changed.items():
694             if v is None:
695                 if k in self._environ:
696                     del self._environ[k]
697             else:
698                 self._environ[k] = v
699         os.environ = self._environ
700
701
702 class DirsOnSysPath(object):
703     """Context manager to temporarily add directories to sys.path.
704
705     This makes a copy of sys.path, appends any directories given
706     as positional arguments, then reverts sys.path to the copied
707     settings when the context ends.
708
709     Note that *all* sys.path modifications in the body of the
710     context manager, including replacement of the object,
711     will be reverted at the end of the block.
712     """
713
714     def __init__(self, *paths):
715         self.original_value = sys.path[:]
716         self.original_object = sys.path
717         sys.path.extend(paths)
718
719     def __enter__(self):
720         return self
721
722     def __exit__(self, *ignore_exc):
723         sys.path = self.original_object
724         sys.path[:] = self.original_value
725
726
727 class TransientResource(object):
728
729     """Raise ResourceDenied if an exception is raised while the context manager
730     is in effect that matches the specified exception and attributes."""
731
732     def __init__(self, exc, **kwargs):
733         self.exc = exc
734         self.attrs = kwargs
735
736     def __enter__(self):
737         return self
738
739     def __exit__(self, type_=None, value=None, traceback=None):
740         """If type_ is a subclass of self.exc and value has attributes matching
741         self.attrs, raise ResourceDenied.  Otherwise let the exception
742         propagate (if any)."""
743         if type_ is not None and issubclass(self.exc, type_):
744             for attr, attr_value in self.attrs.iteritems():
745                 if not hasattr(value, attr):
746                     break
747                 if getattr(value, attr) != attr_value:
748                     break
749             else:
750                 raise ResourceDenied("an optional resource is not available")
751
752
753 @contextlib.contextmanager
754 def transient_internet(resource_name, timeout=30.0, errnos=()):
755     """Return a context manager that raises ResourceDenied when various issues
756     with the Internet connection manifest themselves as exceptions."""
757     default_errnos = [
758         ('ECONNREFUSED', 111),
759         ('ECONNRESET', 104),
760         ('ENETUNREACH', 101),
761         ('ETIMEDOUT', 110),
762     ]
763     default_gai_errnos = [
764         ('EAI_NONAME', -2),
765         ('EAI_NODATA', -5),
766     ]
767
768     denied = ResourceDenied("Resource '%s' is not available" % resource_name)
769     captured_errnos = errnos
770     gai_errnos = []
771     if not captured_errnos:
772         captured_errnos = [getattr(errno, name, num)
773                            for (name, num) in default_errnos]
774         gai_errnos = [getattr(socket, name, num)
775                       for (name, num) in default_gai_errnos]
776
777     def filter_error(err):
778         n = getattr(err, 'errno', None)
779         if (isinstance(err, socket.timeout) or
780             (isinstance(err, socket.gaierror) and n in gai_errnos) or
781             n in captured_errnos):
782             if not verbose:
783                 sys.stderr.write(denied.args[0] + "\n")
784             raise denied
785
786     old_timeout = socket.getdefaulttimeout()
787     try:
788         if timeout is not None:
789             socket.setdefaulttimeout(timeout)
790         yield
791     except IOError as err:
792         # urllib can wrap original socket errors multiple times (!), we must
793         # unwrap to get at the original error.
794         while True:
795             a = err.args
796             if len(a) >= 1 and isinstance(a[0], IOError):
797                 err = a[0]
798             # The error can also be wrapped as args[1]:
799             #    except socket.error as msg:
800             #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
801             elif len(a) >= 2 and isinstance(a[1], IOError):
802                 err = a[1]
803             else:
804                 break
805         filter_error(err)
806         raise
807     # XXX should we catch generic exceptions and look for their
808     # __cause__ or __context__?
809     finally:
810         socket.setdefaulttimeout(old_timeout)
811
812
813 @contextlib.contextmanager
814 def captured_output(stream_name):
815     """Run the 'with' statement body using a StringIO object in place of a
816     specific attribute on the sys module.
817     Example use (with 'stream_name=stdout')::
818
819        with captured_stdout() as s:
820            print "hello"
821        assert s.getvalue() == "hello"
822     """
823     import StringIO
824     orig_stdout = getattr(sys, stream_name)
825     setattr(sys, stream_name, StringIO.StringIO())
826     try:
827         yield getattr(sys, stream_name)
828     finally:
829         setattr(sys, stream_name, orig_stdout)
830
831 def captured_stdout():
832     return captured_output("stdout")
833
834 def captured_stdin():
835     return captured_output("stdin")
836
837 def gc_collect():
838     """Force as many objects as possible to be collected.
839
840     In non-CPython implementations of Python, this is needed because timely
841     deallocation is not guaranteed by the garbage collector.  (Even in CPython
842     this can be the case in case of reference cycles.)  This means that __del__
843     methods may be called later than expected and weakrefs may remain alive for
844     longer than expected.  This function tries its best to force all garbage
845     objects to disappear.
846     """
847     gc.collect()
848     if is_jython:
849         time.sleep(0.1)
850     gc.collect()
851     gc.collect()
852
853
854 #=======================================================================
855 # Decorator for running a function in a different locale, correctly resetting
856 # it afterwards.
857
858 def run_with_locale(catstr, *locales):
859     def decorator(func):
860         def inner(*args, **kwds):
861             try:
862                 import locale
863                 category = getattr(locale, catstr)
864                 orig_locale = locale.setlocale(category)
865             except AttributeError:
866                 # if the test author gives us an invalid category string
867                 raise
868             except:
869                 # cannot retrieve original locale, so do nothing
870                 locale = orig_locale = None
871             else:
872                 for loc in locales:
873                     try:
874                         locale.setlocale(category, loc)
875                         break
876                     except:
877                         pass
878
879             # now run the function, resetting the locale on exceptions
880             try:
881                 return func(*args, **kwds)
882             finally:
883                 if locale and orig_locale:
884                     locale.setlocale(category, orig_locale)
885         inner.func_name = func.func_name
886         inner.__doc__ = func.__doc__
887         return inner
888     return decorator
889
890 #=======================================================================
891 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
892
893 # Some handy shorthands. Note that these are used for byte-limits as well
894 # as size-limits, in the various bigmem tests
895 _1M = 1024*1024
896 _1G = 1024 * _1M
897 _2G = 2 * _1G
898 _4G = 4 * _1G
899
900 MAX_Py_ssize_t = sys.maxsize
901
902 def set_memlimit(limit):
903     global max_memuse
904     global real_max_memuse
905     sizes = {
906         'k': 1024,
907         'm': _1M,
908         'g': _1G,
909         't': 1024*_1G,
910     }
911     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
912                  re.IGNORECASE | re.VERBOSE)
913     if m is None:
914         raise ValueError('Invalid memory limit %r' % (limit,))
915     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
916     real_max_memuse = memlimit
917     if memlimit > MAX_Py_ssize_t:
918         memlimit = MAX_Py_ssize_t
919     if memlimit < _2G - 1:
920         raise ValueError('Memory limit %r too low to be useful' % (limit,))
921     max_memuse = memlimit
922
923 def bigmemtest(minsize, memuse, overhead=5*_1M):
924     """Decorator for bigmem tests.
925
926     'minsize' is the minimum useful size for the test (in arbitrary,
927     test-interpreted units.) 'memuse' is the number of 'bytes per size' for
928     the test, or a good estimate of it. 'overhead' specifies fixed overhead,
929     independent of the testsize, and defaults to 5Mb.
930
931     The decorator tries to guess a good value for 'size' and passes it to
932     the decorated test function. If minsize * memuse is more than the
933     allowed memory use (as defined by max_memuse), the test is skipped.
934     Otherwise, minsize is adjusted upward to use up to max_memuse.
935     """
936     def decorator(f):
937         def wrapper(self):
938             if not max_memuse:
939                 # If max_memuse is 0 (the default),
940                 # we still want to run the tests with size set to a few kb,
941                 # to make sure they work. We still want to avoid using
942                 # too much memory, though, but we do that noisily.
943                 maxsize = 5147
944                 self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
945             else:
946                 maxsize = int((max_memuse - overhead) / memuse)
947                 if maxsize < minsize:
948                     # Really ought to print 'test skipped' or something
949                     if verbose:
950                         sys.stderr.write("Skipping %s because of memory "
951                                          "constraint\n" % (f.__name__,))
952                     return
953                 # Try to keep some breathing room in memory use
954                 maxsize = max(maxsize - 50 * _1M, minsize)
955             return f(self, maxsize)
956         wrapper.minsize = minsize
957         wrapper.memuse = memuse
958         wrapper.overhead = overhead
959         return wrapper
960     return decorator
961
962 def precisionbigmemtest(size, memuse, overhead=5*_1M):
963     def decorator(f):
964         def wrapper(self):
965             if not real_max_memuse:
966                 maxsize = 5147
967             else:
968                 maxsize = size
969
970                 if real_max_memuse and real_max_memuse < maxsize * memuse:
971                     if verbose:
972                         sys.stderr.write("Skipping %s because of memory "
973                                          "constraint\n" % (f.__name__,))
974                     return
975
976             return f(self, maxsize)
977         wrapper.size = size
978         wrapper.memuse = memuse
979         wrapper.overhead = overhead
980         return wrapper
981     return decorator
982
983 def bigaddrspacetest(f):
984     """Decorator for tests that fill the address space."""
985     def wrapper(self):
986         if max_memuse < MAX_Py_ssize_t:
987             if verbose:
988                 sys.stderr.write("Skipping %s because of memory "
989                                  "constraint\n" % (f.__name__,))
990         else:
991             return f(self)
992     return wrapper
993
994 #=======================================================================
995 # unittest integration.
996
997 class BasicTestRunner:
998     def run(self, test):
999         result = unittest.TestResult()
1000         test(result)
1001         return result
1002
1003 def _id(obj):
1004     return obj
1005
1006 def requires_resource(resource):
1007     if is_resource_enabled(resource):
1008         return _id
1009     else:
1010         return unittest.skip("resource {0!r} is not enabled".format(resource))
1011
1012 def cpython_only(test):
1013     """
1014     Decorator for tests only applicable on CPython.
1015     """
1016     return impl_detail(cpython=True)(test)
1017
1018 def impl_detail(msg=None, **guards):
1019     if check_impl_detail(**guards):
1020         return _id
1021     if msg is None:
1022         guardnames, default = _parse_guards(guards)
1023         if default:
1024             msg = "implementation detail not available on {0}"
1025         else:
1026             msg = "implementation detail specific to {0}"
1027         guardnames = sorted(guardnames.keys())
1028         msg = msg.format(' or '.join(guardnames))
1029     return unittest.skip(msg)
1030
1031 def _parse_guards(guards):
1032     # Returns a tuple ({platform_name: run_me}, default_value)
1033     if not guards:
1034         return ({'cpython': True}, False)
1035     is_true = guards.values()[0]
1036     assert guards.values() == [is_true] * len(guards)   # all True or all False
1037     return (guards, not is_true)
1038
1039 # Use the following check to guard CPython's implementation-specific tests --
1040 # or to run them only on the implementation(s) guarded by the arguments.
1041 def check_impl_detail(**guards):
1042     """This function returns True or False depending on the host platform.
1043        Examples:
1044           if check_impl_detail():               # only on CPython (default)
1045           if check_impl_detail(jython=True):    # only on Jython
1046           if check_impl_detail(cpython=False):  # everywhere except on CPython
1047     """
1048     guards, default = _parse_guards(guards)
1049     return guards.get(platform.python_implementation().lower(), default)
1050
1051
1052
1053 def _run_suite(suite):
1054     """Run tests from a unittest.TestSuite-derived class."""
1055     if verbose:
1056         runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
1057     else:
1058         runner = BasicTestRunner()
1059
1060     result = runner.run(suite)
1061     if not result.wasSuccessful():
1062         if len(result.errors) == 1 and not result.failures:
1063             err = result.errors[0][1]
1064         elif len(result.failures) == 1 and not result.errors:
1065             err = result.failures[0][1]
1066         else:
1067             err = "multiple errors occurred"
1068             if not verbose:
1069                 err += "; run in verbose mode for details"
1070         raise TestFailed(err)
1071
1072
1073 def run_unittest(*classes):
1074     """Run tests from unittest.TestCase-derived classes."""
1075     valid_types = (unittest.TestSuite, unittest.TestCase)
1076     suite = unittest.TestSuite()
1077     for cls in classes:
1078         if isinstance(cls, str):
1079             if cls in sys.modules:
1080                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
1081             else:
1082                 raise ValueError("str arguments must be keys in sys.modules")
1083         elif isinstance(cls, valid_types):
1084             suite.addTest(cls)
1085         else:
1086             suite.addTest(unittest.makeSuite(cls))
1087     _run_suite(suite)
1088
1089
1090 #=======================================================================
1091 # doctest driver.
1092
1093 def run_doctest(module, verbosity=None):
1094     """Run doctest on the given module.  Return (#failures, #tests).
1095
1096     If optional argument verbosity is not specified (or is None), pass
1097     test_support's belief about verbosity on to doctest.  Else doctest's
1098     usual behavior is used (it searches sys.argv for -v).
1099     """
1100
1101     import doctest
1102
1103     if verbosity is None:
1104         verbosity = verbose
1105     else:
1106         verbosity = None
1107
1108     # Direct doctest output (normally just errors) to real stdout; doctest
1109     # output shouldn't be compared by regrtest.
1110     save_stdout = sys.stdout
1111     sys.stdout = get_original_stdout()
1112     try:
1113         f, t = doctest.testmod(module, verbose=verbosity)
1114         if f:
1115             raise TestFailed("%d of %d doctests failed" % (f, t))
1116     finally:
1117         sys.stdout = save_stdout
1118     if verbose:
1119         print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1120     return f, t
1121
1122 #=======================================================================
1123 # Threading support to prevent reporting refleaks when running regrtest.py -R
1124
1125 # NOTE: we use thread._count() rather than threading.enumerate() (or the
1126 # moral equivalent thereof) because a threading.Thread object is still alive
1127 # until its __bootstrap() method has returned, even after it has been
1128 # unregistered from the threading module.
1129 # thread._count(), on the other hand, only gets decremented *after* the
1130 # __bootstrap() method has returned, which gives us reliable reference counts
1131 # at the end of a test run.
1132
1133 def threading_setup():
1134     if thread:
1135         return thread._count(),
1136     else:
1137         return 1,
1138
1139 def threading_cleanup(nb_threads):
1140     if not thread:
1141         return
1142
1143     _MAX_COUNT = 10
1144     for count in range(_MAX_COUNT):
1145         n = thread._count()
1146         if n == nb_threads:
1147             break
1148         time.sleep(0.1)
1149     # XXX print a warning in case of failure?
1150
1151 def reap_threads(func):
1152     """Use this function when threads are being used.  This will
1153     ensure that the threads are cleaned up even when the test fails.
1154     If threading is unavailable this function does nothing.
1155     """
1156     if not thread:
1157         return func
1158
1159     @functools.wraps(func)
1160     def decorator(*args):
1161         key = threading_setup()
1162         try:
1163             return func(*args)
1164         finally:
1165             threading_cleanup(*key)
1166     return decorator
1167
1168 def reap_children():
1169     """Use this function at the end of test_main() whenever sub-processes
1170     are started.  This will help ensure that no extra children (zombies)
1171     stick around to hog resources and create problems when looking
1172     for refleaks.
1173     """
1174
1175     # Reap all our dead child processes so we don't leave zombies around.
1176     # These hog resources and might be causing some of the buildbots to die.
1177     if hasattr(os, 'waitpid'):
1178         any_process = -1
1179         while True:
1180             try:
1181                 # This will raise an exception on Windows.  That's ok.
1182                 pid, status = os.waitpid(any_process, os.WNOHANG)
1183                 if pid == 0:
1184                     break
1185             except:
1186                 break
1187
1188 def py3k_bytes(b):
1189     """Emulate the py3k bytes() constructor.
1190
1191     NOTE: This is only a best effort function.
1192     """
1193     try:
1194         # memoryview?
1195         return b.tobytes()
1196     except AttributeError:
1197         try:
1198             # iterable of ints?
1199             return b"".join(chr(x) for x in b)
1200         except TypeError:
1201             return bytes(b)
1202
1203 def args_from_interpreter_flags():
1204     """Return a list of command-line arguments reproducing the current
1205     settings in sys.flags."""
1206     flag_opt_map = {
1207         'bytes_warning': 'b',
1208         'dont_write_bytecode': 'B',
1209         'ignore_environment': 'E',
1210         'no_user_site': 's',
1211         'no_site': 'S',
1212         'optimize': 'O',
1213         'py3k_warning': '3',
1214         'verbose': 'v',
1215     }
1216     args = []
1217     for flag, opt in flag_opt_map.items():
1218         v = getattr(sys.flags, flag)
1219         if v > 0:
1220             args.append('-' + opt * v)
1221     return args