# ==== file 1/3: linecache2 module source (diff header "--- /dev/null" removed) ====
+"""Cache lines from files.
+
+This is intended to read lines from modules imported -- hence if a filename
+is not found, it will look down the module search path for a file by
+that name.
+"""
+
+import functools
+import io
+import sys
+import os
+import tokenize
+
+__all__ = ["getline", "clearcache", "checkcache"]
+
def getline(filename, lineno, module_globals=None):
    """Return line *lineno* of *filename* (1-based), or '' if out of range."""
    all_lines = getlines(filename, module_globals)
    if not 1 <= lineno <= len(all_lines):
        return ''
    return all_lines[lineno - 1]
+
+
# The cache. Maps filenames to either a thunk which will provide source code,
# or a tuple (size, mtime, lines, fullname) once loaded.
cache = {}
+
+
def clearcache():
    """Throw away every cached entry (lazy and loaded alike)."""
    global cache
    cache = {}
+
+
def getlines(filename, module_globals=None):
    """Get the lines for a file from the cache.
    Update the cache if it doesn't contain an entry for this file already."""
    entry = cache.get(filename)
    if entry is None:
        # Nothing cached yet: load (or fail to) right now.
        return updatecache(filename, module_globals)
    if len(entry) == 1:
        # Lazy (thunk-only) entry: realise it via updatecache.
        return updatecache(filename, module_globals)
    # Fully loaded entry: (size, mtime, lines, fullname).
    return entry[2]
+
+
def checkcache(filename=None):
    """Discard cache entries that are out of date.
    (This is not checked upon each call!)"""
    if filename is None:
        filenames = list(cache.keys())
    elif filename in cache:
        filenames = [filename]
    else:
        return

    for filename in filenames:
        entry = cache[filename]
        if len(entry) == 1:
            # Lazy cache entry: nothing on disk to compare against yet.
            continue
        size, mtime, lines, fullname = entry
        if mtime is None:
            # Loaded via a __loader__; there is no file to stat.
            continue
        try:
            stat = os.stat(fullname)
        except OSError:
            # Source file vanished: drop the stale entry.
            del cache[filename]
            continue
        if size != stat.st_size or mtime != stat.st_mtime:
            del cache[filename]
+
+
def updatecache(filename, module_globals=None):
    """Update a cache entry and return its list of lines.
    If something's wrong, discard the cache entry and return an
    empty list."""

    if filename in cache:
        # Drop any fully-loaded entry; a lazy (length-1) entry is kept so
        # the loader thunk below can still be used.
        if len(cache[filename]) != 1:
            del cache[filename]
    if not filename or (filename.startswith('<') and filename.endswith('>')):
        # Pseudo-filenames like '<stdin>' never correspond to real files.
        return []

    fullname = filename
    try:
        stat = os.stat(fullname)
    except OSError:
        # The file isn't directly readable: fall back to loaders / sys.path.
        basename = filename

        # Realise a lazy loader based lookup if there is one
        # otherwise try to lookup right now.
        if lazycache(filename, module_globals):
            try:
                # cache[filename][0] is the get_source thunk seeded by
                # lazycache().
                data = cache[filename][0]()
            except (ImportError, OSError):
                pass
            else:
                if data is None:
                    # No luck, the PEP302 loader cannot find the source
                    # for this module.
                    return []
                # mtime is None marks a loader-provided entry (checkcache
                # skips it).
                cache[filename] = (
                    len(data), None,
                    [line+'\n' for line in data.splitlines()], fullname
                )
                return cache[filename][2]

        # Try looking through the module search path, which is only useful
        # when handling a relative filename.
        if os.path.isabs(filename):
            return []

        for dirname in sys.path:
            try:
                fullname = os.path.join(dirname, basename)
            except (TypeError, AttributeError):
                # Not sufficiently string-like to do anything useful with.
                continue
            try:
                stat = os.stat(fullname)
                break
            except OSError:
                pass
        else:
            # Exhausted sys.path without finding the file.
            return []
    try:
        # Open with the PEP 263 encoding detected from the file itself.
        with _tokenize_open(fullname) as fp:
            lines = fp.readlines()
    except OSError:
        return []
    if lines and not lines[-1].endswith('\n'):
        # Normalise: every cached line ends with a newline.
        lines[-1] += '\n'
    size, mtime = stat.st_size, stat.st_mtime
    cache[filename] = size, mtime, lines, fullname
    return lines
+
+
def lazycache(filename, module_globals):
    """Seed the cache for filename with module_globals.

    The module loader will be asked for the source only when getlines is
    called, not immediately.

    If there is an entry in the cache already, it is not altered.

    :return: True if a lazy load is registered in the cache,
        otherwise False. To register such a load a module loader with a
        get_source method must be found, the filename must be a cachable
        filename, and the filename must not be already cached.
    """
    entry = cache.get(filename)
    if entry is not None:
        # Already cached: report whether it is the lazy (thunk-only) form.
        return len(entry) == 1
    if not filename or (filename.startswith('<') and filename.endswith('>')):
        # Pseudo-filenames are never cachable.
        return False
    # Try for a __loader__, if available
    if module_globals and '__loader__' in module_globals:
        name = module_globals.get('__name__')
        get_source = getattr(module_globals['__loader__'], 'get_source', None)
        if name and get_source:
            # Store a zero-argument thunk; updatecache() calls it later.
            cache[filename] = (functools.partial(get_source, name),)
            return True
    return False
