Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / test_plugin.py
1 # Copyright (c) 2005 Divmod, Inc.
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for Twisted plugin system.
7 """
8
9 import sys, errno, os, time
10 import compileall
11
12 from zope.interface import Interface
13
14 from twisted.trial import unittest
15 from twisted.python.log import textFromEventDict, addObserver, removeObserver
16 from twisted.python.filepath import FilePath
17 from twisted.python.util import mergeFunctionMetadata
18
19 from twisted import plugin
20
21
22
23 class ITestPlugin(Interface):
24     """
25     A plugin for use by the plugin system's unit tests.
26
27     Do not use this.
28     """
29
30
31
32 class ITestPlugin2(Interface):
33     """
34     See L{ITestPlugin}.
35     """
36
37
38
39 class PluginTestCase(unittest.TestCase):
40     """
41     Tests which verify the behavior of the current, active Twisted plugins
42     directory.
43     """
44
45     def setUp(self):
46         """
47         Save C{sys.path} and C{sys.modules}, and create a package for tests.
48         """
49         self.originalPath = sys.path[:]
50         self.savedModules = sys.modules.copy()
51
52         self.root = FilePath(self.mktemp())
53         self.root.createDirectory()
54         self.package = self.root.child('mypackage')
55         self.package.createDirectory()
56         self.package.child('__init__.py').setContent("")
57
58         FilePath(__file__).sibling('plugin_basic.py'
59             ).copyTo(self.package.child('testplugin.py'))
60
61         self.originalPlugin = "testplugin"
62
63         sys.path.insert(0, self.root.path)
64         import mypackage
65         self.module = mypackage
66
67
68     def tearDown(self):
69         """
70         Restore C{sys.path} and C{sys.modules} to their original values.
71         """
72         sys.path[:] = self.originalPath
73         sys.modules.clear()
74         sys.modules.update(self.savedModules)
75
76
77     def _unimportPythonModule(self, module, deleteSource=False):
78         modulePath = module.__name__.split('.')
79         packageName = '.'.join(modulePath[:-1])
80         moduleName = modulePath[-1]
81
82         delattr(sys.modules[packageName], moduleName)
83         del sys.modules[module.__name__]
84         for ext in ['c', 'o'] + (deleteSource and [''] or []):
85             try:
86                 os.remove(module.__file__ + ext)
87             except OSError, ose:
88                 if ose.errno != errno.ENOENT:
89                     raise
90
91
92     def _clearCache(self):
93         """
94         Remove the plugins B{droping.cache} file.
95         """
96         self.package.child('dropin.cache').remove()
97
98
99     def _withCacheness(meth):
100         """
101         This is a paranoid test wrapper, that calls C{meth} 2 times, clear the
102         cache, and calls it 2 other times. It's supposed to ensure that the
103         plugin system behaves correctly no matter what the state of the cache
104         is.
105         """
106         def wrapped(self):
107             meth(self)
108             meth(self)
109             self._clearCache()
110             meth(self)
111             meth(self)
112         return mergeFunctionMetadata(meth, wrapped)
113
114
115     def test_cache(self):
116         """
117         Check that the cache returned by L{plugin.getCache} hold the plugin
118         B{testplugin}, and that this plugin has the properties we expect:
119         provide L{TestPlugin}, has the good name and description, and can be
120         loaded successfully.
121         """
122         cache = plugin.getCache(self.module)
123
124         dropin = cache[self.originalPlugin]
125         self.assertEqual(dropin.moduleName,
126                           'mypackage.%s' % (self.originalPlugin,))
127         self.assertIn("I'm a test drop-in.", dropin.description)
128
129         # Note, not the preferred way to get a plugin by its interface.
130         p1 = [p for p in dropin.plugins if ITestPlugin in p.provided][0]
131         self.assertIdentical(p1.dropin, dropin)
132         self.assertEqual(p1.name, "TestPlugin")
133
134         # Check the content of the description comes from the plugin module
135         # docstring
136         self.assertEqual(
137             p1.description.strip(),
138             "A plugin used solely for testing purposes.")
139         self.assertEqual(p1.provided, [ITestPlugin, plugin.IPlugin])
140         realPlugin = p1.load()
141         # The plugin should match the class present in sys.modules
142         self.assertIdentical(
143             realPlugin,
144             sys.modules['mypackage.%s' % (self.originalPlugin,)].TestPlugin)
145
146         # And it should also match if we import it classicly
147         import mypackage.testplugin as tp
148         self.assertIdentical(realPlugin, tp.TestPlugin)
149
150     test_cache = _withCacheness(test_cache)
151
152
153     def test_plugins(self):
154         """
155         L{plugin.getPlugins} should return the list of plugins matching the
156         specified interface (here, L{ITestPlugin2}), and these plugins
157         should be instances of classes with a C{test} method, to be sure
158         L{plugin.getPlugins} load classes correctly.
159         """
160         plugins = list(plugin.getPlugins(ITestPlugin2, self.module))
161
162         self.assertEqual(len(plugins), 2)
163
164         names = ['AnotherTestPlugin', 'ThirdTestPlugin']
165         for p in plugins:
166             names.remove(p.__name__)
167             p.test()
168
169     test_plugins = _withCacheness(test_plugins)
170
171
172     def test_detectNewFiles(self):
173         """
174         Check that L{plugin.getPlugins} is able to detect plugins added at
175         runtime.
176         """
177         FilePath(__file__).sibling('plugin_extra1.py'
178             ).copyTo(self.package.child('pluginextra.py'))
179         try:
180             # Check that the current situation is clean
181             self.failIfIn('mypackage.pluginextra', sys.modules)
182             self.failIf(hasattr(sys.modules['mypackage'], 'pluginextra'),
183                         "mypackage still has pluginextra module")
184
185             plgs = list(plugin.getPlugins(ITestPlugin, self.module))
186
187             # We should find 2 plugins: the one in testplugin, and the one in
188             # pluginextra
189             self.assertEqual(len(plgs), 2)
190
191             names = ['TestPlugin', 'FourthTestPlugin']
192             for p in plgs:
193                 names.remove(p.__name__)
194                 p.test1()
195         finally:
196             self._unimportPythonModule(
197                 sys.modules['mypackage.pluginextra'],
198                 True)
199
200     test_detectNewFiles = _withCacheness(test_detectNewFiles)
201
202
203     def test_detectFilesChanged(self):
204         """
205         Check that if the content of a plugin change, L{plugin.getPlugins} is
206         able to detect the new plugins added.
207         """
208         FilePath(__file__).sibling('plugin_extra1.py'
209             ).copyTo(self.package.child('pluginextra.py'))
210         try:
211             plgs = list(plugin.getPlugins(ITestPlugin, self.module))
212             # Sanity check
213             self.assertEqual(len(plgs), 2)
214
215             FilePath(__file__).sibling('plugin_extra2.py'
216                 ).copyTo(self.package.child('pluginextra.py'))
217
218             # Fake out Python.
219             self._unimportPythonModule(sys.modules['mypackage.pluginextra'])
220
221             # Make sure additions are noticed
222             plgs = list(plugin.getPlugins(ITestPlugin, self.module))
223
224             self.assertEqual(len(plgs), 3)
225
226             names = ['TestPlugin', 'FourthTestPlugin', 'FifthTestPlugin']
227             for p in plgs:
228                 names.remove(p.__name__)
229                 p.test1()
230         finally:
231             self._unimportPythonModule(
232                 sys.modules['mypackage.pluginextra'],
233                 True)
234
235     test_detectFilesChanged = _withCacheness(test_detectFilesChanged)
236
237
238     def test_detectFilesRemoved(self):
239         """
240         Check that when a dropin file is removed, L{plugin.getPlugins} doesn't
241         return it anymore.
242         """
243         FilePath(__file__).sibling('plugin_extra1.py'
244             ).copyTo(self.package.child('pluginextra.py'))
245         try:
246             # Generate a cache with pluginextra in it.
247             list(plugin.getPlugins(ITestPlugin, self.module))
248
249         finally:
250             self._unimportPythonModule(
251                 sys.modules['mypackage.pluginextra'],
252                 True)
253         plgs = list(plugin.getPlugins(ITestPlugin, self.module))
254         self.assertEqual(1, len(plgs))
255
256     test_detectFilesRemoved = _withCacheness(test_detectFilesRemoved)
257
258
259     def test_nonexistentPathEntry(self):
260         """
261         Test that getCache skips over any entries in a plugin package's
262         C{__path__} which do not exist.
263         """
264         path = self.mktemp()
265         self.failIf(os.path.exists(path))
266         # Add the test directory to the plugins path
267         self.module.__path__.append(path)
268         try:
269             plgs = list(plugin.getPlugins(ITestPlugin, self.module))
270             self.assertEqual(len(plgs), 1)
271         finally:
272             self.module.__path__.remove(path)
273
274     test_nonexistentPathEntry = _withCacheness(test_nonexistentPathEntry)
275
276
277     def test_nonDirectoryChildEntry(self):
278         """
279         Test that getCache skips over any entries in a plugin package's
280         C{__path__} which refer to children of paths which are not directories.
281         """
282         path = FilePath(self.mktemp())
283         self.failIf(path.exists())
284         path.touch()
285         child = path.child("test_package").path
286         self.module.__path__.append(child)
287         try:
288             plgs = list(plugin.getPlugins(ITestPlugin, self.module))
289             self.assertEqual(len(plgs), 1)
290         finally:
291             self.module.__path__.remove(child)
292
293     test_nonDirectoryChildEntry = _withCacheness(test_nonDirectoryChildEntry)
294
295
296     def test_deployedMode(self):
297         """
298         The C{dropin.cache} file may not be writable: the cache should still be
299         attainable, but an error should be logged to show that the cache
300         couldn't be updated.
301         """
302         # Generate the cache
303         plugin.getCache(self.module)
304
305         cachepath = self.package.child('dropin.cache')
306
307         # Add a new plugin
308         FilePath(__file__).sibling('plugin_extra1.py'
309             ).copyTo(self.package.child('pluginextra.py'))
310
311         os.chmod(self.package.path, 0500)
312         # Change the right of dropin.cache too for windows
313         os.chmod(cachepath.path, 0400)
314         self.addCleanup(os.chmod, self.package.path, 0700)
315         self.addCleanup(os.chmod, cachepath.path, 0700)
316
317         # Start observing log events to see the warning
318         events = []
319         addObserver(events.append)
320         self.addCleanup(removeObserver, events.append)
321
322         cache = plugin.getCache(self.module)
323         # The new plugin should be reported
324         self.assertIn('pluginextra', cache)
325         self.assertIn(self.originalPlugin, cache)
326
327         # Make sure something was logged about the cache.
328         expected = "Unable to write to plugin cache %s: error number %d" % (
329             cachepath.path, errno.EPERM)
330         for event in events:
331             if expected in textFromEventDict(event):
332                 break
333         else:
334             self.fail(
335                 "Did not observe unwriteable cache warning in log "
336                 "events: %r" % (events,))
337
338
339
340 # This is something like the Twisted plugins file.
341 pluginInitFile = """
342 from twisted.plugin import pluginPackagePaths
343 __path__.extend(pluginPackagePaths(__name__))
344 __all__ = []
345 """
346
347 def pluginFileContents(name):
348     return (
349         "from zope.interface import classProvides\n"
350         "from twisted.plugin import IPlugin\n"
351         "from twisted.test.test_plugin import ITestPlugin\n"
352         "\n"
353         "class %s(object):\n"
354         "    classProvides(IPlugin, ITestPlugin)\n") % (name,)
355
356
357 def _createPluginDummy(entrypath, pluginContent, real, pluginModule):
358     """
359     Create a plugindummy package.
360     """
361     entrypath.createDirectory()
362     pkg = entrypath.child('plugindummy')
363     pkg.createDirectory()
364     if real:
365         pkg.child('__init__.py').setContent('')
366     plugs = pkg.child('plugins')
367     plugs.createDirectory()
368     if real:
369         plugs.child('__init__.py').setContent(pluginInitFile)
370     plugs.child(pluginModule + '.py').setContent(pluginContent)
371     return plugs
372
373
374
375 class DeveloperSetupTests(unittest.TestCase):
376     """
377     These tests verify things about the plugin system without actually
378     interacting with the deployed 'twisted.plugins' package, instead creating a
379     temporary package.
380     """
381
382     def setUp(self):
383         """
384         Create a complex environment with multiple entries on sys.path, akin to
385         a developer's environment who has a development (trunk) checkout of
386         Twisted, a system installed version of Twisted (for their operating
387         system's tools) and a project which provides Twisted plugins.
388         """
389         self.savedPath = sys.path[:]
390         self.savedModules = sys.modules.copy()
391         self.fakeRoot = FilePath(self.mktemp())
392         self.fakeRoot.createDirectory()
393         self.systemPath = self.fakeRoot.child('system_path')
394         self.devPath = self.fakeRoot.child('development_path')
395         self.appPath = self.fakeRoot.child('application_path')
396         self.systemPackage = _createPluginDummy(
397             self.systemPath, pluginFileContents('system'),
398             True, 'plugindummy_builtin')
399         self.devPackage = _createPluginDummy(
400             self.devPath, pluginFileContents('dev'),
401             True, 'plugindummy_builtin')
402         self.appPackage = _createPluginDummy(
403             self.appPath, pluginFileContents('app'),
404             False, 'plugindummy_app')
405
406         # Now we're going to do the system installation.
407         sys.path.extend([x.path for x in [self.systemPath,
408                                           self.appPath]])
409         # Run all the way through the plugins list to cause the
410         # L{plugin.getPlugins} generator to write cache files for the system
411         # installation.
412         self.getAllPlugins()
413         self.sysplug = self.systemPath.child('plugindummy').child('plugins')
414         self.syscache = self.sysplug.child('dropin.cache')
415         # Make sure there's a nice big difference in modification times so that
416         # we won't re-build the system cache.
417         now = time.time()
418         os.utime(
419             self.sysplug.child('plugindummy_builtin.py').path,
420             (now - 5000,) * 2)
421         os.utime(self.syscache.path, (now - 2000,) * 2)
422         # For extra realism, let's make sure that the system path is no longer
423         # writable.
424         self.lockSystem()
425         self.resetEnvironment()
426
427
428     def lockSystem(self):
429         """
430         Lock the system directories, as if they were unwritable by this user.
431         """
432         os.chmod(self.sysplug.path, 0555)
433         os.chmod(self.syscache.path, 0555)
434
435
436     def unlockSystem(self):
437         """
438         Unlock the system directories, as if they were writable by this user.
439         """
440         os.chmod(self.sysplug.path, 0777)
441         os.chmod(self.syscache.path, 0777)
442
443
444     def getAllPlugins(self):
445         """
446         Get all the plugins loadable from our dummy package, and return their
447         short names.
448         """
449         # Import the module we just added to our path.  (Local scope because
450         # this package doesn't exist outside of this test.)
451         import plugindummy.plugins
452         x = list(plugin.getPlugins(ITestPlugin, plugindummy.plugins))
453         return [plug.__name__ for plug in x]
454
455
456     def resetEnvironment(self):
457         """
458         Change the environment to what it should be just as the test is
459         starting.
460         """
461         self.unsetEnvironment()
462         sys.path.extend([x.path for x in [self.devPath,
463                                           self.systemPath,
464                                           self.appPath]])
465
466     def unsetEnvironment(self):
467         """
468         Change the Python environment back to what it was before the test was
469         started.
470         """
471         sys.modules.clear()
472         sys.modules.update(self.savedModules)
473         sys.path[:] = self.savedPath
474
475
476     def tearDown(self):
477         """
478         Reset the Python environment to what it was before this test ran, and
479         restore permissions on files which were marked read-only so that the
480         directory may be cleanly cleaned up.
481         """
482         self.unsetEnvironment()
483         # Normally we wouldn't "clean up" the filesystem like this (leaving
484         # things for post-test inspection), but if we left the permissions the
485         # way they were, we'd be leaving files around that the buildbots
486         # couldn't delete, and that would be bad.
487         self.unlockSystem()
488
489
490     def test_developmentPluginAvailability(self):
491         """
492         Plugins added in the development path should be loadable, even when
493         the (now non-importable) system path contains its own idea of the
494         list of plugins for a package.  Inversely, plugins added in the
495         system path should not be available.
496         """
497         # Run 3 times: uncached, cached, and then cached again to make sure we
498         # didn't overwrite / corrupt the cache on the cached try.
499         for x in range(3):
500             names = self.getAllPlugins()
501             names.sort()
502             self.assertEqual(names, ['app', 'dev'])
503
504
505     def test_freshPyReplacesStalePyc(self):
506         """
507         Verify that if a stale .pyc file on the PYTHONPATH is replaced by a
508         fresh .py file, the plugins in the new .py are picked up rather than
509         the stale .pyc, even if the .pyc is still around.
510         """
511         mypath = self.appPackage.child("stale.py")
512         mypath.setContent(pluginFileContents('one'))
513         # Make it super stale
514         x = time.time() - 1000
515         os.utime(mypath.path, (x, x))
516         pyc = mypath.sibling('stale.pyc')
517         # compile it
518         compileall.compile_dir(self.appPackage.path, quiet=1)
519         os.utime(pyc.path, (x, x))
520         # Eliminate the other option.
521         mypath.remove()
522         # Make sure it's the .pyc path getting cached.
523         self.resetEnvironment()
524         # Sanity check.
525         self.assertIn('one', self.getAllPlugins())
526         self.failIfIn('two', self.getAllPlugins())
527         self.resetEnvironment()
528         mypath.setContent(pluginFileContents('two'))
529         self.failIfIn('one', self.getAllPlugins())
530         self.assertIn('two', self.getAllPlugins())
531
532
533     def test_newPluginsOnReadOnlyPath(self):
534         """
535         Verify that a failure to write the dropin.cache file on a read-only
536         path will not affect the list of plugins returned.
537
538         Note: this test should pass on both Linux and Windows, but may not
539         provide useful coverage on Windows due to the different meaning of
540         "read-only directory".
541         """
542         self.unlockSystem()
543         self.sysplug.child('newstuff.py').setContent(pluginFileContents('one'))
544         self.lockSystem()
545
546         # Take the developer path out, so that the system plugins are actually
547         # examined.
548         sys.path.remove(self.devPath.path)
549
550         # Start observing log events to see the warning
551         events = []
552         addObserver(events.append)
553         self.addCleanup(removeObserver, events.append)
554
555         self.assertIn('one', self.getAllPlugins())
556
557         # Make sure something was logged about the cache.
558         expected = "Unable to write to plugin cache %s: error number %d" % (
559             self.syscache.path, errno.EPERM)
560         for event in events:
561             if expected in textFromEventDict(event):
562                 break
563         else:
564             self.fail(
565                 "Did not observe unwriteable cache warning in log "
566                 "events: %r" % (events,))
567
568
569
570 class AdjacentPackageTests(unittest.TestCase):
571     """
572     Tests for the behavior of the plugin system when there are multiple
573     installed copies of the package containing the plugins being loaded.
574     """
575
576     def setUp(self):
577         """
578         Save the elements of C{sys.path} and the items of C{sys.modules}.
579         """
580         self.originalPath = sys.path[:]
581         self.savedModules = sys.modules.copy()
582
583
584     def tearDown(self):
585         """
586         Restore C{sys.path} and C{sys.modules} to their original values.
587         """
588         sys.path[:] = self.originalPath
589         sys.modules.clear()
590         sys.modules.update(self.savedModules)
591
592
593     def createDummyPackage(self, root, name, pluginName):
594         """
595         Create a directory containing a Python package named I{dummy} with a
596         I{plugins} subpackage.
597
598         @type root: L{FilePath}
599         @param root: The directory in which to create the hierarchy.
600
601         @type name: C{str}
602         @param name: The name of the directory to create which will contain
603             the package.
604
605         @type pluginName: C{str}
606         @param pluginName: The name of a module to create in the
607             I{dummy.plugins} package.
608
609         @rtype: L{FilePath}
610         @return: The directory which was created to contain the I{dummy}
611             package.
612         """
613         directory = root.child(name)
614         package = directory.child('dummy')
615         package.makedirs()
616         package.child('__init__.py').setContent('')
617         plugins = package.child('plugins')
618         plugins.makedirs()
619         plugins.child('__init__.py').setContent(pluginInitFile)
620         pluginModule = plugins.child(pluginName + '.py')
621         pluginModule.setContent(pluginFileContents(name))
622         return directory
623
624
625     def test_hiddenPackageSamePluginModuleNameObscured(self):
626         """
627         Only plugins from the first package in sys.path should be returned by
628         getPlugins in the case where there are two Python packages by the same
629         name installed, each with a plugin module by a single name.
630         """
631         root = FilePath(self.mktemp())
632         root.makedirs()
633
634         firstDirectory = self.createDummyPackage(root, 'first', 'someplugin')
635         secondDirectory = self.createDummyPackage(root, 'second', 'someplugin')
636
637         sys.path.append(firstDirectory.path)
638         sys.path.append(secondDirectory.path)
639
640         import dummy.plugins
641
642         plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins))
643         self.assertEqual(['first'], [p.__name__ for p in plugins])
644
645
646     def test_hiddenPackageDifferentPluginModuleNameObscured(self):
647         """
648         Plugins from the first package in sys.path should be returned by
649         getPlugins in the case where there are two Python packages by the same
650         name installed, each with a plugin module by a different name.
651         """
652         root = FilePath(self.mktemp())
653         root.makedirs()
654
655         firstDirectory = self.createDummyPackage(root, 'first', 'thisplugin')
656         secondDirectory = self.createDummyPackage(root, 'second', 'thatplugin')
657
658         sys.path.append(firstDirectory.path)
659         sys.path.append(secondDirectory.path)
660
661         import dummy.plugins
662
663         plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins))
664         self.assertEqual(['first'], [p.__name__ for p in plugins])
665
666
667
668 class PackagePathTests(unittest.TestCase):
669     """
670     Tests for L{plugin.pluginPackagePaths} which constructs search paths for
671     plugin packages.
672     """
673
674     def setUp(self):
675         """
676         Save the elements of C{sys.path}.
677         """
678         self.originalPath = sys.path[:]
679
680
681     def tearDown(self):
682         """
683         Restore C{sys.path} to its original value.
684         """
685         sys.path[:] = self.originalPath
686
687
688     def test_pluginDirectories(self):
689         """
690         L{plugin.pluginPackagePaths} should return a list containing each
691         directory in C{sys.path} with a suffix based on the supplied package
692         name.
693         """
694         foo = FilePath('foo')
695         bar = FilePath('bar')
696         sys.path = [foo.path, bar.path]
697         self.assertEqual(
698             plugin.pluginPackagePaths('dummy.plugins'),
699             [foo.child('dummy').child('plugins').path,
700              bar.child('dummy').child('plugins').path])
701
702
703     def test_pluginPackagesExcluded(self):
704         """
705         L{plugin.pluginPackagePaths} should exclude directories which are
706         Python packages.  The only allowed plugin package (the only one
707         associated with a I{dummy} package which Python will allow to be
708         imported) will already be known to the caller of
709         L{plugin.pluginPackagePaths} and will most commonly already be in
710         the C{__path__} they are about to mutate.
711         """
712         root = FilePath(self.mktemp())
713         foo = root.child('foo').child('dummy').child('plugins')
714         foo.makedirs()
715         foo.child('__init__.py').setContent('')
716         sys.path = [root.child('foo').path, root.child('bar').path]
717         self.assertEqual(
718             plugin.pluginPackagePaths('dummy.plugins'),
719             [root.child('bar').child('dummy').child('plugins').path])