Add integration test suite
authorMartin Pitt <martin.pitt@ubuntu.com>
Sat, 5 Nov 2011 19:57:26 +0000 (15:57 -0400)
committerMartin Pitt <martin.pitt@ubuntu.com>
Sat, 5 Nov 2011 19:57:26 +0000 (15:57 -0400)
This is the beginning of porting udisks1's "tests/run" integration test suite.
When running the suite from the source tree, this tests the daemon and library
from source tree; falls back on the installed system ones otherwise.

Currently covered are:

 - Creation and label handling on all supported file systems (including swap
   and empty)
 - cleanup after drive removal without unmount
 - UDisksDrive properties
 - picks up changes from command line tools like mkfs
 - correct file/dir/mountpoint permissions
 - SMART

src/tests/integration-test [new file with mode: 0755]

diff --git a/src/tests/integration-test b/src/tests/integration-test
new file mode 100755 (executable)
index 0000000..f8f2029
--- /dev/null
@@ -0,0 +1,882 @@
+#!/usr/bin/python3
+#
+# udisks2 integration test suite
+#
+# Run in udisks built tree to test local built binaries (needs
+# --localstatedir=/var), or from anywhere else to test system installed
+# binaries. 
+#
+# Usage:
+# - Run all tests: 
+#   src/tests/integration-test
+# - Run only a particular class of tests:
+#   src/tests/integration-test Drive
+# - Run only a single test:
+#   src/tests/integration-test FS.test_ext3
+#
+# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+import sys
+import os
+
+srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+libdir = os.path.join(srcdir, 'udisks', '.libs')
+
+# as we can't change LD_LIBRARY_PATH within a running program, and doing
+# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
+# nasty hack
+if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
+    os.environ['LD_LIBRARY_PATH'] = libdir
+    os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
+            srcdir,
+            os.environ.get('GI_TYPELIB_PATH', ''))
+    os.execv(sys.argv[0], sys.argv)
+    assert False, 'not expecting to land here'
+
+import subprocess
+import unittest
+import tempfile
+import atexit
+import time
+import shutil
+import signal
+import argparse
+import re
+from glob import glob
+from gi.repository import GLib, UDisks
+
+#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
+VDEV_SIZE = 300000000 # size of virtual test device
+
+# Those file systems are known to have a broken handling of permissions, in
+# particular the executable bit
+BROKEN_PERMISSIONS_FS = ['ntfs']
+
+# Some D-BUS API methods cause properties to not be up to date yet when a
+# method call finishes, thus we do an udevadm settle as a workaround. Those
+# methods should eventually get fixed properly, but it's unnerving to have
+# the tests fail on them when you are working on something else. This flag
+# gets set by the --no-workarounds option to disable those syncs, so that these
+# race conditions can be fixed.
+workaround_syncs = False
+
+
+# ----------------------------------------------------------------------------
+
+class UDisksTestCase(unittest.TestCase):
+    '''Base class for udisks test cases.
+    
+    This provides static functions which are useful for all test cases.
+    '''
+    tool_path = None
+    daemon = None
+    daemon_log = None
+    device = None
+
+    client = None
+    manager = None
+
+    @classmethod
+    def init(klass, logfile=None):
+        '''start daemon and set up test environment'''
+
+        if os.geteuid() != 0:
+            print('this test suite needs to run as root', file=sys.stderr)
+            sys.exit(0)
+
+        # run from local build tree if we are in one, otherwise use system instance
+        daemon_path = os.path.join(srcdir, 'src', 'udisksd')
+        if (os.access (daemon_path, os.X_OK)):
+            klass.tool_path = 'tools/udisksctl'
+            print('Testing binaries from local build tree')
+            klass.check_build_tree_config()
+        else:
+            print('Testing installed system binaries')
+            daemon_path = None
+            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
+                if l.startswith('Exec='):
+                    daemon_path = l.split('=', 1)[1].strip()
+                    break
+            assert daemon_path, 'could not determine daemon path from D-BUS .service file'
+
+            klass.tool_path = 'udisksctl'
+
+        print('daemon path: ' + daemon_path)
+
+        klass.device = klass.setup_vdev()
+
+        # inhibit GNOME automounting/nautilus pop ups
+        subprocess.call(['killall', '-STOP', 'gvfs-gdu-volume-monitor'])
+
+        # start daemon
+        if logfile:
+            klass.daemon_log = open(logfile, 'w')
+        else:
+            klass.daemon_log = tempfile.TemporaryFile()
+        klass.daemon = subprocess.Popen([daemon_path, '--replace'],
+            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
+        assert klass.daemon.pid, 'daemon failed to start'
+
+        atexit.register(klass.cleanup)
+
+        # wait until the daemon has started up
+        timeout = 10
+        while klass.manager is None and timeout > 0:
+            time.sleep(0.2)
+            klass.client = UDisks.Client.new_sync(None)
+            assert klass.client != None
+            klass.manager = klass.client.get_manager()
+            timeout -= 1
+        assert klass.manager, 'daemon failed to start'
+
+        klass.sync()
+
+    @classmethod
+    def cleanup(klass):
+        '''stop daemon again and clean up test environment'''
+
+        subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
+
+        os.kill(klass.daemon.pid, signal.SIGTERM)
+        os.wait()
+        klass.daemon = None
+
+        klass.teardown_vdev(klass.device)
+        klass.device = None
+
+        # resume GNOME automounting/nautilus pop ups
+        subprocess.call(['killall', '-CONT', 'gvfs-gdu-volume-monitor'])
+
+    @classmethod
+    def sync(klass):
+        '''Wait until pending events finished processing.
+        
+        This should only be called for situations where we genuinely have an
+        asynchronous response, like invoking a CLI program and waiting for
+        udev/udisks to catch up on the change events.
+        '''
+        subprocess.call(['udevadm', 'settle'])
+        klass.client.settle()
+
+    @classmethod
+    def sync_workaround(klass):
+        '''Wait until pending events finished processing (bug workaround).
+        
+        This should be called for race conditions in the D-BUS API which cause
+        properties to not be up to date yet when a method call finishes. Those
+        should eventually get fixed properly, but it's unnerving to have the
+        tests fail on them when you are working on something else.
+
+        This sync is not done if running with --no-workarounds.
+        '''
+        if workaround_syncs:
+            klass.sync()
+
+    @classmethod
+    def zero_device(klass):
+        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
+                stderr=subprocess.PIPE)
+        klass.sync()
+
+    @classmethod
+    def devname(klass, partition=None):
+        '''Get name of test device or one of its partitions'''
+
+        if partition:
+            if klass.device[-1].isdigit():
+                return klass.device + 'p' + str(partition)
+            else:
+                return klass.device + str(partition)
+        else:
+            return klass.device
+
+    @classmethod
+    def udisks_block(klass, partition=None):
+        '''Get UDisksBlock object for test device or partition'''
+
+        assert klass.client
+        devname = klass.devname(partition)
+        dev_t = os.stat(devname).st_rdev
+        block = klass.client.get_block_for_dev(dev_t)
+        assert block, 'did not find an UDisksBlock object for %s' % devname
+        return block
+
+    @classmethod
+    def udisks_filesystem(klass, partition=None):
+        '''Get UDisksFilesystem object for test device or partition
+        
+        Return None if there is no file system on that device.
+        '''
+        block = klass.udisks_block(partition)
+        return klass.client.get_object(block.get_object_path()).get_property('filesystem')
+
+    @classmethod
+    def blkid(klass, partition=None):
+        '''Call blkid and return dictionary of results.'''
+
+        result = {}
+        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', 
+            klass.devname(partition)], stdout=subprocess.PIPE)
+        for l in cmd.stdout:
+            (key, value) = l.decode('UTF-8').split('=', 1)
+            result[key] = value.strip()
+        assert cmd.wait() == 0
+        return result
+
+    @classmethod
+    def is_mountpoint(klass, path):
+        '''Check if given path is a mount point.'''
+
+        return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
+
+    @classmethod
+    def mkfs(klass, type, label=None, partition=None):
+        '''Create file system using mkfs.'''
+
+        if type == 'minix':
+            assert label is None, 'minix does not support labels'
+
+        # work around mkswap not properly cleaning up an existing reiserfs
+        # signature (mailed kzak about it)
+        if type == 'swap':
+            subprocess.check_call(['wipefs', '-a', klass.devname(partition)])
+
+        mkcmd =     { 'swap': 'mkswap',
+                    }
+        label_opt = { 'vfat': '-n', 
+                      'reiserfs': '-l',
+                    }
+        extra_opt = { 'vfat': [ '-I', '-F', '32'],
+                      'swap': ['-f'],
+                      'xfs': ['-f'], # XFS complains if there's an existing FS, so force
+                      'ext2': ['-F'], # ext* complains about using entire device, so force
+                      'ext3': ['-F'],
+                      'ext4': ['-F'],
+                      'ntfs': ['-F'],
+                      'reiserfs': ['-ff'],
+                    }
+
+        cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
+        if label:
+            cmd += [label_opt.get(type, '-L'), label]
+        cmd.append(klass.devname(partition))
+
+        subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        # kernel/udev generally detect those changes itself, but do not quite
+        # tell us when they are done; so do a little kludge here to know how
+        # long we need to wait
+        subprocess.call(['udevadm', 'trigger', '--action=change',
+            '--sysname-match=' + os.path.basename(klass.devname(partition))])
+        klass.sync()
+
+    @classmethod
+    def fs_create(klass, partition, type, options):
+        '''Create file system using udisks.'''
+
+        block = klass.udisks_block(partition)
+        block.call_format_sync(type, options, None)
+        klass.sync_workaround()
+
+    @classmethod
+    def retry_busy(klass, fn, *args):
+        '''Call a function until it does not fail with "Busy".'''
+
+        timeout = 10
+        while timeout >= 0:
+            try:
+                return fn(*args)
+            except GLib.GError as e:
+                if not 'UDisks.Error.DeviceBusy' in e.message:
+                    raise
+                sys.stderr.write('[busy] ')
+                time.sleep(0.3)
+                timeout -= 1
+
+    @classmethod
+    def check_build_tree_config(klass):
+        '''Check configuration of build tree'''
+
+        # read make variables
+        make_vars = {}
+        var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
+        make = subprocess.Popen(['make', '-p', '/dev/null'],
+                stdout=subprocess.PIPE)
+        for l in make.stdout:
+            l = l.decode('UTF-8')
+            m = var_re.match(l)
+            if m:
+                make_vars[m.group(1)] = m.group(2)
+        make.wait()
+
+        # expand make variables
+        subst_re = re.compile('\${([a-zA-Z_]+)}')
+        for (k, v) in make_vars.items():
+            while True:
+                m = subst_re.search(v)
+                if m:
+                    v = subst_re.sub(make_vars.get(m.group(1), ''), v)
+                    make_vars[k] = v
+                else:
+                    break
+
+        # check localstatedir
+        for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks'),
+                os.path.join(make_vars['localstatedir'], 'lib', 'udisks')):
+            if not os.path.exists(d):
+                sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
+                sys.exit(0)
+        
+    @classmethod
+    def setup_vdev(klass):
+        '''create virtual test device
+        
+        It is zeroed out initially.
+
+        Return the device path.
+        '''
+        # ensure that the scsi_debug module is loaded
+        if os.path.isdir('/sys/module/scsi_debug'):
+            sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
+            sys.exit(1)
+
+        assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
+            VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
+
+        # wait until all drives are created
+        dirs = []
+        while len(dirs) < 1:
+            dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+            time.sleep(0.1)
+        assert len(dirs) == 1
+
+        # determine the debug block devices
+        devs = os.listdir(dirs[0])
+        assert len(devs) == 1
+        dev = '/dev/' + devs[0]
+        assert os.path.exists(dev)
+
+        # let's be 100% sure that we pick a virtual one
+        assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
+
+        print('Set up test device: ' + dev)
+        return dev
+
+    @classmethod
+    def teardown_vdev(klass, device):
+        '''release and remove virtual test device'''
+
+        klass.remove_device(device)
+        assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
+                'Failure to rmmod scsi_debug'
+
+    @classmethod
+    def remove_device(klass, device):
+        '''remove virtual test device'''
+
+        device = device.split('/')[-1]
+        if os.path.exists('/sys/block/' + device):
+            f = open('/sys/block/%s/device/delete' % device, 'w')
+            f.write('1')
+            f.close()
+        while os.path.exists(device):
+            time.sleep(0.1)
+        klass.sync()
+        time.sleep(0.5) # TODO
+
+    @classmethod
+    def readd_devices(klass):
+        '''re-add virtual test devices after removal'''
+
+        scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
+        assert len(scan_files) > 0
+        for f in scan_files:
+            open(f, 'w').write('- - -\n')
+        while not os.path.exists(klass.device):
+            time.sleep(0.1)
+        time.sleep(0.5)
+        klass.sync()
+
+# ----------------------------------------------------------------------------
+
+class Manager(UDisksTestCase):
+    '''UDisksManager operations'''
+
+    def test_version(self):
+        '''daemon version'''
+
+        self.assertTrue(self.manager.get_property('version')[0].isdigit())
+
+# ----------------------------------------------------------------------------
+
+class Drive(UDisksTestCase):
+    '''UDisksDrive'''
+
+    def setUp(self):
+        self.drive = self.client.get_drive_for_block(self.udisks_block())
+        self.assertNotEqual(self.drive, None)
+
+    def test_properties(self):
+        '''properties of UDisksDrive object'''
+
+        self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
+        self.assertEqual(self.drive.get_property('vendor'), 'Linux')
+        self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+        self.assertEqual(self.drive.get_property('media-available'), True)
+        self.assertEqual(self.drive.get_property('optical'), False)
+
+        self.assertNotEqual(len(self.drive.get_property('serial')), 0)
+        self.assertNotEqual(len(self.drive.get_property('revision')), 0)
+
+# ----------------------------------------------------------------------------
+
+class FS(UDisksTestCase):
+    '''Test detection of all supported file systems'''
+
+    def setUp(self):
+        self.workdir = tempfile.mkdtemp()
+        self.block = self.udisks_block()
+        self.assertNotEqual(self.block, None)
+
+    def tearDown(self):
+        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
+            sys.stderr.write('[cleanup unmount] ')
+        shutil.rmtree (self.workdir)
+
+    def test_zero(self):
+        '''properties of zeroed out device'''
+
+        self.zero_device()
+        self.assertEqual(self.block.get_property('device'), self.device)
+        self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
+        self.assertEqual(self.block.get_property('hint-system'), True)
+        self.assertEqual(self.block.get_property('id-label'), '')
+        self.assertEqual(self.block.get_property('id-usage'), '')
+        self.assertEqual(self.block.get_property('id-type'), '')
+        self.assertEqual(self.block.get_property('id-uuid'), '')
+        self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+        obj = self.client.get_object(self.block.get_object_path())
+        self.assertEqual(obj.get_property('filesystem'), None)
+        self.assertEqual(obj.get_property('partition'), None)
+        self.assertEqual(obj.get_property('partition-table'), None)
+
+    def test_ext2(self):
+        '''fs: ext2'''
+        self._do_fs_check('ext2')
+
+    def test_ext3(self):
+        '''fs: ext3'''
+        self._do_fs_check('ext3')
+
+    def test_ext4(self):
+        '''fs: ext4'''
+        self._do_fs_check('ext4')
+
+    def test_btrfs(self):
+        '''fs: btrfs'''
+        self._do_fs_check('btrfs')
+
+    def test_minix(self):
+        '''fs: minix'''
+        self._do_fs_check('minix')
+
+    def test_xfs(self):
+        '''fs: XFS'''
+        self._do_fs_check('xfs')
+
+    def test_ntfs(self):
+        '''fs: NTFS'''
+        self._do_fs_check('ntfs')
+
+    def test_vfat(self):
+        '''fs: FAT'''
+        self._do_fs_check('vfat')
+
+    def test_reiserfs(self):
+        '''fs: reiserfs'''
+        self._do_fs_check('reiserfs')
+
+    def test_swap(self):
+        '''fs: swap'''
+        self._do_fs_check('swap')
+
+    def test_nilfs2(self):
+        '''fs: nilfs2'''
+        self._do_fs_check('nilfs2')
+
+    def test_empty(self):
+        '''fs: empty'''
+
+        self.mkfs('ext4', 'foo')
+        block = self.udisks_block()
+        self.assertEqual(block.get_property('id-usage'), 'filesystem')
+        self.assertEqual(block.get_property('id-type'), 'ext4')
+        self.assertEqual(block.get_property('id-label'), 'foo')
+        self.assertNotEqual(self.udisks_filesystem(), None)
+
+        self.fs_create(None, 'empty', GLib.Variant('a{sv}', {}))
+
+        self.assertEqual(block.get_property('id-usage'), '')
+        self.assertEqual(block.get_property('id-type'), '')
+        self.assertEqual(block.get_property('id-label'), '')
+        self.assertEqual(self.udisks_filesystem(), None)
+
+    def test_create_fs_unknown_type(self):
+        '''Format() with unknown type'''
+
+        try:
+            self.fs_create(None, 'bogus', GLib.Variant('a{sv}', {}))
+            self.fail('Expected failure for bogus file system')
+        except GLib.GError as e:
+            self.assertTrue('UDisks.Error.NotSupported' in e.message)
+            self.assertTrue('type bogus' in e.message)
+
+    def test_create_fs_unsupported_label(self):
+        '''Format() with unsupported label'''
+
+        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
+        try:
+            self.fs_create(None, 'minix', options)
+            self.fail('Expected failure for unsupported label')
+        except GLib.GError as e:
+            self.assertTrue('UDisks.Error.NotSupported' in e.message)
+
+    def test_force_removal(self):
+        '''fs: forced removal'''
+
+        # create a fs and mount it
+        self.mkfs('ext4', 'udiskstest')
+        fs = self.udisks_filesystem()
+        mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+        self.assertEqual(mount_path, '/media/udiskstest')
+        self.assertTrue(self.is_mountpoint('/media/udiskstest'))
+
+        dev_t = os.stat(self.devname()).st_rdev
+
+        # removal should clean up mounts
+        self.remove_device(self.device)
+        self.assertFalse(os.path.exists(mount_path))
+        self.assertEqual(self.client.get_block_for_dev(dev_t), None)
+
+        # after putting it back, it should be mountable again
+        self.readd_devices()
+        fs = self.udisks_filesystem()
+        self.assertEqual(fs.get_property('mount-points'), [])
+
+        mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+        self.assertTrue(self.is_mountpoint('/media/udiskstest'))
+        self.assertEqual(mount_path, '/media/udiskstest')
+        self.client.settle()
+        self.assertEqual(fs.get_property('mount-points'), ['/media/udiskstest'])
+
+        self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+        self.client.settle()
+        self.assertEqual(fs.get_property('mount-points'), [])
+
+    def _do_fs_check(self, type):
+        '''Run checks for a particular file system.'''
+
+        if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
+                stdout=subprocess.PIPE) != 0:
+            sys.stderr.write('[no mkfs.%s, skip] ' % type)
+
+            # check correct D-Bus exception
+            try:
+                self.fs_create(None, type, GLib.Variant('a{sv}', {}))
+                self.fail('Expected failure for missing mkfs.' + type)
+            except GLib.GError as e:
+                self.assertTrue('UDisks.Error.Failed' in e.message)
+            return
+
+        # do checks with command line tools (mkfs/mount/umount)
+        sys.stderr.write('[cli] ')
+        sys.stderr.flush()
+
+        self._do_cli_check(type)
+        if type != 'minix':
+            self._do_cli_check(type, 'test%stst' % type)
+
+        # put a different fs here instead of zeroing, so that we verify that
+        # udisks overrides existing FS (e. g. XFS complains then), and does not
+        # leave traces of other FS around
+        if type == 'ext3':
+            self.mkfs('swap')
+        else:
+            self.mkfs('ext3')
+
+        # do checks with udisks operations
+        sys.stderr.write('[ud] ')
+        self._do_udisks_check(type)
+        if type != 'minix':
+            self._do_udisks_check(type, 'test%stst' % type)
+            # also test fs_create with an empty label
+            self._do_udisks_check(type, '')
+
+    def _do_cli_check(self, type, label=None):
+        '''udisks correctly picks up file system changes from command line tools'''
+
+        self.mkfs(type, label)
+
+        block = self.udisks_block()
+
+        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
+
+        self.assertEqual(block.get_property('id-type'), type)
+        self.assertEqual(block.get_property('id-label'), label or '')
+        self.assertEqual(block.get_property('hint-name'), '')
+        if type != 'minix':
+            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
+
+        obj = self.client.get_object(self.block.get_object_path())
+        self.assertEqual(obj.get_property('partition'), None)
+        self.assertEqual(obj.get_property('partition-table'), None)
+
+        fs = obj.get_property('filesystem')
+        if type == 'swap':
+            self.assertEqual(fs, None)
+        else:
+            self.assertNotEqual(fs, None)
+
+        if type == 'swap':
+            return
+
+        # mount it
+        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
+                stdout=subprocess.PIPE) == 0:
+            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
+            # defaults to ntfs-3g if installed); TODO: check other distros
+            mount_prog = 'mount.ntfs-3g'
+        else:
+            mount_prog = 'mount'
+        ret = subprocess.call([mount_prog, self.device, self.workdir])
+        if ret == 32:
+            # missing fs driver
+            sys.stderr.write('[missing kernel driver, skip] ')
+            return
+        self.assertEqual(ret, 0)
+
+        self.sync()
+        self.assertEqual(fs.get_property('mount-points'), [self.workdir])
+
+        # unmount it
+        subprocess.call(['umount', self.workdir])
+        self.sync()
+        self.assertEqual(fs.get_property('mount-points'), [])
+
+    def _do_udisks_check(self, type, label=None):
+        '''udisks API correctly changes file system'''
+
+        # create fs 
+        if label is not None:
+            options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
+        else:
+            options = GLib.Variant('a{sv}', {})
+        self.fs_create(None, type, options)
+
+        # properties
+        id = self.blkid()
+        self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
+        self.assertEqual(id['ID_FS_TYPE'], type)
+        self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
+
+        block = self.udisks_block()
+        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
+        self.assertEqual(block.get_property('id-type'), type)
+        self.assertEqual(block.get_property('id-label'), label or '')
+
+        if type == 'swap':
+            return
+
+        obj = self.client.get_object(self.block.get_object_path())
+        self.assertEqual(obj.get_property('partition'), None)
+        self.assertEqual(obj.get_property('partition-table'), None)
+
+        fs = self.udisks_filesystem()
+        self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
+        self.assertEqual(fs.get_property('mount-points'), [])
+
+        # mount
+        mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+
+        if label:
+            self.assertEqual(mount_path, '/media/' + label)
+        else:
+            self.assertTrue(mount_path.startswith('/media/'))
+
+        self.client.settle()
+        self.assertEqual(fs.get_property('mount-points'), [mount_path])
+        self.assertTrue(self.is_mountpoint(mount_path))
+
+        # no ownership taken, should be root owned
+        st = os.stat(mount_path)
+        self.assertEqual((st.st_uid, st.st_gid), (0, 0))
+
+        self._do_file_perms_checks(type, mount_path)
+
+        # unmount
+        self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+        self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+        self.assertEqual(fs.get_property('mount-points'), [mount_path])
+
+        # create fs with taking ownership (daemon:mail == 1:8)
+        #if supports_unix_owners:
+        #    options.append('take_ownership_uid=1')
+        #    options.append('take_ownership_gid=8')
+        #    self.fs_create(None, type, options)
+        #    mount_path = iface.FilesystemMount('', [])
+        #    st = os.stat(mount_path)
+        #    self.assertEqual((st.st_uid, st.st_gid), (1, 8))
+        #    self.retry_busy(self.partition_iface().FilesystemUnmount, [])
+        #    self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+
+        # change label
+        supported = True
+        l = 'n"a\m\\"e' + type
+        if type == 'vfat':
+            # VFAT does not support some characters
+            self.assertRaises(GLib.GError, fs.call_set_label_sync, l, 
+                    GLib.Variant('a{sv}', {}), None)
+            l = "n@a$me"
+        try:
+            fs.call_set_label_sync(l, GLib.Variant('a{sv}', {}), None)
+        except GLib.GError as e:
+            if 'UDisks.Error.NotSupported' in e.message:
+                # these fses are known to not support relabeling
+                self.assertTrue(type in ['minix', 'btrfs'])
+                supported = False
+
+        if supported:
+            block = self.udisks_block()
+            blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
+                    '\\x5c', '\\').replace('\\x24', '$')
+            self.sync_workaround()
+            if type == 'vfat':
+                # EXFAIL: often (but not always) the label appears in all upper case
+                self.assertEqual(blkid_label.upper(), l.upper())
+                self.assertEqual(block.get_property('id-label').upper(), l.upper())
+            else:
+                self.assertEqual(blkid_label, l)
+                self.assertEqual(block.get_property('id-label'), l)
+
+            # test setting empty label
+            fs.call_set_label_sync('', GLib.Variant('a{sv}', {}), None)
+            self.sync_workaround()
+            self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
+            self.assertEqual(block.get_property('id-label'), '')
+
+        # check fs - Not implemented in udisks yet
+        #self.assertEqual(iface.FilesystemCheck([]), True)
+
+    def _do_file_perms_checks(self, type, mount_point):
+        '''Check for permissions for data files and executables.
+
+        This particularly checks sane and useful permissions on non-Unix file
+        systems like vfat.
+        '''
+        if type in BROKEN_PERMISSIONS_FS:
+            return
+
+        f = os.path.join(mount_point, 'simpledata.txt')
+        open(f, 'w').close()
+        self.assertTrue(os.access(f, os.R_OK))
+        self.assertTrue(os.access(f, os.W_OK))
+        self.assertFalse(os.access(f, os.X_OK))
+
+        f = os.path.join(mount_point, 'simple.exe')
+        shutil.copy('/bin/bash', f)
+        self.assertTrue(os.access(f, os.R_OK))
+        self.assertTrue(os.access(f, os.W_OK))
+        self.assertTrue(os.access(f, os.X_OK))
+
+        os.mkdir(os.path.join(mount_point, 'subdir'))
+        f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
+        open(f, 'w').close()
+        self.assertTrue(os.access(f, os.R_OK))
+        self.assertTrue(os.access(f, os.W_OK))
+        self.assertFalse(os.access(f, os.X_OK))
+
+        f = os.path.join(mount_point, 'subdir', 'subdir.exe')
+        shutil.copy('/bin/bash', f)
+        self.assertTrue(os.access(f, os.R_OK))
+        self.assertTrue(os.access(f, os.W_OK))
+        self.assertTrue(os.access(f, os.X_OK))
+
+## ----------------------------------------------------------------------------
+
+class Smart(UDisksTestCase):
+    '''Check SMART operation.'''
+
+    def test_sda(self):
+        '''SMART status of first internal hard disk
+        
+        This is a best-effort readonly test.
+        '''
+        hd = '/dev/sda'
+
+        if not os.path.exists(hd):
+            sys.stderr.write('[skip] ')
+            return
+
+        has_smart = subprocess.call(['skdump', '--can-smart', hd],
+                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
+
+        block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
+        self.assertNotEqual(block, None)
+        drive = self.client.get_drive_for_block(block)
+        ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
+        self.assertEqual(ata != None, has_smart)
+
+        if has_smart:
+            sys.stderr.write('[avail] ')
+            self.assertEqual(ata.get_property('smart-supported'), True)
+            self.assertEqual(ata.get_property('smart-enabled'), True)
+
+            # wait for SMART data to be read
+            while ata.get_property('smart-updated') == 0:
+                sys.stderr.write('[wait for data] ')
+                time.sleep(0.5)
+
+            # this is of course not truly correct for a test suite, but let's
+            # consider it a courtesy for developers :-)
+            self.assertEqual(ata.get_property('smart-failing'), False)
+            self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
+        else:
+            sys.stderr.write('[N/A] ')
+
+
+# ----------------------------------------------------------------------------
+
+if __name__ == '__main__':
+    argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
+    argparser.add_argument('-l', '--log-file', dest='logfile',
+            help='write daemon log to a file')
+    argparser.add_argument('-w', '--no-workarounds',
+            action="store_true", default=False,
+            help='Disable workarounds for race conditions in the D-BUS API')
+    argparser.add_argument('testname', nargs='*',
+            help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
+    args = argparser.parse_args()
+
+    workaround_syncs = not args.no_workarounds
+
+    UDisksTestCase.init(logfile=args.logfile)
+    if args.testname:
+        tests = unittest.TestLoader().loadTestsFromNames(args.testname,
+                __import__('__main__'))
+    else:
+        tests = unittest.TestLoader().loadTestsFromName('__main__')
+    if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
+        sys.exit(0)
+    else:
+        sys.exit(1)
+