+
+
#### ---- avoiding having a tokenize2 backport for now ----
from codecs import lookup, BOM_UTF8
import re
# PEP 263 coding-cookie pattern; byte-level, matched against raw source lines.
cookie_re = re.compile(r'^[ \t\f]*#.*coding[:=][ \t]*([-\w.]+)'.encode('utf8'))
# A line that is blank or comment-only (a cookie may still follow on line 2).
blank_re = re.compile(r'^[ \t\f]*(?:[#\r\n]|$)'.encode('utf8'))
+
+
def _tokenize_open(filename):
    """Open a file in read only mode using the encoding detected by
    _detect_encoding().

    Returns a TextIOWrapper positioned at the start of the file.
    Raises OSError if the file cannot be opened, or SyntaxError if the
    encoding declaration is invalid.
    """
    buffer = io.open(filename, 'rb')
    try:
        encoding, lines = _detect_encoding(buffer.readline)
        buffer.seek(0)
        text = io.TextIOWrapper(buffer, encoding, line_buffering=True)
        text.mode = 'r'
        return text
    except BaseException:
        # Don't leak the raw file object when encoding detection fails
        # (e.g. SyntaxError from a bad cookie) -- mirrors CPython's
        # tokenize.open() fix for bpo-23840.
        buffer.close()
        raise
+
+
+def _get_normal_name(orig_enc):
+ """Imitates get_normal_name in tokenizer.c."""
+ # Only care about the first 12 characters.
+ enc = orig_enc[:12].lower().replace("_", "-")
+ if enc == "utf-8" or enc.startswith("utf-8-"):
+ return "utf-8"
+ if enc in ("latin-1", "iso-8859-1", "iso-latin-1") or \
+ enc.startswith(("latin-1-", "iso-8859-1-", "iso-latin-1-")):
+ return "iso-8859-1"
+ return orig_enc
+
+
def _detect_encoding(readline):
    """
    The detect_encoding() function is used to detect the encoding that should
    be used to decode a Python source file. It requires one argument, readline,
    in the same way as the tokenize() generator.

    It will call readline a maximum of twice, and return the encoding used
    (as a string) and a list of any lines (left as bytes) it has read in.

    It detects the encoding from the presence of a utf-8 bom or an encoding
    cookie as specified in pep-0263. If both a bom and a cookie are present,
    but disagree, a SyntaxError will be raised. If the encoding cookie is an
    invalid charset, raise a SyntaxError. Note that if a utf-8 bom is found,
    'utf-8-sig' is returned.

    If no encoding is specified, then the default of 'utf-8' will be returned.
    """
    # Best-effort filename for error messages only.
    try:
        filename = readline.__self__.name
    except AttributeError:
        filename = None
    bom_found = False
    encoding = None
    default = 'utf-8'
    def read_or_stop():
        # readline() may raise StopIteration at EOF; normalise to b''.
        try:
            return readline()
        except StopIteration:
            return b''

    def find_cookie(line):
        # Returns the declared encoding, or None if this line has no cookie.
        try:
            # Decode as UTF-8. Either the line is an encoding declaration,
            # in which case it should be pure ASCII, or it must be UTF-8
            # per default encoding.
            line_string = line.decode('utf-8')
        except UnicodeDecodeError:
            msg = "invalid or missing encoding declaration"
            if filename is not None:
                msg = '{0} for {1!r}'.format(msg, filename)
            raise SyntaxError(msg)

        match = cookie_re.match(line)
        if not match:
            return None
        encoding = _get_normal_name(match.group(1).decode('utf-8'))
        try:
            codec = lookup(encoding)
        except LookupError:
            # This behaviour mimics the Python interpreter
            if filename is None:
                msg = "unknown encoding: " + encoding
            else:
                msg = "unknown encoding for {!r}: {}".format(filename,
                                                             encoding)
            raise SyntaxError(msg)

        if bom_found:
            # A UTF-8 BOM was seen; any cookie must agree with it.
            if encoding != 'utf-8':
                # This behaviour mimics the Python interpreter
                if filename is None:
                    msg = 'encoding problem: utf-8'
                else:
                    msg = 'encoding problem for {!r}: utf-8'.format(filename)
                raise SyntaxError(msg)
            encoding += '-sig'
        return encoding

    first = read_or_stop()
    if first.startswith(BOM_UTF8):
        bom_found = True
        # Strip the 3-byte BOM before cookie matching.
        first = first[3:]
        default = 'utf-8-sig'
    if not first:
        return default, []

    encoding = find_cookie(first)
    if encoding:
        return encoding, [first]
    if not blank_re.match(first):
        # First line is real code: PEP 263 says the cookie can't come later.
        return default, [first]

    second = read_or_stop()
    if not second:
        return default, [first]

    encoding = find_cookie(second)
    if encoding:
        return encoding, [first, second]

    return default, [first, second]
