1 # Copyright (c) 2005 Divmod, Inc.
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Tests for Twisted plugin system.
9 import sys, errno, os, time
12 from zope.interface import Interface
14 from twisted.trial import unittest
15 from twisted.python.log import textFromEventDict, addObserver, removeObserver
16 from twisted.python.filepath import FilePath
17 from twisted.python.util import mergeFunctionMetadata
19 from twisted import plugin
23 class ITestPlugin(Interface):
25 A plugin for use by the plugin system's unit tests.
32 class ITestPlugin2(Interface):
39 class PluginTestCase(unittest.TestCase):
41 Tests which verify the behavior of the current, active Twisted plugins
47 Save C{sys.path} and C{sys.modules}, and create a package for tests.
49 self.originalPath = sys.path[:]
50 self.savedModules = sys.modules.copy()
52 self.root = FilePath(self.mktemp())
53 self.root.createDirectory()
54 self.package = self.root.child('mypackage')
55 self.package.createDirectory()
56 self.package.child('__init__.py').setContent("")
58 FilePath(__file__).sibling('plugin_basic.py'
59 ).copyTo(self.package.child('testplugin.py'))
61 self.originalPlugin = "testplugin"
63 sys.path.insert(0, self.root.path)
65 self.module = mypackage
70 Restore C{sys.path} and C{sys.modules} to their original values.
72 sys.path[:] = self.originalPath
74 sys.modules.update(self.savedModules)
77 def _unimportPythonModule(self, module, deleteSource=False):
78 modulePath = module.__name__.split('.')
79 packageName = '.'.join(modulePath[:-1])
80 moduleName = modulePath[-1]
82 delattr(sys.modules[packageName], moduleName)
83 del sys.modules[module.__name__]
84 for ext in ['c', 'o'] + (deleteSource and [''] or []):
86 os.remove(module.__file__ + ext)
88 if ose.errno != errno.ENOENT:
92 def _clearCache(self):
94 Remove the plugins B{droping.cache} file.
96 self.package.child('dropin.cache').remove()
99 def _withCacheness(meth):
101 This is a paranoid test wrapper, that calls C{meth} 2 times, clear the
102 cache, and calls it 2 other times. It's supposed to ensure that the
103 plugin system behaves correctly no matter what the state of the cache
112 return mergeFunctionMetadata(meth, wrapped)
115 def test_cache(self):
117 Check that the cache returned by L{plugin.getCache} hold the plugin
118 B{testplugin}, and that this plugin has the properties we expect:
119 provide L{TestPlugin}, has the good name and description, and can be
122 cache = plugin.getCache(self.module)
124 dropin = cache[self.originalPlugin]
125 self.assertEqual(dropin.moduleName,
126 'mypackage.%s' % (self.originalPlugin,))
127 self.assertIn("I'm a test drop-in.", dropin.description)
129 # Note, not the preferred way to get a plugin by its interface.
130 p1 = [p for p in dropin.plugins if ITestPlugin in p.provided][0]
131 self.assertIdentical(p1.dropin, dropin)
132 self.assertEqual(p1.name, "TestPlugin")
134 # Check the content of the description comes from the plugin module
137 p1.description.strip(),
138 "A plugin used solely for testing purposes.")
139 self.assertEqual(p1.provided, [ITestPlugin, plugin.IPlugin])
140 realPlugin = p1.load()
141 # The plugin should match the class present in sys.modules
142 self.assertIdentical(
144 sys.modules['mypackage.%s' % (self.originalPlugin,)].TestPlugin)
146 # And it should also match if we import it classicly
147 import mypackage.testplugin as tp
148 self.assertIdentical(realPlugin, tp.TestPlugin)
150 test_cache = _withCacheness(test_cache)
153 def test_plugins(self):
155 L{plugin.getPlugins} should return the list of plugins matching the
156 specified interface (here, L{ITestPlugin2}), and these plugins
157 should be instances of classes with a C{test} method, to be sure
158 L{plugin.getPlugins} load classes correctly.
160 plugins = list(plugin.getPlugins(ITestPlugin2, self.module))
162 self.assertEqual(len(plugins), 2)
164 names = ['AnotherTestPlugin', 'ThirdTestPlugin']
166 names.remove(p.__name__)
169 test_plugins = _withCacheness(test_plugins)
172 def test_detectNewFiles(self):
174 Check that L{plugin.getPlugins} is able to detect plugins added at
177 FilePath(__file__).sibling('plugin_extra1.py'
178 ).copyTo(self.package.child('pluginextra.py'))
180 # Check that the current situation is clean
181 self.failIfIn('mypackage.pluginextra', sys.modules)
182 self.failIf(hasattr(sys.modules['mypackage'], 'pluginextra'),
183 "mypackage still has pluginextra module")
185 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
187 # We should find 2 plugins: the one in testplugin, and the one in
189 self.assertEqual(len(plgs), 2)
191 names = ['TestPlugin', 'FourthTestPlugin']
193 names.remove(p.__name__)
196 self._unimportPythonModule(
197 sys.modules['mypackage.pluginextra'],
200 test_detectNewFiles = _withCacheness(test_detectNewFiles)
203 def test_detectFilesChanged(self):
205 Check that if the content of a plugin change, L{plugin.getPlugins} is
206 able to detect the new plugins added.
208 FilePath(__file__).sibling('plugin_extra1.py'
209 ).copyTo(self.package.child('pluginextra.py'))
211 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
213 self.assertEqual(len(plgs), 2)
215 FilePath(__file__).sibling('plugin_extra2.py'
216 ).copyTo(self.package.child('pluginextra.py'))
219 self._unimportPythonModule(sys.modules['mypackage.pluginextra'])
221 # Make sure additions are noticed
222 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
224 self.assertEqual(len(plgs), 3)
226 names = ['TestPlugin', 'FourthTestPlugin', 'FifthTestPlugin']
228 names.remove(p.__name__)
231 self._unimportPythonModule(
232 sys.modules['mypackage.pluginextra'],
235 test_detectFilesChanged = _withCacheness(test_detectFilesChanged)
238 def test_detectFilesRemoved(self):
240 Check that when a dropin file is removed, L{plugin.getPlugins} doesn't
243 FilePath(__file__).sibling('plugin_extra1.py'
244 ).copyTo(self.package.child('pluginextra.py'))
246 # Generate a cache with pluginextra in it.
247 list(plugin.getPlugins(ITestPlugin, self.module))
250 self._unimportPythonModule(
251 sys.modules['mypackage.pluginextra'],
253 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
254 self.assertEqual(1, len(plgs))
256 test_detectFilesRemoved = _withCacheness(test_detectFilesRemoved)
259 def test_nonexistentPathEntry(self):
261 Test that getCache skips over any entries in a plugin package's
262 C{__path__} which do not exist.
265 self.failIf(os.path.exists(path))
266 # Add the test directory to the plugins path
267 self.module.__path__.append(path)
269 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
270 self.assertEqual(len(plgs), 1)
272 self.module.__path__.remove(path)
274 test_nonexistentPathEntry = _withCacheness(test_nonexistentPathEntry)
277 def test_nonDirectoryChildEntry(self):
279 Test that getCache skips over any entries in a plugin package's
280 C{__path__} which refer to children of paths which are not directories.
282 path = FilePath(self.mktemp())
283 self.failIf(path.exists())
285 child = path.child("test_package").path
286 self.module.__path__.append(child)
288 plgs = list(plugin.getPlugins(ITestPlugin, self.module))
289 self.assertEqual(len(plgs), 1)
291 self.module.__path__.remove(child)
293 test_nonDirectoryChildEntry = _withCacheness(test_nonDirectoryChildEntry)
296 def test_deployedMode(self):
298 The C{dropin.cache} file may not be writable: the cache should still be
299 attainable, but an error should be logged to show that the cache
303 plugin.getCache(self.module)
305 cachepath = self.package.child('dropin.cache')
308 FilePath(__file__).sibling('plugin_extra1.py'
309 ).copyTo(self.package.child('pluginextra.py'))
311 os.chmod(self.package.path, 0500)
312 # Change the right of dropin.cache too for windows
313 os.chmod(cachepath.path, 0400)
314 self.addCleanup(os.chmod, self.package.path, 0700)
315 self.addCleanup(os.chmod, cachepath.path, 0700)
317 # Start observing log events to see the warning
319 addObserver(events.append)
320 self.addCleanup(removeObserver, events.append)
322 cache = plugin.getCache(self.module)
323 # The new plugin should be reported
324 self.assertIn('pluginextra', cache)
325 self.assertIn(self.originalPlugin, cache)
327 # Make sure something was logged about the cache.
328 expected = "Unable to write to plugin cache %s: error number %d" % (
329 cachepath.path, errno.EPERM)
331 if expected in textFromEventDict(event):
335 "Did not observe unwriteable cache warning in log "
336 "events: %r" % (events,))
340 # This is something like the Twisted plugins file.
342 from twisted.plugin import pluginPackagePaths
343 __path__.extend(pluginPackagePaths(__name__))
347 def pluginFileContents(name):
349 "from zope.interface import classProvides\n"
350 "from twisted.plugin import IPlugin\n"
351 "from twisted.test.test_plugin import ITestPlugin\n"
353 "class %s(object):\n"
354 " classProvides(IPlugin, ITestPlugin)\n") % (name,)
357 def _createPluginDummy(entrypath, pluginContent, real, pluginModule):
359 Create a plugindummy package.
361 entrypath.createDirectory()
362 pkg = entrypath.child('plugindummy')
363 pkg.createDirectory()
365 pkg.child('__init__.py').setContent('')
366 plugs = pkg.child('plugins')
367 plugs.createDirectory()
369 plugs.child('__init__.py').setContent(pluginInitFile)
370 plugs.child(pluginModule + '.py').setContent(pluginContent)
375 class DeveloperSetupTests(unittest.TestCase):
377 These tests verify things about the plugin system without actually
378 interacting with the deployed 'twisted.plugins' package, instead creating a
384 Create a complex environment with multiple entries on sys.path, akin to
385 a developer's environment who has a development (trunk) checkout of
386 Twisted, a system installed version of Twisted (for their operating
387 system's tools) and a project which provides Twisted plugins.
389 self.savedPath = sys.path[:]
390 self.savedModules = sys.modules.copy()
391 self.fakeRoot = FilePath(self.mktemp())
392 self.fakeRoot.createDirectory()
393 self.systemPath = self.fakeRoot.child('system_path')
394 self.devPath = self.fakeRoot.child('development_path')
395 self.appPath = self.fakeRoot.child('application_path')
396 self.systemPackage = _createPluginDummy(
397 self.systemPath, pluginFileContents('system'),
398 True, 'plugindummy_builtin')
399 self.devPackage = _createPluginDummy(
400 self.devPath, pluginFileContents('dev'),
401 True, 'plugindummy_builtin')
402 self.appPackage = _createPluginDummy(
403 self.appPath, pluginFileContents('app'),
404 False, 'plugindummy_app')
406 # Now we're going to do the system installation.
407 sys.path.extend([x.path for x in [self.systemPath,
409 # Run all the way through the plugins list to cause the
410 # L{plugin.getPlugins} generator to write cache files for the system
413 self.sysplug = self.systemPath.child('plugindummy').child('plugins')
414 self.syscache = self.sysplug.child('dropin.cache')
415 # Make sure there's a nice big difference in modification times so that
416 # we won't re-build the system cache.
419 self.sysplug.child('plugindummy_builtin.py').path,
421 os.utime(self.syscache.path, (now - 2000,) * 2)
422 # For extra realism, let's make sure that the system path is no longer
425 self.resetEnvironment()
428 def lockSystem(self):
430 Lock the system directories, as if they were unwritable by this user.
432 os.chmod(self.sysplug.path, 0555)
433 os.chmod(self.syscache.path, 0555)
436 def unlockSystem(self):
438 Unlock the system directories, as if they were writable by this user.
440 os.chmod(self.sysplug.path, 0777)
441 os.chmod(self.syscache.path, 0777)
444 def getAllPlugins(self):
446 Get all the plugins loadable from our dummy package, and return their
449 # Import the module we just added to our path. (Local scope because
450 # this package doesn't exist outside of this test.)
451 import plugindummy.plugins
452 x = list(plugin.getPlugins(ITestPlugin, plugindummy.plugins))
453 return [plug.__name__ for plug in x]
456 def resetEnvironment(self):
458 Change the environment to what it should be just as the test is
461 self.unsetEnvironment()
462 sys.path.extend([x.path for x in [self.devPath,
466 def unsetEnvironment(self):
468 Change the Python environment back to what it was before the test was
472 sys.modules.update(self.savedModules)
473 sys.path[:] = self.savedPath
478 Reset the Python environment to what it was before this test ran, and
479 restore permissions on files which were marked read-only so that the
480 directory may be cleanly cleaned up.
482 self.unsetEnvironment()
483 # Normally we wouldn't "clean up" the filesystem like this (leaving
484 # things for post-test inspection), but if we left the permissions the
485 # way they were, we'd be leaving files around that the buildbots
486 # couldn't delete, and that would be bad.
490 def test_developmentPluginAvailability(self):
492 Plugins added in the development path should be loadable, even when
493 the (now non-importable) system path contains its own idea of the
494 list of plugins for a package. Inversely, plugins added in the
495 system path should not be available.
497 # Run 3 times: uncached, cached, and then cached again to make sure we
498 # didn't overwrite / corrupt the cache on the cached try.
500 names = self.getAllPlugins()
502 self.assertEqual(names, ['app', 'dev'])
505 def test_freshPyReplacesStalePyc(self):
507 Verify that if a stale .pyc file on the PYTHONPATH is replaced by a
508 fresh .py file, the plugins in the new .py are picked up rather than
509 the stale .pyc, even if the .pyc is still around.
511 mypath = self.appPackage.child("stale.py")
512 mypath.setContent(pluginFileContents('one'))
513 # Make it super stale
514 x = time.time() - 1000
515 os.utime(mypath.path, (x, x))
516 pyc = mypath.sibling('stale.pyc')
518 compileall.compile_dir(self.appPackage.path, quiet=1)
519 os.utime(pyc.path, (x, x))
520 # Eliminate the other option.
522 # Make sure it's the .pyc path getting cached.
523 self.resetEnvironment()
525 self.assertIn('one', self.getAllPlugins())
526 self.failIfIn('two', self.getAllPlugins())
527 self.resetEnvironment()
528 mypath.setContent(pluginFileContents('two'))
529 self.failIfIn('one', self.getAllPlugins())
530 self.assertIn('two', self.getAllPlugins())
533 def test_newPluginsOnReadOnlyPath(self):
535 Verify that a failure to write the dropin.cache file on a read-only
536 path will not affect the list of plugins returned.
538 Note: this test should pass on both Linux and Windows, but may not
539 provide useful coverage on Windows due to the different meaning of
540 "read-only directory".
543 self.sysplug.child('newstuff.py').setContent(pluginFileContents('one'))
546 # Take the developer path out, so that the system plugins are actually
548 sys.path.remove(self.devPath.path)
550 # Start observing log events to see the warning
552 addObserver(events.append)
553 self.addCleanup(removeObserver, events.append)
555 self.assertIn('one', self.getAllPlugins())
557 # Make sure something was logged about the cache.
558 expected = "Unable to write to plugin cache %s: error number %d" % (
559 self.syscache.path, errno.EPERM)
561 if expected in textFromEventDict(event):
565 "Did not observe unwriteable cache warning in log "
566 "events: %r" % (events,))
570 class AdjacentPackageTests(unittest.TestCase):
572 Tests for the behavior of the plugin system when there are multiple
573 installed copies of the package containing the plugins being loaded.
578 Save the elements of C{sys.path} and the items of C{sys.modules}.
580 self.originalPath = sys.path[:]
581 self.savedModules = sys.modules.copy()
586 Restore C{sys.path} and C{sys.modules} to their original values.
588 sys.path[:] = self.originalPath
590 sys.modules.update(self.savedModules)
593 def createDummyPackage(self, root, name, pluginName):
595 Create a directory containing a Python package named I{dummy} with a
596 I{plugins} subpackage.
598 @type root: L{FilePath}
599 @param root: The directory in which to create the hierarchy.
602 @param name: The name of the directory to create which will contain
605 @type pluginName: C{str}
606 @param pluginName: The name of a module to create in the
607 I{dummy.plugins} package.
610 @return: The directory which was created to contain the I{dummy}
613 directory = root.child(name)
614 package = directory.child('dummy')
616 package.child('__init__.py').setContent('')
617 plugins = package.child('plugins')
619 plugins.child('__init__.py').setContent(pluginInitFile)
620 pluginModule = plugins.child(pluginName + '.py')
621 pluginModule.setContent(pluginFileContents(name))
625 def test_hiddenPackageSamePluginModuleNameObscured(self):
627 Only plugins from the first package in sys.path should be returned by
628 getPlugins in the case where there are two Python packages by the same
629 name installed, each with a plugin module by a single name.
631 root = FilePath(self.mktemp())
634 firstDirectory = self.createDummyPackage(root, 'first', 'someplugin')
635 secondDirectory = self.createDummyPackage(root, 'second', 'someplugin')
637 sys.path.append(firstDirectory.path)
638 sys.path.append(secondDirectory.path)
642 plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins))
643 self.assertEqual(['first'], [p.__name__ for p in plugins])
646 def test_hiddenPackageDifferentPluginModuleNameObscured(self):
648 Plugins from the first package in sys.path should be returned by
649 getPlugins in the case where there are two Python packages by the same
650 name installed, each with a plugin module by a different name.
652 root = FilePath(self.mktemp())
655 firstDirectory = self.createDummyPackage(root, 'first', 'thisplugin')
656 secondDirectory = self.createDummyPackage(root, 'second', 'thatplugin')
658 sys.path.append(firstDirectory.path)
659 sys.path.append(secondDirectory.path)
663 plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins))
664 self.assertEqual(['first'], [p.__name__ for p in plugins])
668 class PackagePathTests(unittest.TestCase):
670 Tests for L{plugin.pluginPackagePaths} which constructs search paths for
676 Save the elements of C{sys.path}.
678 self.originalPath = sys.path[:]
683 Restore C{sys.path} to its original value.
685 sys.path[:] = self.originalPath
688 def test_pluginDirectories(self):
690 L{plugin.pluginPackagePaths} should return a list containing each
691 directory in C{sys.path} with a suffix based on the supplied package
694 foo = FilePath('foo')
695 bar = FilePath('bar')
696 sys.path = [foo.path, bar.path]
698 plugin.pluginPackagePaths('dummy.plugins'),
699 [foo.child('dummy').child('plugins').path,
700 bar.child('dummy').child('plugins').path])
703 def test_pluginPackagesExcluded(self):
705 L{plugin.pluginPackagePaths} should exclude directories which are
706 Python packages. The only allowed plugin package (the only one
707 associated with a I{dummy} package which Python will allow to be
708 imported) will already be known to the caller of
709 L{plugin.pluginPackagePaths} and will most commonly already be in
710 the C{__path__} they are about to mutate.
712 root = FilePath(self.mktemp())
713 foo = root.child('foo').child('dummy').child('plugins')
715 foo.child('__init__.py').setContent('')
716 sys.path = [root.child('foo').path, root.child('bar').path]
718 plugin.pluginPackagePaths('dummy.plugins'),
719 [root.child('bar').child('dummy').child('plugins').path])