integration tests: Test LUKS
authorMartin Pitt <martin.pitt@ubuntu.com>
Sat, 5 Nov 2011 23:22:12 +0000 (19:22 -0400)
committerMartin Pitt <martin.pitt@ubuntu.com>
Sat, 5 Nov 2011 23:22:12 +0000 (19:22 -0400)
src/tests/integration-test

index f8f2029..135d856 100755 (executable)
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 
+# TODO:
+# - add and test method for changing LUKS passphrase
+# - test Format with take-ownership
+
 import sys
 import os
 
@@ -221,12 +225,13 @@ class UDisksTestCase(unittest.TestCase):
         return klass.client.get_object(block.get_object_path()).get_property('filesystem')
 
     @classmethod
-    def blkid(klass, partition=None):
+    def blkid(klass, partition=None, device=None):
         '''Call blkid and return dictionary of results.'''
 
+        if not device:
+            device = klass.devname(partition)
         result = {}
-        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', 
-            klass.devname(partition)], stdout=subprocess.PIPE)
+        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
         for l in cmd.stdout:
             (key, value) = l.decode('UTF-8').split('=', 1)
             result[key] = value.strip()
@@ -754,6 +759,8 @@ class FS(UDisksTestCase):
                 # these fses are known to not support relabeling
                 self.assertTrue(type in ['minix', 'btrfs'])
                 supported = False
+            else:
+                raise
 
         if supported:
             block = self.udisks_block()
@@ -856,6 +863,167 @@ class Smart(UDisksTestCase):
 
 # ----------------------------------------------------------------------------
 
+class Luks(UDisksTestCase):
+    '''Check LUKS.'''
+
+    def tearDown(self):
+        '''clean up behind failed test cases'''
+
+        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
+        if crypt_obj:
+            encrypted = crypt_obj.get_property('encrypted')
+            if encrypted:
+                try:
+                    encrypted.call_lock_sync(GLib.Variant('a{sv}', {}), None)
+                    sys.stderr.write('[cleanup lock] ')
+                except GLib.GError:
+                    pass
+
+    # needs to run before the other tests
+    def test_0_create_teardown(self):
+        '''LUKS create/teardown'''
+
+        self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
+            'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
+            'label': GLib.Variant('s', 'treasure'),
+            }))
+
+        try:
+            block = self.udisks_block()
+            obj = self.client.get_object(block.get_object_path())
+            self.assertEqual(obj.get_property('filesystem'), None)
+            encrypted = obj.get_property('encrypted')
+            self.assertNotEqual(encrypted, None)
+
+            # check crypted device info
+            self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
+            self.assertEqual(block.get_property('id-usage'), 'crypto')
+            self.assertEqual(block.get_property('id-label'), '')
+            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
+            self.assertEqual(block.get_property('device'), self.devname())
+
+            # check whether we can lock/unlock; we also need this to get the
+            # cleartext device
+            encrypted.call_lock_sync(GLib.Variant('a{sv}', {}), None)
+            self.assertRaises(GLib.GError, encrypted.call_lock_sync, 
+                    GLib.Variant('a{sv}', {}), None)
+            
+            # wrong password
+            self.assertRaises(GLib.GError, encrypted.call_unlock_sync, 
+                    'h4ckpassword', GLib.Variant('a{sv}', {}), None)
+            # right password
+            clear_path = encrypted.call_unlock_sync('s3kr1t', 
+                    GLib.Variant('a{sv}', {}), None)
+
+            # check cleartext device info
+            clear_obj = self.client.get_object(clear_path)
+            self.assertEqual(clear_obj.get_property('encrypted'), None)
+            clear_block = clear_obj.get_property('block')
+            self.assertEqual(clear_block.get_property('id-type'), 'ext4')
+            self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
+            self.assertEqual(clear_block.get_property('id-label'), 'treasure')
+            self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
+            clear_dev = clear_block.get_property('device')
+            self.assertNotEqual(clear_dev, None)
+            self.assertEqual(clear_block.get_property('id-uuid'),
+                    self.blkid(device=clear_dev)['ID_FS_UUID'])
+
+            clear_fs = clear_obj.get_property('filesystem')
+            self.assertEqual(clear_fs.get_property('mount-points'), [])
+
+            # check that we do not leak key information
+            udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
+                    stdout=subprocess.PIPE)
+            out = udev_dump.communicate()[0]
+            self.assertFalse(b's3kr1t' in out, 'password in udev properties')
+            self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
+
+        finally:
+            # tear down cleartext device
+            encrypted.call_lock_sync(GLib.Variant('a{sv}', {}), None)
+            self.assertFalse(os.path.exists(clear_dev))
+
+    def test_luks_mount(self):
+        '''LUKS mount/unmount'''
+
+        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
+        encrypted = crypt_obj.get_property('encrypted')
+
+        path = encrypted.call_unlock_sync('s3kr1t', 
+                GLib.Variant('a{sv}', {}), None)
+        self.client.settle()
+        obj = self.client.get_object(path)
+        fs = obj.get_property('filesystem')
+        self.assertNotEqual(fs, None)
+
+        # mount
+        mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+
+        try:
+            self.assertEqual(mount_path, '/media/treasure')
+            self.assertTrue(self.is_mountpoint(mount_path))
+            self.client.settle()
+            self.assertEqual(fs.get_property('mount-points'), ['/media/treasure'])
+
+            # can't lock, busy
+            try:
+                encrypted.call_lock_sync(GLib.Variant('a{sv}', {}), None)
+                self.fail('Lock() unexpectedly succeeded on mounted file system')
+            except GLib.GError as e:
+                self.assertTrue('UDisks.Error.Failed' in e.message)
+        finally:
+            # umount
+            self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+            self.client.settle()
+            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+            self.assertEqual(fs.get_property('mount-points'), [])
+
+            # lock
+            encrypted.call_lock_sync(GLib.Variant('a{sv}', {}), None)
+            self.client.settle()
+            self.assertEqual(self.client.get_object(path), None)
+
+    def test_luks_forced_removal(self):
+        '''LUKS forced removal'''
+
+        # unlock and mount it
+        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
+        path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
+                GLib.Variant('a{sv}', {}), None)
+        try:
+            fs = self.client.get_object(path).get_property('filesystem')
+            mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+            self.assertEqual(mount_path, '/media/treasure')
+
+            # removal should clean up mounts
+            self.remove_device(self.device)
+            self.assertFalse(os.path.exists(mount_path))
+            self.assertEqual(self.client.get_object(path), None)
+
+            # after putting it back, it should be mountable again
+            self.readd_devices()
+            crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
+            path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
+                    GLib.Variant('a{sv}', {}), None)
+            self.client.settle()
+            fs = self.client.get_object(path).get_property('filesystem')
+            mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+            self.assertEqual(mount_path, '/media/treasure')
+
+            # umount
+            self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+            self.client.settle()
+            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+            self.assertEqual(fs.get_property('mount-points'), [])
+        finally:
+            # lock
+            crypt_obj.get_property('encrypted').call_lock_sync(
+                    GLib.Variant('a{sv}', {}), None)
+            self.client.settle()
+            self.assertEqual(self.client.get_object(path), None)
+
+# ----------------------------------------------------------------------------
+
 if __name__ == '__main__':
     argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
     argparser.add_argument('-l', '--log-file', dest='logfile',