+
+
# ==== file 2/3: mapping_tests helper (diff header "--- /dev/null" removed) ====
+# tests common to dict and UserDict
+import unittest
+import collections
+
+
class BasicTestMappingProtocol(unittest.TestCase):
    # This base class can be used to check that an object conforms to the
    # mapping protocol

    # Functions that can be useful to override to adapt to dictionary
    # semantics
    type2test = None # which class is being tested (overwrite in subclasses)

    def _reference(self):
        """Return a dictionary of values which are invariant by storage
        in the object under test."""
        return {"1": "2", "key1":"value1", "key2":(1,2,3)}
    def _empty_mapping(self):
        """Return an empty mapping object"""
        return self.type2test()
    def _full_mapping(self, data):
        """Return a mapping object with the value contained in data
        dictionary"""
        x = self._empty_mapping()
        for key, value in data.items():
            x[key] = value
        return x

    def __init__(self, *args, **kw):
        unittest.TestCase.__init__(self, *args, **kw)
        self.reference = self._reference().copy()

        # A (key, value) pair not in the mapping
        key, value = self.reference.popitem()
        self.other = {key:value}

        # A (key, value) pair in the mapping
        key, value = self.reference.popitem()
        self.inmapping = {key:value}
        self.reference[key] = value

    def test_read(self):
        # Test for read only operations on mapping
        p = self._empty_mapping()
        p1 = dict(p) #workaround for singleton objects
        d = self._full_mapping(self.reference)
        if d is p:
            p = p1
        #Indexing
        for key, value in self.reference.items():
            self.assertEqual(d[key], value)
        knownkey = list(self.other.keys())[0]
        self.assertRaises(KeyError, lambda:d[knownkey])
        #len
        self.assertEqual(len(p), 0)
        self.assertEqual(len(d), len(self.reference))
        #__contains__
        for k in self.reference:
            self.assertIn(k, d)
        for k in self.other:
            self.assertNotIn(k, d)
        #cmp
        self.assertEqual(p, p)
        self.assertEqual(d, d)
        self.assertNotEqual(p, d)
        self.assertNotEqual(d, p)
        #__non__zero__
        if p: self.fail("Empty mapping must compare to False")
        if not d: self.fail("Full mapping must compare to True")
        # keys(), items(), iterkeys() ...
        def check_iterandlist(iter, lst, ref):
            # iter must be a proper iterator whose contents agree with both
            # the materialised list and the reference dict's view.
            self.assertTrue(hasattr(iter, '__next__'))
            self.assertTrue(hasattr(iter, '__iter__'))
            x = list(iter)
            self.assertTrue(set(x)==set(lst)==set(ref))
        check_iterandlist(iter(d.keys()), list(d.keys()),
                          self.reference.keys())
        check_iterandlist(iter(d), list(d.keys()), self.reference.keys())
        check_iterandlist(iter(d.values()), list(d.values()),
                          self.reference.values())
        check_iterandlist(iter(d.items()), list(d.items()),
                          self.reference.items())
        #get
        key, value = next(iter(d.items()))
        knownkey, knownvalue = next(iter(self.other.items()))
        self.assertEqual(d.get(key, knownvalue), value)
        self.assertEqual(d.get(knownkey, knownvalue), knownvalue)
        self.assertNotIn(knownkey, d)

    def test_write(self):
        # Test for write operations on mapping
        p = self._empty_mapping()
        #Indexing
        for key, value in self.reference.items():
            p[key] = value
            self.assertEqual(p[key], value)
        for key in self.reference.keys():
            del p[key]
            self.assertRaises(KeyError, lambda:p[key])
        p = self._empty_mapping()
        #update
        p.update(self.reference)
        self.assertEqual(dict(p), self.reference)
        items = list(p.items())
        p = self._empty_mapping()
        p.update(items)
        self.assertEqual(dict(p), self.reference)
        d = self._full_mapping(self.reference)
        #setdefault
        key, value = next(iter(d.items()))
        knownkey, knownvalue = next(iter(self.other.items()))
        self.assertEqual(d.setdefault(key, knownvalue), value)
        self.assertEqual(d[key], value)
        self.assertEqual(d.setdefault(knownkey, knownvalue), knownvalue)
        self.assertEqual(d[knownkey], knownvalue)
        #pop
        self.assertEqual(d.pop(knownkey), knownvalue)
        self.assertNotIn(knownkey, d)
        self.assertRaises(KeyError, d.pop, knownkey)
        default = 909
        d[knownkey] = knownvalue
        self.assertEqual(d.pop(knownkey, default), knownvalue)
        self.assertNotIn(knownkey, d)
        self.assertEqual(d.pop(knownkey, default), default)
        #popitem
        key, value = d.popitem()
        self.assertNotIn(key, d)
        self.assertEqual(value, self.reference[key])
        p=self._empty_mapping()
        self.assertRaises(KeyError, p.popitem)

    def test_constructor(self):
        self.assertEqual(self._empty_mapping(), self._empty_mapping())

    def test_bool(self):
        self.assertTrue(not self._empty_mapping())
        self.assertTrue(self.reference)
        self.assertTrue(bool(self._empty_mapping()) is False)
        self.assertTrue(bool(self.reference) is True)

    def test_keys(self):
        d = self._empty_mapping()
        self.assertEqual(list(d.keys()), [])
        d = self.reference
        self.assertIn(list(self.inmapping.keys())[0], d.keys())
        self.assertNotIn(list(self.other.keys())[0], d.keys())
        self.assertRaises(TypeError, d.keys, None)

    def test_values(self):
        d = self._empty_mapping()
        self.assertEqual(list(d.values()), [])

        self.assertRaises(TypeError, d.values, None)

    def test_items(self):
        d = self._empty_mapping()
        self.assertEqual(list(d.items()), [])

        self.assertRaises(TypeError, d.items, None)

    def test_len(self):
        d = self._empty_mapping()
        self.assertEqual(len(d), 0)

    def test_getitem(self):
        d = self.reference
        self.assertEqual(d[list(self.inmapping.keys())[0]],
                         list(self.inmapping.values())[0])

        self.assertRaises(TypeError, d.__getitem__)

    def test_update(self):
        # mapping argument
        d = self._empty_mapping()
        d.update(self.other)
        self.assertEqual(list(d.items()), list(self.other.items()))

        # No argument
        d = self._empty_mapping()
        d.update()
        self.assertEqual(d, self._empty_mapping())

        # item sequence
        d = self._empty_mapping()
        d.update(self.other.items())
        self.assertEqual(list(d.items()), list(self.other.items()))

        # Iterator
        d = self._empty_mapping()
        d.update(self.other.items())
        self.assertEqual(list(d.items()), list(self.other.items()))

        # FIXME: Doesn't work with UserDict
        # self.assertRaises((TypeError, AttributeError), d.update, None)
        self.assertRaises((TypeError, AttributeError), d.update, 42)

        outerself = self
        class SimpleUserDict:
            # Minimal object exposing keys()/__getitem__ -- the mapping
            # protocol subset update() accepts.
            def __init__(self):
                self.d = outerself.reference
            def keys(self):
                return self.d.keys()
            def __getitem__(self, i):
                return self.d[i]
        d.clear()
        d.update(SimpleUserDict())
        i1 = sorted(d.items())
        i2 = sorted(self.reference.items())
        self.assertEqual(i1, i2)

        class Exc(Exception): pass

        d = self._empty_mapping()
        class FailingUserDict:
            # keys() itself raises.
            def keys(self):
                raise Exc
        self.assertRaises(Exc, d.update, FailingUserDict())

        d.clear()

        class FailingUserDict:
            # Iterator raises after yielding one key.
            def keys(self):
                class BogonIter:
                    def __init__(self):
                        self.i = 1
                    def __iter__(self):
                        return self
                    def __next__(self):
                        if self.i:
                            self.i = 0
                            return 'a'
                        raise Exc
                return BogonIter()
            def __getitem__(self, key):
                return key
        self.assertRaises(Exc, d.update, FailingUserDict())

        class FailingUserDict:
            # __getitem__ raises while the key iterator succeeds.
            def keys(self):
                class BogonIter:
                    def __init__(self):
                        self.i = ord('a')
                    def __iter__(self):
                        return self
                    def __next__(self):
                        if self.i <= ord('z'):
                            rtn = chr(self.i)
                            self.i += 1
                            return rtn
                        raise StopIteration
                return BogonIter()
            def __getitem__(self, key):
                raise Exc
        self.assertRaises(Exc, d.update, FailingUserDict())

        d = self._empty_mapping()
        class badseq(object):
            def __iter__(self):
                return self
            def __next__(self):
                raise Exc()

        self.assertRaises(Exc, d.update, badseq())

        self.assertRaises(ValueError, d.update, [(1, 2, 3)])

    # no test_fromkeys or test_copy as both os.environ and selves don't support it

    def test_get(self):
        d = self._empty_mapping()
        self.assertTrue(d.get(list(self.other.keys())[0]) is None)
        self.assertEqual(d.get(list(self.other.keys())[0], 3), 3)
        d = self.reference
        self.assertTrue(d.get(list(self.other.keys())[0]) is None)
        self.assertEqual(d.get(list(self.other.keys())[0], 3), 3)
        self.assertEqual(d.get(list(self.inmapping.keys())[0]),
                         list(self.inmapping.values())[0])
        self.assertEqual(d.get(list(self.inmapping.keys())[0], 3),
                         list(self.inmapping.values())[0])
        self.assertRaises(TypeError, d.get)
        self.assertRaises(TypeError, d.get, None, None, None)

    def test_setdefault(self):
        d = self._empty_mapping()
        self.assertRaises(TypeError, d.setdefault)

    def test_popitem(self):
        d = self._empty_mapping()
        self.assertRaises(KeyError, d.popitem)
        self.assertRaises(TypeError, d.popitem, 42)

    def test_pop(self):
        d = self._empty_mapping()
        k, v = list(self.inmapping.items())[0]
        d[k] = v
        self.assertRaises(KeyError, d.pop, list(self.other.keys())[0])

        self.assertEqual(d.pop(k), v)
        self.assertEqual(len(d), 0)

        self.assertRaises(KeyError, d.pop, k)
