From fa893b3bc66b642706f399e3f61177076430c566 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Sat, 5 Nov 2011 15:57:26 -0400 Subject: [PATCH] Add integration test suite This is the beginning of porting udisks1's "tests/run" integration test suite. When running the suite from the source tree, this tests the daemon and library from source tree; falls back on the installed system ones otherwise. Currently covered are: - Creation and label handling on all supported file systems (including swap and empty) - cleanup after drive removal without unmount - UDisksDrive properties - picks up changes from command line tools like mkfs - correct file/dir/mountpoint permissions - SMART --- src/tests/integration-test | 882 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 882 insertions(+) create mode 100755 src/tests/integration-test diff --git a/src/tests/integration-test b/src/tests/integration-test new file mode 100755 index 0000000..f8f2029 --- /dev/null +++ b/src/tests/integration-test @@ -0,0 +1,882 @@ +#!/usr/bin/python3 +# +# udisks2 integration test suite +# +# Run in udisks built tree to test local built binaries (needs +# --localstatedir=/var), or from anywhere else to test system installed +# binaries. +# +# Usage: +# - Run all tests: +# src/tests/integration-test +# - Run only a particular class of tests: +# src/tests/integration-test Drive +# - Run only a single test: +# src/tests/integration-test FS.test_ext3 +# +# Copyright: (C) 2011 Martin Pitt +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import sys +import os + +srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) +libdir = os.path.join(srcdir, 'udisks', '.libs') + +# as we can't change LD_LIBRARY_PATH within a running program, and doing +# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this +# nasty hack +if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir): + os.environ['LD_LIBRARY_PATH'] = libdir + os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % ( + srcdir, + os.environ.get('GI_TYPELIB_PATH', '')) + os.execv(sys.argv[0], sys.argv) + assert False, 'not expecting to land here' + +import subprocess +import unittest +import tempfile +import atexit +import time +import shutil +import signal +import argparse +import re +from glob import glob +from gi.repository import GLib, UDisks + +#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs +VDEV_SIZE = 300000000 # size of virtual test device + +# Those file systems are known to have a broken handling of permissions, in +# particular the executable bit +BROKEN_PERMISSIONS_FS = ['ntfs'] + +# Some D-BUS API methods cause properties to not be up to date yet when a +# method call finishes, thus we do an udevadm settle as a workaround. Those +# methods should eventually get fixed properly, but it's unnerving to have +# the tests fail on them when you are working on something else. This flag +# gets set by the --no-workarounds option to disable those syncs, so that these +# race conditions can be fixed. +workaround_syncs = False + + +# ---------------------------------------------------------------------------- + +class UDisksTestCase(unittest.TestCase): + '''Base class for udisks test cases. + + This provides static functions which are useful for all test cases. + ''' + tool_path = None + daemon = None + daemon_log = None + device = None + + client = None + manager = None + + @classmethod + def init(klass, logfile=None): + '''start daemon and set up test environment''' + + if os.geteuid() != 0: + print('this test suite needs to run as root', file=sys.stderr) + sys.exit(0) + + # run from local build tree if we are in one, otherwise use system instance + daemon_path = os.path.join(srcdir, 'src', 'udisksd') + if (os.access (daemon_path, os.X_OK)): + klass.tool_path = 'tools/udisksctl' + print('Testing binaries from local build tree') + klass.check_build_tree_config() + else: + print('Testing installed system binaries') + daemon_path = None + for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'): + if l.startswith('Exec='): + daemon_path = l.split('=', 1)[1].strip() + break + assert daemon_path, 'could not determine daemon path from D-BUS .service file' + + klass.tool_path = 'udisksctl' + + print('daemon path: ' + daemon_path) + + klass.device = klass.setup_vdev() + + # inhibit GNOME automounting/nautilus pop ups + subprocess.call(['killall', '-STOP', 'gvfs-gdu-volume-monitor']) + + # start daemon + if logfile: + klass.daemon_log = open(logfile, 'w') + else: + klass.daemon_log = tempfile.TemporaryFile() + klass.daemon = subprocess.Popen([daemon_path, '--replace'], + stdout=klass.daemon_log, stderr=subprocess.STDOUT) + assert klass.daemon.pid, 'daemon failed to start' + + atexit.register(klass.cleanup) + + # wait until the daemon has started up + timeout = 10 + while klass.manager is None and timeout > 0: + time.sleep(0.2) + klass.client = UDisks.Client.new_sync(None) + assert klass.client != None + klass.manager = klass.client.get_manager() + timeout -= 1 + assert klass.manager, 'daemon failed to start' + + klass.sync() + + @classmethod + def cleanup(klass): + '''stop daemon again and clean up test environment''' + + subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed + + os.kill(klass.daemon.pid, signal.SIGTERM) + os.wait() + klass.daemon = None + + klass.teardown_vdev(klass.device) + klass.device = None + + # resume GNOME automounting/nautilus pop ups + subprocess.call(['killall', '-CONT', 'gvfs-gdu-volume-monitor']) + + @classmethod + def sync(klass): + '''Wait until pending events finished processing. + + This should only be called for situations where we genuinely have an + asynchronous response, like invoking a CLI program and waiting for + udev/udisks to catch up on the change events. + ''' + subprocess.call(['udevadm', 'settle']) + klass.client.settle() + + @classmethod + def sync_workaround(klass): + '''Wait until pending events finished processing (bug workaround). + + This should be called for race conditions in the D-BUS API which cause + properties to not be up to date yet when a method call finishes. Those + should eventually get fixed properly, but it's unnerving to have the + tests fail on them when you are working on something else. + + This sync is not done if running with --no-workarounds. + ''' + if workaround_syncs: + klass.sync() + + @classmethod + def zero_device(klass): + subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'], + stderr=subprocess.PIPE) + klass.sync() + + @classmethod + def devname(klass, partition=None): + '''Get name of test device or one of its partitions''' + + if partition: + if klass.device[-1].isdigit(): + return klass.device + 'p' + str(partition) + else: + return klass.device + str(partition) + else: + return klass.device + + @classmethod + def udisks_block(klass, partition=None): + '''Get UDisksBlock object for test device or partition''' + + assert klass.client + devname = klass.devname(partition) + dev_t = os.stat(devname).st_rdev + block = klass.client.get_block_for_dev(dev_t) + assert block, 'did not find an UDisksBlock object for %s' % devname + return block + + @classmethod + def udisks_filesystem(klass, partition=None): + '''Get UDisksFilesystem object for test device or partition + + Return None if there is no file system on that device. + ''' + block = klass.udisks_block(partition) + return klass.client.get_object(block.get_object_path()).get_property('filesystem') + + @classmethod + def blkid(klass, partition=None): + '''Call blkid and return dictionary of results.''' + + result = {} + cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', + klass.devname(partition)], stdout=subprocess.PIPE) + for l in cmd.stdout: + (key, value) = l.decode('UTF-8').split('=', 1) + result[key] = value.strip() + assert cmd.wait() == 0 + return result + + @classmethod + def is_mountpoint(klass, path): + '''Check if given path is a mount point.''' + + return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0 + + @classmethod + def mkfs(klass, type, label=None, partition=None): + '''Create file system using mkfs.''' + + if type == 'minix': + assert label is None, 'minix does not support labels' + + # work around mkswap not properly cleaning up an existing reiserfs + # signature (mailed kzak about it) + if type == 'swap': + subprocess.check_call(['wipefs', '-a', klass.devname(partition)]) + + mkcmd = { 'swap': 'mkswap', + } + label_opt = { 'vfat': '-n', + 'reiserfs': '-l', + } + extra_opt = { 'vfat': [ '-I', '-F', '32'], + 'swap': ['-f'], + 'xfs': ['-f'], # XFS complains if there's an existing FS, so force + 'ext2': ['-F'], # ext* complains about using entire device, so force + 'ext3': ['-F'], + 'ext4': ['-F'], + 'ntfs': ['-F'], + 'reiserfs': ['-ff'], + } + + cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, []) + if label: + cmd += [label_opt.get(type, '-L'), label] + cmd.append(klass.devname(partition)) + + subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # kernel/udev generally detect those changes itself, but do not quite + # tell us when they are done; so do a little kludge here to know how + # long we need to wait + subprocess.call(['udevadm', 'trigger', '--action=change', + '--sysname-match=' + os.path.basename(klass.devname(partition))]) + klass.sync() + + @classmethod + def fs_create(klass, partition, type, options): + '''Create file system using udisks.''' + + block = klass.udisks_block(partition) + block.call_format_sync(type, options, None) + klass.sync_workaround() + + @classmethod + def retry_busy(klass, fn, *args): + '''Call a function until it does not fail with "Busy".''' + + timeout = 10 + while timeout >= 0: + try: + return fn(*args) + except GLib.GError as e: + if not 'UDisks.Error.DeviceBusy' in e.message: + raise + sys.stderr.write('[busy] ') + time.sleep(0.3) + timeout -= 1 + + @classmethod + def check_build_tree_config(klass): + '''Check configuration of build tree''' + + # read make variables + make_vars = {} + var_re = re.compile('^([a-zA-Z_]+) = (.*)$') + make = subprocess.Popen(['make', '-p', '/dev/null'], + stdout=subprocess.PIPE) + for l in make.stdout: + l = l.decode('UTF-8') + m = var_re.match(l) + if m: + make_vars[m.group(1)] = m.group(2) + make.wait() + + # expand make variables + subst_re = re.compile('\${([a-zA-Z_]+)}') + for (k, v) in make_vars.items(): + while True: + m = subst_re.search(v) + if m: + v = subst_re.sub(make_vars.get(m.group(1), ''), v) + make_vars[k] = v + else: + break + + # check localstatedir + for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks'), + os.path.join(make_vars['localstatedir'], 'lib', 'udisks')): + if not os.path.exists(d): + sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d) + sys.exit(0) + + @classmethod + def setup_vdev(klass): + '''create virtual test device + + It is zeroed out initially. + + Return the device path. + ''' + # ensure that the scsi_debug module is loaded + if os.path.isdir('/sys/module/scsi_debug'): + sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n') + sys.exit(1) + + assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % ( + VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug' + + # wait until all drives are created + dirs = [] + while len(dirs) < 1: + dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') + time.sleep(0.1) + assert len(dirs) == 1 + + # determine the debug block devices + devs = os.listdir(dirs[0]) + assert len(devs) == 1 + dev = '/dev/' + devs[0] + assert os.path.exists(dev) + + # let's be 100% sure that we pick a virtual one + assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug' + + print('Set up test device: ' + dev) + return dev + + @classmethod + def teardown_vdev(klass, device): + '''release and remove virtual test device''' + + klass.remove_device(device) + assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \ + 'Failure to rmmod scsi_debug' + + @classmethod + def remove_device(klass, device): + '''remove virtual test device''' + + device = device.split('/')[-1] + if os.path.exists('/sys/block/' + device): + f = open('/sys/block/%s/device/delete' % device, 'w') + f.write('1') + f.close() + while os.path.exists(device): + time.sleep(0.1) + klass.sync() + time.sleep(0.5) # TODO + + @classmethod + def readd_devices(klass): + '''re-add virtual test devices after removal''' + + scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan') + assert len(scan_files) > 0 + for f in scan_files: + open(f, 'w').write('- - -\n') + while not os.path.exists(klass.device): + time.sleep(0.1) + time.sleep(0.5) + klass.sync() + +# ---------------------------------------------------------------------------- + +class Manager(UDisksTestCase): + '''UDisksManager operations''' + + def test_version(self): + '''daemon version''' + + self.assertTrue(self.manager.get_property('version')[0].isdigit()) + +# ---------------------------------------------------------------------------- + +class Drive(UDisksTestCase): + '''UDisksDrive''' + + def setUp(self): + self.drive = self.client.get_drive_for_block(self.udisks_block()) + self.assertNotEqual(self.drive, None) + + def test_properties(self): + '''properties of UDisksDrive object''' + + self.assertEqual(self.drive.get_property('model'), 'scsi_debug') + self.assertEqual(self.drive.get_property('vendor'), 'Linux') + self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0) + self.assertEqual(self.drive.get_property('media-available'), True) + self.assertEqual(self.drive.get_property('optical'), False) + + self.assertNotEqual(len(self.drive.get_property('serial')), 0) + self.assertNotEqual(len(self.drive.get_property('revision')), 0) + +# ---------------------------------------------------------------------------- + +class FS(UDisksTestCase): + '''Test detection of all supported file systems''' + + def setUp(self): + self.workdir = tempfile.mkdtemp() + self.block = self.udisks_block() + self.assertNotEqual(self.block, None) + + def tearDown(self): + if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0: + sys.stderr.write('[cleanup unmount] ') + shutil.rmtree (self.workdir) + + def test_zero(self): + '''properties of zeroed out device''' + + self.zero_device() + self.assertEqual(self.block.get_property('device'), self.device) + self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive')) + self.assertEqual(self.block.get_property('hint-system'), True) + self.assertEqual(self.block.get_property('id-label'), '') + self.assertEqual(self.block.get_property('id-usage'), '') + self.assertEqual(self.block.get_property('id-type'), '') + self.assertEqual(self.block.get_property('id-uuid'), '') + self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0) + obj = self.client.get_object(self.block.get_object_path()) + self.assertEqual(obj.get_property('filesystem'), None) + self.assertEqual(obj.get_property('partition'), None) + self.assertEqual(obj.get_property('partition-table'), None) + + def test_ext2(self): + '''fs: ext2''' + self._do_fs_check('ext2') + + def test_ext3(self): + '''fs: ext3''' + self._do_fs_check('ext3') + + def test_ext4(self): + '''fs: ext4''' + self._do_fs_check('ext4') + + def test_btrfs(self): + '''fs: btrfs''' + self._do_fs_check('btrfs') + + def test_minix(self): + '''fs: minix''' + self._do_fs_check('minix') + + def test_xfs(self): + '''fs: XFS''' + self._do_fs_check('xfs') + + def test_ntfs(self): + '''fs: NTFS''' + self._do_fs_check('ntfs') + + def test_vfat(self): + '''fs: FAT''' + self._do_fs_check('vfat') + + def test_reiserfs(self): + '''fs: reiserfs''' + self._do_fs_check('reiserfs') + + def test_swap(self): + '''fs: swap''' + self._do_fs_check('swap') + + def test_nilfs2(self): + '''fs: nilfs2''' + self._do_fs_check('nilfs2') + + def test_empty(self): + '''fs: empty''' + + self.mkfs('ext4', 'foo') + block = self.udisks_block() + self.assertEqual(block.get_property('id-usage'), 'filesystem') + self.assertEqual(block.get_property('id-type'), 'ext4') + self.assertEqual(block.get_property('id-label'), 'foo') + self.assertNotEqual(self.udisks_filesystem(), None) + + self.fs_create(None, 'empty', GLib.Variant('a{sv}', {})) + + self.assertEqual(block.get_property('id-usage'), '') + self.assertEqual(block.get_property('id-type'), '') + self.assertEqual(block.get_property('id-label'), '') + self.assertEqual(self.udisks_filesystem(), None) + + def test_create_fs_unknown_type(self): + '''Format() with unknown type''' + + try: + self.fs_create(None, 'bogus', GLib.Variant('a{sv}', {})) + self.fail('Expected failure for bogus file system') + except GLib.GError as e: + self.assertTrue('UDisks.Error.NotSupported' in e.message) + self.assertTrue('type bogus' in e.message) + + def test_create_fs_unsupported_label(self): + '''Format() with unsupported label''' + + options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')}) + try: + self.fs_create(None, 'minix', options) + self.fail('Expected failure for unsupported label') + except GLib.GError as e: + self.assertTrue('UDisks.Error.NotSupported' in e.message) + + def test_force_removal(self): + '''fs: forced removal''' + + # create a fs and mount it + self.mkfs('ext4', 'udiskstest') + fs = self.udisks_filesystem() + mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None) + self.assertEqual(mount_path, '/media/udiskstest') + self.assertTrue(self.is_mountpoint('/media/udiskstest')) + + dev_t = os.stat(self.devname()).st_rdev + + # removal should clean up mounts + self.remove_device(self.device) + self.assertFalse(os.path.exists(mount_path)) + self.assertEqual(self.client.get_block_for_dev(dev_t), None) + + # after putting it back, it should be mountable again + self.readd_devices() + fs = self.udisks_filesystem() + self.assertEqual(fs.get_property('mount-points'), []) + + mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None) + self.assertTrue(self.is_mountpoint('/media/udiskstest')) + self.assertEqual(mount_path, '/media/udiskstest') + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), ['/media/udiskstest']) + + self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None) + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), []) + + def _do_fs_check(self, type): + '''Run checks for a particular file system.''' + + if type != 'swap' and subprocess.call(['which', 'mkfs.' + type], + stdout=subprocess.PIPE) != 0: + sys.stderr.write('[no mkfs.%s, skip] ' % type) + + # check correct D-Bus exception + try: + self.fs_create(None, type, GLib.Variant('a{sv}', {})) + self.fail('Expected failure for missing mkfs.' + type) + except GLib.GError as e: + self.assertTrue('UDisks.Error.Failed' in e.message) + return + + # do checks with command line tools (mkfs/mount/umount) + sys.stderr.write('[cli] ') + sys.stderr.flush() + + self._do_cli_check(type) + if type != 'minix': + self._do_cli_check(type, 'test%stst' % type) + + # put a different fs here instead of zeroing, so that we verify that + # udisks overrides existing FS (e. g. XFS complains then), and does not + # leave traces of other FS around + if type == 'ext3': + self.mkfs('swap') + else: + self.mkfs('ext3') + + # do checks with udisks operations + sys.stderr.write('[ud] ') + self._do_udisks_check(type) + if type != 'minix': + self._do_udisks_check(type, 'test%stst' % type) + # also test fs_create with an empty label + self._do_udisks_check(type, '') + + def _do_cli_check(self, type, label=None): + '''udisks correctly picks up file system changes from command line tools''' + + self.mkfs(type, label) + + block = self.udisks_block() + + self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem') + + self.assertEqual(block.get_property('id-type'), type) + self.assertEqual(block.get_property('id-label'), label or '') + self.assertEqual(block.get_property('hint-name'), '') + if type != 'minix': + self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID']) + + obj = self.client.get_object(self.block.get_object_path()) + self.assertEqual(obj.get_property('partition'), None) + self.assertEqual(obj.get_property('partition-table'), None) + + fs = obj.get_property('filesystem') + if type == 'swap': + self.assertEqual(fs, None) + else: + self.assertNotEqual(fs, None) + + if type == 'swap': + return + + # mount it + if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'], + stdout=subprocess.PIPE) == 0: + # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu + # defaults to ntfs-3g if installed); TODO: check other distros + mount_prog = 'mount.ntfs-3g' + else: + mount_prog = 'mount' + ret = subprocess.call([mount_prog, self.device, self.workdir]) + if ret == 32: + # missing fs driver + sys.stderr.write('[missing kernel driver, skip] ') + return + self.assertEqual(ret, 0) + + self.sync() + self.assertEqual(fs.get_property('mount-points'), [self.workdir]) + + # unmount it + subprocess.call(['umount', self.workdir]) + self.sync() + self.assertEqual(fs.get_property('mount-points'), []) + + def _do_udisks_check(self, type, label=None): + '''udisks API correctly changes file system''' + + # create fs + if label is not None: + options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)}) + else: + options = GLib.Variant('a{sv}', {}) + self.fs_create(None, type, options) + + # properties + id = self.blkid() + self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem') + self.assertEqual(id['ID_FS_TYPE'], type) + self.assertEqual(id.get('ID_FS_LABEL', ''), label or '') + + block = self.udisks_block() + self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem') + self.assertEqual(block.get_property('id-type'), type) + self.assertEqual(block.get_property('id-label'), label or '') + + if type == 'swap': + return + + obj = self.client.get_object(self.block.get_object_path()) + self.assertEqual(obj.get_property('partition'), None) + self.assertEqual(obj.get_property('partition-table'), None) + + fs = self.udisks_filesystem() + self.assertNotEqual(fs, None, 'no Filesystem interface for test device') + self.assertEqual(fs.get_property('mount-points'), []) + + # mount + mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None) + + if label: + self.assertEqual(mount_path, '/media/' + label) + else: + self.assertTrue(mount_path.startswith('/media/')) + + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), [mount_path]) + self.assertTrue(self.is_mountpoint(mount_path)) + + # no ownership taken, should be root owned + st = os.stat(mount_path) + self.assertEqual((st.st_uid, st.st_gid), (0, 0)) + + self._do_file_perms_checks(type, mount_path) + + # unmount + self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None) + self.assertFalse(os.path.exists(mount_path), 'mount point was not removed') + self.assertEqual(fs.get_property('mount-points'), [mount_path]) + + # create fs with taking ownership (daemon:mail == 1:8) + #if supports_unix_owners: + # options.append('take_ownership_uid=1') + # options.append('take_ownership_gid=8') + # self.fs_create(None, type, options) + # mount_path = iface.FilesystemMount('', []) + # st = os.stat(mount_path) + # self.assertEqual((st.st_uid, st.st_gid), (1, 8)) + # self.retry_busy(self.partition_iface().FilesystemUnmount, []) + # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed') + + # change label + supported = True + l = 'n"a\m\\"e' + type + if type == 'vfat': + # VFAT does not support some characters + self.assertRaises(GLib.GError, fs.call_set_label_sync, l, + GLib.Variant('a{sv}', {}), None) + l = "n@a$me" + try: + fs.call_set_label_sync(l, GLib.Variant('a{sv}', {}), None) + except GLib.GError as e: + if 'UDisks.Error.NotSupported' in e.message: + # these fses are known to not support relabeling + self.assertTrue(type in ['minix', 'btrfs']) + supported = False + + if supported: + block = self.udisks_block() + blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace( + '\\x5c', '\\').replace('\\x24', '$') + self.sync_workaround() + if type == 'vfat': + # EXFAIL: often (but not always) the label appears in all upper case + self.assertEqual(blkid_label.upper(), l.upper()) + self.assertEqual(block.get_property('id-label').upper(), l.upper()) + else: + self.assertEqual(blkid_label, l) + self.assertEqual(block.get_property('id-label'), l) + + # test setting empty label + fs.call_set_label_sync('', GLib.Variant('a{sv}', {}), None) + self.sync_workaround() + self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '') + self.assertEqual(block.get_property('id-label'), '') + + # check fs - Not implemented in udisks yet + #self.assertEqual(iface.FilesystemCheck([]), True) + + def _do_file_perms_checks(self, type, mount_point): + '''Check for permissions for data files and executables. + + This particularly checks sane and useful permissions on non-Unix file + systems like vfat. + ''' + if type in BROKEN_PERMISSIONS_FS: + return + + f = os.path.join(mount_point, 'simpledata.txt') + open(f, 'w').close() + self.assertTrue(os.access(f, os.R_OK)) + self.assertTrue(os.access(f, os.W_OK)) + self.assertFalse(os.access(f, os.X_OK)) + + f = os.path.join(mount_point, 'simple.exe') + shutil.copy('/bin/bash', f) + self.assertTrue(os.access(f, os.R_OK)) + self.assertTrue(os.access(f, os.W_OK)) + self.assertTrue(os.access(f, os.X_OK)) + + os.mkdir(os.path.join(mount_point, 'subdir')) + f = os.path.join(mount_point, 'subdir', 'subdirdata.txt') + open(f, 'w').close() + self.assertTrue(os.access(f, os.R_OK)) + self.assertTrue(os.access(f, os.W_OK)) + self.assertFalse(os.access(f, os.X_OK)) + + f = os.path.join(mount_point, 'subdir', 'subdir.exe') + shutil.copy('/bin/bash', f) + self.assertTrue(os.access(f, os.R_OK)) + self.assertTrue(os.access(f, os.W_OK)) + self.assertTrue(os.access(f, os.X_OK)) + +## ---------------------------------------------------------------------------- + +class Smart(UDisksTestCase): + '''Check SMART operation.''' + + def test_sda(self): + '''SMART status of first internal hard disk + + This is a best-effort readonly test. + ''' + hd = '/dev/sda' + + if not os.path.exists(hd): + sys.stderr.write('[skip] ') + return + + has_smart = subprocess.call(['skdump', '--can-smart', hd], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0 + + block = self.client.get_block_for_dev(os.stat(hd).st_rdev) + self.assertNotEqual(block, None) + drive = self.client.get_drive_for_block(block) + ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata') + self.assertEqual(ata != None, has_smart) + + if has_smart: + sys.stderr.write('[avail] ') + self.assertEqual(ata.get_property('smart-supported'), True) + self.assertEqual(ata.get_property('smart-enabled'), True) + + # wait for SMART data to be read + while ata.get_property('smart-updated') == 0: + sys.stderr.write('[wait for data] ') + time.sleep(0.5) + + # this is of course not truly correct for a test suite, but let's + # consider it a courtesy for developers :-) + self.assertEqual(ata.get_property('smart-failing'), False) + self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress']) + else: + sys.stderr.write('[N/A] ') + + +# ---------------------------------------------------------------------------- + +if __name__ == '__main__': + argparser = argparse.ArgumentParser(description='udisks2 integration test suite') + argparser.add_argument('-l', '--log-file', dest='logfile', + help='write daemon log to a file') + argparser.add_argument('-w', '--no-workarounds', + action="store_true", default=False, + help='Disable workarounds for race conditions in the D-BUS API') + argparser.add_argument('testname', nargs='*', + help='name of test class or method (e. g. "Drive", "FS.test_ext2")') + args = argparser.parse_args() + + workaround_syncs = not args.no_workarounds + + UDisksTestCase.init(logfile=args.logfile) + if args.testname: + tests = unittest.TestLoader().loadTestsFromNames(args.testname, + __import__('__main__')) + else: + tests = unittest.TestLoader().loadTestsFromName('__main__') + if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful(): + sys.exit(0) + else: + sys.exit(1) + -- 2.7.4