+
+
class TestMappingProtocol(BasicTestMappingProtocol):
    # Extends the basic protocol checks with dict-constructor semantics,
    # keyword-argument update, fromkeys and copy (which BasicTestMapping
    # deliberately omits).

    def test_constructor(self):
        BasicTestMappingProtocol.test_constructor(self)
        self.assertTrue(self._empty_mapping() is not self._empty_mapping())
        self.assertEqual(self.type2test(x=1, y=2), {"x": 1, "y": 2})

    def test_bool(self):
        BasicTestMappingProtocol.test_bool(self)
        self.assertTrue(not self._empty_mapping())
        self.assertTrue(self._full_mapping({"x": "y"}))
        self.assertTrue(bool(self._empty_mapping()) is False)
        self.assertTrue(bool(self._full_mapping({"x": "y"})) is True)

    def test_keys(self):
        BasicTestMappingProtocol.test_keys(self)
        d = self._empty_mapping()
        self.assertEqual(list(d.keys()), [])
        d = self._full_mapping({'a': 1, 'b': 2})
        k = d.keys()
        self.assertIn('a', k)
        self.assertIn('b', k)
        self.assertNotIn('c', k)

    def test_values(self):
        BasicTestMappingProtocol.test_values(self)
        d = self._full_mapping({1:2})
        self.assertEqual(list(d.values()), [2])

    def test_items(self):
        BasicTestMappingProtocol.test_items(self)

        d = self._full_mapping({1:2})
        self.assertEqual(list(d.items()), [(1, 2)])

    def test_contains(self):
        d = self._empty_mapping()
        self.assertNotIn('a', d)
        self.assertTrue(not ('a' in d))
        self.assertTrue('a' not in d)
        d = self._full_mapping({'a': 1, 'b': 2})
        self.assertIn('a', d)
        self.assertIn('b', d)
        self.assertNotIn('c', d)

        self.assertRaises(TypeError, d.__contains__)

    def test_len(self):
        BasicTestMappingProtocol.test_len(self)
        d = self._full_mapping({'a': 1, 'b': 2})
        self.assertEqual(len(d), 2)

    def test_getitem(self):
        BasicTestMappingProtocol.test_getitem(self)
        d = self._full_mapping({'a': 1, 'b': 2})
        self.assertEqual(d['a'], 1)
        self.assertEqual(d['b'], 2)
        d['c'] = 3
        d['a'] = 4
        self.assertEqual(d['c'], 3)
        self.assertEqual(d['a'], 4)
        del d['b']
        self.assertEqual(d, {'a': 4, 'c': 3})

        self.assertRaises(TypeError, d.__getitem__)

    def test_clear(self):
        d = self._full_mapping({1:1, 2:2, 3:3})
        d.clear()
        self.assertEqual(d, {})

        self.assertRaises(TypeError, d.clear, None)

    def test_update(self):
        BasicTestMappingProtocol.test_update(self)
        # mapping argument
        d = self._empty_mapping()
        d.update({1:100})
        d.update({2:20})
        d.update({1:1, 2:2, 3:3})
        self.assertEqual(d, {1:1, 2:2, 3:3})

        # no argument
        d.update()
        self.assertEqual(d, {1:1, 2:2, 3:3})

        # keyword arguments
        d = self._empty_mapping()
        d.update(x=100)
        d.update(y=20)
        d.update(x=1, y=2, z=3)
        self.assertEqual(d, {"x":1, "y":2, "z":3})

        # item sequence
        d = self._empty_mapping()
        d.update([("x", 100), ("y", 20)])
        self.assertEqual(d, {"x":100, "y":20})

        # Both item sequence and keyword arguments
        d = self._empty_mapping()
        d.update([("x", 100), ("y", 20)], x=1, y=2)
        self.assertEqual(d, {"x":1, "y":2})

        # iterator
        d = self._full_mapping({1:3, 2:4})
        d.update(self._full_mapping({1:2, 3:4, 5:6}).items())
        self.assertEqual(d, {1:2, 2:4, 3:4, 5:6})

        class SimpleUserDict:
            def __init__(self):
                self.d = {1:1, 2:2, 3:3}
            def keys(self):
                return self.d.keys()
            def __getitem__(self, i):
                return self.d[i]
        d.clear()
        d.update(SimpleUserDict())
        self.assertEqual(d, {1:1, 2:2, 3:3})

    def test_fromkeys(self):
        self.assertEqual(self.type2test.fromkeys('abc'), {'a':None, 'b':None, 'c':None})
        d = self._empty_mapping()
        self.assertTrue(not(d.fromkeys('abc') is d))
        self.assertEqual(d.fromkeys('abc'), {'a':None, 'b':None, 'c':None})
        self.assertEqual(d.fromkeys((4,5),0), {4:0, 5:0})
        self.assertEqual(d.fromkeys([]), {})
        def g():
            yield 1
        self.assertEqual(d.fromkeys(g()), {1:None})
        self.assertRaises(TypeError, {}.fromkeys, 3)
        class dictlike(self.type2test): pass
        self.assertEqual(dictlike.fromkeys('a'), {'a':None})
        self.assertEqual(dictlike().fromkeys('a'), {'a':None})
        self.assertTrue(dictlike.fromkeys('a').__class__ is dictlike)
        self.assertTrue(dictlike().fromkeys('a').__class__ is dictlike)
        self.assertTrue(type(dictlike.fromkeys('a')) is dictlike)
        class mydict(self.type2test):
            # __new__ returning a foreign type: fromkeys must respect it.
            def __new__(cls):
                return collections.UserDict()
        ud = mydict.fromkeys('ab')
        self.assertEqual(ud, {'a':None, 'b':None})
        self.assertIsInstance(ud, collections.UserDict)
        self.assertRaises(TypeError, dict.fromkeys)

        class Exc(Exception): pass

        class baddict1(self.type2test):
            def __init__(self):
                raise Exc()

        self.assertRaises(Exc, baddict1.fromkeys, [1])

        class BadSeq(object):
            def __iter__(self):
                return self
            def __next__(self):
                raise Exc()

        self.assertRaises(Exc, self.type2test.fromkeys, BadSeq())

        class baddict2(self.type2test):
            def __setitem__(self, key, value):
                raise Exc()

        self.assertRaises(Exc, baddict2.fromkeys, [1])

    def test_copy(self):
        d = self._full_mapping({1:1, 2:2, 3:3})
        self.assertEqual(d.copy(), {1:1, 2:2, 3:3})
        d = self._empty_mapping()
        self.assertEqual(d.copy(), d)
        self.assertIsInstance(d.copy(), d.__class__)
        self.assertRaises(TypeError, d.copy, None)

    def test_get(self):
        BasicTestMappingProtocol.test_get(self)
        d = self._empty_mapping()
        self.assertTrue(d.get('c') is None)
        self.assertEqual(d.get('c', 3), 3)
        d = self._full_mapping({'a' : 1, 'b' : 2})
        self.assertTrue(d.get('c') is None)
        self.assertEqual(d.get('c', 3), 3)
        self.assertEqual(d.get('a'), 1)
        self.assertEqual(d.get('a', 3), 1)

    def test_setdefault(self):
        BasicTestMappingProtocol.test_setdefault(self)
        d = self._empty_mapping()
        self.assertTrue(d.setdefault('key0') is None)
        d.setdefault('key0', [])
        self.assertTrue(d.setdefault('key0') is None)
        d.setdefault('key', []).append(3)
        self.assertEqual(d['key'][0], 3)
        d.setdefault('key', []).append(4)
        self.assertEqual(len(d['key']), 2)

    def test_popitem(self):
        BasicTestMappingProtocol.test_popitem(self)
        for copymode in -1, +1:
            # -1: b has same structure as a
            # +1: b is a.copy()
            for log2size in range(12):
                size = 2**log2size
                a = self._empty_mapping()
                b = self._empty_mapping()
                for i in range(size):
                    a[repr(i)] = i
                    if copymode < 0:
                        b[repr(i)] = i
                if copymode > 0:
                    b = a.copy()
                for i in range(size):
                    ka, va = ta = a.popitem()
                    self.assertEqual(va, int(ka))
                    kb, vb = tb = b.popitem()
                    self.assertEqual(vb, int(kb))
                    self.assertTrue(not(copymode < 0 and ta != tb))
                self.assertTrue(not a)
                self.assertTrue(not b)

    def test_pop(self):
        BasicTestMappingProtocol.test_pop(self)

        # Tests for pop with specified key
        d = self._empty_mapping()
        k, v = 'abc', 'def'

        self.assertEqual(d.pop(k, v), v)
        d[k] = v
        self.assertEqual(d.pop(k, 1), v)
+
+
class TestHashMappingProtocol(TestMappingProtocol):
    # Adds checks that only make sense for hash-based mappings: behaviour
    # under raising __hash__/__eq__, repr of self-referential mappings, and
    # mutation during iteration.

    def test_getitem(self):
        TestMappingProtocol.test_getitem(self)
        class Exc(Exception): pass

        class BadEq(object):
            def __eq__(self, other):
                raise Exc()
            def __hash__(self):
                return 24

        d = self._empty_mapping()
        d[BadEq()] = 42
        self.assertRaises(KeyError, d.__getitem__, 23)

        class BadHash(object):
            # Hash works until .fail is flipped -- lets the key be inserted
            # first, then makes the later lookup raise.
            fail = False
            def __hash__(self):
                if self.fail:
                    raise Exc()
                else:
                    return 42

        d = self._empty_mapping()
        x = BadHash()
        d[x] = 42
        x.fail = True
        self.assertRaises(Exc, d.__getitem__, x)

    def test_fromkeys(self):
        TestMappingProtocol.test_fromkeys(self)
        class mydict(self.type2test):
            def __new__(cls):
                return collections.UserDict()
        ud = mydict.fromkeys('ab')
        self.assertEqual(ud, {'a':None, 'b':None})
        self.assertIsInstance(ud, collections.UserDict)

    def test_pop(self):
        TestMappingProtocol.test_pop(self)

        class Exc(Exception): pass

        class BadHash(object):
            fail = False
            def __hash__(self):
                if self.fail:
                    raise Exc()
                else:
                    return 42

        d = self._empty_mapping()
        x = BadHash()
        d[x] = 42
        x.fail = True
        self.assertRaises(Exc, d.pop, x)

    def test_mutatingiteration(self):
        d = self._empty_mapping()
        d[1] = 1
        try:
            for i in d:
                d[i+1] = 1
        except RuntimeError:
            pass
        else:
            self.fail("changing dict size during iteration doesn't raise Error")

    def test_repr(self):
        d = self._empty_mapping()
        self.assertEqual(repr(d), '{}')
        d[1] = 2
        self.assertEqual(repr(d), '{1: 2}')
        d = self._empty_mapping()
        d[1] = d
        # Self-referential mapping must render as '...' instead of recursing.
        self.assertEqual(repr(d), '{1: {...}}')

        class Exc(Exception): pass

        class BadRepr(object):
            def __repr__(self):
                raise Exc()

        d = self._full_mapping({1: BadRepr()})
        self.assertRaises(Exc, repr, d)

    def test_eq(self):
        self.assertEqual(self._empty_mapping(), self._empty_mapping())
        self.assertEqual(self._full_mapping({1: 2}),
                         self._full_mapping({1: 2}))

        class Exc(Exception): pass

        class BadCmp(object):
            def __eq__(self, other):
                raise Exc()
            def __hash__(self):
                return 1

        d1 = self._full_mapping({BadCmp(): 1})
        d2 = self._full_mapping({1: 1})
        self.assertRaises(Exc, lambda: BadCmp()==1)
        self.assertRaises(Exc, lambda: d1==d2)

    def test_setdefault(self):
        TestMappingProtocol.test_setdefault(self)

        class Exc(Exception): pass

        class BadHash(object):
            fail = False
            def __hash__(self):
                if self.fail:
                    raise Exc()
                else:
                    return 42

        d = self._empty_mapping()
        x = BadHash()
        d[x] = 42
        x.fail = True
        self.assertRaises(Exc, d.setdefault, x, [])
# ==== file 3/3: linecache tests (diff header "--- /dev/null" removed) ====
+""" Tests for the linecache module """
+
+import linecache2 as linecache
+import unittest2 as unittest
+import os.path
+import tempfile
+
+from fixtures import NestedTempfile
+
# A real, importable source file to exercise the cache against.
FILENAME = os.__file__
if FILENAME.endswith('.pyc'):
    # Point at the .py source, not the compiled file.
    FILENAME = FILENAME[:-1]
NONEXISTENT_FILENAME = FILENAME + '.missing'
INVALID_NAME = '!@$)(!@#_1'
EMPTY = ''
# Fixture modules that live next to this test file.
TESTS = 'inspect_fodder inspect_fodder2 mapping_tests'
TESTS = TESTS.split()
TEST_PATH = os.path.dirname(__file__)
# Stdlib modules resolved relative to FILENAME's directory.
MODULES = "linecache abc".split()
MODULE_PATH = os.path.dirname(FILENAME)

SOURCE_1 = '''
" Docstring "

def function():
    return result

'''

SOURCE_2 = '''
def f():
    return 1 + 1

a = f()

'''

SOURCE_3 = '''
def f():
    return 3''' # No ending newline
+
+
class LineCacheTests(unittest.TestCase):
    """Exercise getline/getlines, cache invalidation and lazy PEP 302
    loading in the linecache2 backport."""

    def setUp(self):
        # All temp files created by the tests land in a nested tempdir
        # that is torn down automatically.
        tempdir = NestedTempfile()
        tempdir.setUp()
        self.addCleanup(tempdir.cleanUp)

    def test_getline(self):
        getline = linecache.getline

        # Bad values for line number should return an empty string
        self.assertEqual(getline(FILENAME, 2**15), EMPTY)
        self.assertEqual(getline(FILENAME, -1), EMPTY)

        # Float values currently raise TypeError, should it?
        self.assertRaises(TypeError, getline, FILENAME, 1.1)

        # Bad filenames should return an empty string
        self.assertEqual(getline(EMPTY, 1), EMPTY)
        self.assertEqual(getline(INVALID_NAME, 1), EMPTY)

        # Check whether lines correspond to those from file iteration
        for entry in TESTS:
            filename = os.path.join(TEST_PATH, entry) + '.py'
            with open(filename) as file:
                for index, line in enumerate(file):
                    self.assertEqual(line, getline(filename, index + 1))

        # Check module loading
        for entry in MODULES:
            filename = os.path.join(MODULE_PATH, entry) + '.py'
            with open(filename) as file:
                for index, line in enumerate(file):
                    self.assertEqual(line, getline(filename, index + 1))

        # Check that bogus data isn't returned (issue #1309567)
        empty = linecache.getlines('a/b/c/__init__.py')
        self.assertEqual(empty, [])

    def test_no_ending_newline(self):
        # getlines() must append the missing trailing newline.
        temp_file = tempfile.NamedTemporaryFile(
            suffix='.py', mode='w', delete=False)
        self.addCleanup(os.unlink, temp_file.name)
        with open(temp_file.name, "w") as fp:
            fp.write(SOURCE_3)
        lines = linecache.getlines(temp_file.name)
        self.assertEqual(lines, ["\n", "def f():\n", "    return 3\n"])

    def test_clearcache(self):
        cached = []
        for entry in TESTS:
            filename = os.path.join(TEST_PATH, entry) + '.py'
            cached.append(filename)
            linecache.getline(filename, 1)

        # Are all files cached?
        cached_empty = [fn for fn in cached if fn not in linecache.cache]
        self.assertEqual(cached_empty, [])

        # Can we clear the cache?
        linecache.clearcache()
        cached_empty = [fn for fn in cached if fn in linecache.cache]
        self.assertEqual(cached_empty, [])

    def test_checkcache(self):
        getline = linecache.getline
        # Create a source file and cache its contents
        temp_file = tempfile.NamedTemporaryFile(
            suffix='.py', mode='w', delete=False)
        source_name = temp_file.name
        self.addCleanup(os.unlink, source_name)
        with open(source_name, 'w') as source:
            source.write(SOURCE_1)
        getline(source_name, 1)

        # Keep a copy of the old contents
        source_list = []
        with open(source_name) as source:
            for index, line in enumerate(source):
                self.assertEqual(line, getline(source_name, index + 1))
                source_list.append(line)

        with open(source_name, 'w') as source:
            source.write(SOURCE_2)

        # Try to update a bogus cache entry
        linecache.checkcache('dummy')

        # Check that the cache matches the old contents
        for index, line in enumerate(source_list):
            self.assertEqual(line, getline(source_name, index + 1))

        # Update the cache and check whether it matches the new source file
        linecache.checkcache(source_name)
        with open(source_name) as source:
            for index, line in enumerate(source):
                self.assertEqual(line, getline(source_name, index + 1))
                source_list.append(line)

    def test_lazycache_no_globals(self):
        lines = linecache.getlines(FILENAME)
        linecache.clearcache()
        self.assertEqual(False, linecache.lazycache(FILENAME, None))
        self.assertEqual(lines, linecache.getlines(FILENAME))

    @unittest.skipIf("__loader__" not in globals(), "Modules not PEP302 by default")
    def test_lazycache_smoke(self):
        lines = linecache.getlines(NONEXISTENT_FILENAME, globals())
        linecache.clearcache()
        self.assertEqual(
            True, linecache.lazycache(NONEXISTENT_FILENAME, globals()))
        # A lazy entry is a 1-tuple holding the get_source thunk.
        self.assertEqual(1, len(linecache.cache[NONEXISTENT_FILENAME]))
        # Note here that we're looking up a non existant filename with no
        # globals: this would error if the lazy value wasn't resolved.
        self.assertEqual(lines, linecache.getlines(NONEXISTENT_FILENAME))

    def test_lazycache_provide_after_failed_lookup(self):
        linecache.clearcache()
        lines = linecache.getlines(NONEXISTENT_FILENAME, globals())
        linecache.clearcache()
        linecache.getlines(NONEXISTENT_FILENAME)
        linecache.lazycache(NONEXISTENT_FILENAME, globals())
        self.assertEqual(lines, linecache.updatecache(NONEXISTENT_FILENAME))

    def test_lazycache_check(self):
        # checkcache() must leave lazy entries alone and not blow up.
        linecache.clearcache()
        linecache.lazycache(NONEXISTENT_FILENAME, globals())
        linecache.checkcache()

    def test_lazycache_bad_filename(self):
        linecache.clearcache()
        self.assertEqual(False, linecache.lazycache('', globals()))
        self.assertEqual(False, linecache.lazycache('<foo>', globals()))

    @unittest.skipIf("__loader__" not in globals(), "Modules not PEP302 by default")
    def test_lazycache_already_cached(self):
        linecache.clearcache()
        lines = linecache.getlines(NONEXISTENT_FILENAME, globals())
        self.assertEqual(
            False,
            linecache.lazycache(NONEXISTENT_FILENAME, globals()))
        # Entry was realised into the full (size, mtime, lines, fullname) form.
        self.assertEqual(4, len(linecache.cache[NONEXISTENT_FILENAME]))