--- /dev/null
+# udisks2 integration test suite
+# Run in udisks built tree to test local built binaries (needs
+# --localstatedir=/var), or from anywhere else to test system installed
+# binaries.
+# Usage:
+# - Run all tests:
+# src/tests/integration-test
+# - Run only a particular class of tests:
+# src/tests/integration-test Drive
+# - Run only a single test:
+# src/tests/integration-test FS.test_ext3
+# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+import sys
+import os
+srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+libdir = os.path.join(srcdir, 'udisks', '.libs')
+# as we can't change LD_LIBRARY_PATH within a running program, and doing
+# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
+# nasty hack
+if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
+ os.environ['LD_LIBRARY_PATH'] = libdir
+ os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
+ srcdir,
+ os.environ.get('GI_TYPELIB_PATH', ''))
+ os.execv(sys.argv[0], sys.argv)
+ assert False, 'not expecting to land here'
+import subprocess
+import unittest
+import tempfile
+import atexit
+import time
+import shutil
+import signal
+import argparse
+import re
+from glob import glob
+from gi.repository import GLib, UDisks
+#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
+VDEV_SIZE = 300000000 # size of virtual test device
+# Those file systems are known to have a broken handling of permissions, in
+# particular the executable bit
+# Some D-BUS API methods cause properties to not be up to date yet when a
+# method call finishes, thus we do an udevadm settle as a workaround. Those
+# methods should eventually get fixed properly, but it's unnerving to have
+# the tests fail on them when you are working on something else. This flag
+# gets set by the --no-workarounds option to disable those syncs, so that these
+# race conditions can be fixed.
+workaround_syncs = False
+# ----------------------------------------------------------------------------
+class UDisksTestCase(unittest.TestCase):
+ '''Base class for udisks test cases.
+ This provides static functions which are useful for all test cases.
+ '''
+ tool_path = None
+ daemon = None
+ daemon_log = None
+ device = None
+ client = None
+ manager = None
+ @classmethod
+ def init(klass, logfile=None):
+ '''start daemon and set up test environment'''
+ if os.geteuid() != 0:
+ print('this test suite needs to run as root', file=sys.stderr)
+ sys.exit(0)
+ # run from local build tree if we are in one, otherwise use system instance
+ daemon_path = os.path.join(srcdir, 'src', 'udisksd')
+ if (os.access (daemon_path, os.X_OK)):
+ klass.tool_path = 'tools/udisksctl'
+ print('Testing binaries from local build tree')
+ klass.check_build_tree_config()
+ else:
+ print('Testing installed system binaries')
+ daemon_path = None
+ for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
+ if l.startswith('Exec='):
+ daemon_path = l.split('=', 1)[1].strip()
+ break
+ assert daemon_path, 'could not determine daemon path from D-BUS .service file'
+ klass.tool_path = 'udisksctl'
+ print('daemon path: ' + daemon_path)
+ klass.device = klass.setup_vdev()
+ # inhibit GNOME automounting/nautilus pop ups
+ subprocess.call(['killall', '-STOP', 'gvfs-gdu-volume-monitor'])
+ # start daemon
+ if logfile:
+ klass.daemon_log = open(logfile, 'w')
+ else:
+ klass.daemon_log = tempfile.TemporaryFile()
+ klass.daemon = subprocess.Popen([daemon_path, '--replace'],
+ stdout=klass.daemon_log, stderr=subprocess.STDOUT)
+ assert klass.daemon.pid, 'daemon failed to start'
+ atexit.register(klass.cleanup)
+ # wait until the daemon has started up
+ timeout = 10
+ while klass.manager is None and timeout > 0:
+ time.sleep(0.2)
+ klass.client = UDisks.Client.new_sync(None)
+ assert klass.client != None
+ klass.manager = klass.client.get_manager()
+ timeout -= 1
+ assert klass.manager, 'daemon failed to start'
+ klass.sync()
+ @classmethod
+ def cleanup(klass):
+ '''stop daemon again and clean up test environment'''
+ subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
+ os.kill(klass.daemon.pid, signal.SIGTERM)
+ os.wait()
+ klass.daemon = None
+ klass.teardown_vdev(klass.device)
+ klass.device = None
+ # resume GNOME automounting/nautilus pop ups
+ subprocess.call(['killall', '-CONT', 'gvfs-gdu-volume-monitor'])
+ @classmethod
+ def sync(klass):
+ '''Wait until pending events finished processing.
+ This should only be called for situations where we genuinely have an
+ asynchronous response, like invoking a CLI program and waiting for
+ udev/udisks to catch up on the change events.
+ '''
+ subprocess.call(['udevadm', 'settle'])
+ klass.client.settle()
+ @classmethod
+ def sync_workaround(klass):
+ '''Wait until pending events finished processing (bug workaround).
+ This should be called for race conditions in the D-BUS API which cause
+ properties to not be up to date yet when a method call finishes. Those
+ should eventually get fixed properly, but it's unnerving to have the
+ tests fail on them when you are working on something else.
+ This sync is not done if running with --no-workarounds.
+ '''
+ if workaround_syncs:
+ klass.sync()
+ @classmethod
+ def zero_device(klass):
+ subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
+ stderr=subprocess.PIPE)
+ klass.sync()
+ @classmethod
+ def devname(klass, partition=None):
+ '''Get name of test device or one of its partitions'''
+ if partition:
+ if klass.device[-1].isdigit():
+ return klass.device + 'p' + str(partition)
+ else:
+ return klass.device + str(partition)
+ else:
+ return klass.device
+ @classmethod
+ def udisks_block(klass, partition=None):
+ '''Get UDisksBlock object for test device or partition'''
+ assert klass.client
+ devname = klass.devname(partition)
+ dev_t = os.stat(devname).st_rdev
+ block = klass.client.get_block_for_dev(dev_t)
+ assert block, 'did not find an UDisksBlock object for %s' % devname
+ return block
+ @classmethod
+ def udisks_filesystem(klass, partition=None):
+ '''Get UDisksFilesystem object for test device or partition
+ Return None if there is no file system on that device.
+ '''
+ block = klass.udisks_block(partition)
+ return klass.client.get_object(block.get_object_path()).get_property('filesystem')
+ @classmethod
+ def blkid(klass, partition=None):
+ '''Call blkid and return dictionary of results.'''
+ result = {}
+ cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev',
+ klass.devname(partition)], stdout=subprocess.PIPE)
+ for l in cmd.stdout:
+ (key, value) = l.decode('UTF-8').split('=', 1)
+ result[key] = value.strip()
+ assert cmd.wait() == 0
+ return result
+ @classmethod
+ def is_mountpoint(klass, path):
+ '''Check if given path is a mount point.'''
+ return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
+ @classmethod
+ def mkfs(klass, type, label=None, partition=None):
+ '''Create file system using mkfs.'''
+ if type == 'minix':
+ assert label is None, 'minix does not support labels'
+ # work around mkswap not properly cleaning up an existing reiserfs
+ # signature (mailed kzak about it)
+ if type == 'swap':
+ subprocess.check_call(['wipefs', '-a', klass.devname(partition)])
+ mkcmd = { 'swap': 'mkswap',
+ }
+ label_opt = { 'vfat': '-n',
+ 'reiserfs': '-l',
+ }
+ extra_opt = { 'vfat': [ '-I', '-F', '32'],
+ 'swap': ['-f'],
+ 'xfs': ['-f'], # XFS complains if there's an existing FS, so force
+ 'ext2': ['-F'], # ext* complains about using entire device, so force
+ 'ext3': ['-F'],
+ 'ext4': ['-F'],
+ 'ntfs': ['-F'],
+ 'reiserfs': ['-ff'],
+ }
+ cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
+ if label:
+ cmd += [label_opt.get(type, '-L'), label]
+ cmd.append(klass.devname(partition))
+ subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ # kernel/udev generally detect those changes itself, but do not quite
+ # tell us when they are done; so do a little kludge here to know how
+ # long we need to wait
+ subprocess.call(['udevadm', 'trigger', '--action=change',
+ '--sysname-match=' + os.path.basename(klass.devname(partition))])
+ klass.sync()
+ @classmethod
+ def fs_create(klass, partition, type, options):
+ '''Create file system using udisks.'''
+ block = klass.udisks_block(partition)
+ block.call_format_sync(type, options, None)
+ klass.sync_workaround()
+ @classmethod
+ def retry_busy(klass, fn, *args):
+ '''Call a function until it does not fail with "Busy".'''
+ timeout = 10
+ while timeout >= 0:
+ try:
+ return fn(*args)
+ except GLib.GError as e:
+ if not 'UDisks.Error.DeviceBusy' in e.message:
+ raise
+ sys.stderr.write('[busy] ')
+ time.sleep(0.3)
+ timeout -= 1
+ @classmethod
+ def check_build_tree_config(klass):
+ '''Check configuration of build tree'''
+ # read make variables
+ make_vars = {}
+ var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
+ make = subprocess.Popen(['make', '-p', '/dev/null'],
+ stdout=subprocess.PIPE)
+ for l in make.stdout:
+ l = l.decode('UTF-8')
+ m = var_re.match(l)
+ if m:
+ make_vars[m.group(1)] = m.group(2)
+ make.wait()
+ # expand make variables
+ subst_re = re.compile('\${([a-zA-Z_]+)}')
+ for (k, v) in make_vars.items():
+ while True:
+ m = subst_re.search(v)
+ if m:
+ v = subst_re.sub(make_vars.get(m.group(1), ''), v)
+ make_vars[k] = v
+ else:
+ break
+ # check localstatedir
+ for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks'),
+ os.path.join(make_vars['localstatedir'], 'lib', 'udisks')):
+ if not os.path.exists(d):
+ sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
+ sys.exit(0)
+ @classmethod
+ def setup_vdev(klass):
+ '''create virtual test device
+ It is zeroed out initially.
+ Return the device path.
+ '''
+ # ensure that the scsi_debug module is loaded
+ if os.path.isdir('/sys/module/scsi_debug'):
+ sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
+ sys.exit(1)
+ assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
+ VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
+ # wait until all drives are created
+ dirs = []
+ while len(dirs) < 1:
+ dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+ time.sleep(0.1)
+ assert len(dirs) == 1
+ # determine the debug block devices
+ devs = os.listdir(dirs[0])
+ assert len(devs) == 1
+ dev = '/dev/' + devs[0]
+ assert os.path.exists(dev)
+ # let's be 100% sure that we pick a virtual one
+ assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
+ print('Set up test device: ' + dev)
+ return dev
+ @classmethod
+ def teardown_vdev(klass, device):
+ '''release and remove virtual test device'''
+ klass.remove_device(device)
+ assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
+ 'Failure to rmmod scsi_debug'
+ @classmethod
+ def remove_device(klass, device):
+ '''remove virtual test device'''
+ device = device.split('/')[-1]
+ if os.path.exists('/sys/block/' + device):
+ f = open('/sys/block/%s/device/delete' % device, 'w')
+ f.write('1')
+ f.close()
+ while os.path.exists(device):
+ time.sleep(0.1)
+ klass.sync()
+ time.sleep(0.5) # TODO
+ @classmethod
+ def readd_devices(klass):
+ '''re-add virtual test devices after removal'''
+ scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
+ assert len(scan_files) > 0
+ for f in scan_files:
+ open(f, 'w').write('- - -\n')
+ while not os.path.exists(klass.device):
+ time.sleep(0.1)
+ time.sleep(0.5)
+ klass.sync()
+# ----------------------------------------------------------------------------
+class Manager(UDisksTestCase):
+ '''UDisksManager operations'''
+ def test_version(self):
+ '''daemon version'''
+ self.assertTrue(self.manager.get_property('version')[0].isdigit())
+# ----------------------------------------------------------------------------
+class Drive(UDisksTestCase):
+ '''UDisksDrive'''
+ def setUp(self):
+ self.drive = self.client.get_drive_for_block(self.udisks_block())
+ self.assertNotEqual(self.drive, None)
+ def test_properties(self):
+ '''properties of UDisksDrive object'''
+ self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
+ self.assertEqual(self.drive.get_property('vendor'), 'Linux')
+ self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+ self.assertEqual(self.drive.get_property('media-available'), True)
+ self.assertEqual(self.drive.get_property('optical'), False)
+ self.assertNotEqual(len(self.drive.get_property('serial')), 0)
+ self.assertNotEqual(len(self.drive.get_property('revision')), 0)
+# ----------------------------------------------------------------------------
+class FS(UDisksTestCase):
+ '''Test detection of all supported file systems'''
+ def setUp(self):
+ self.workdir = tempfile.mkdtemp()
+ self.block = self.udisks_block()
+ self.assertNotEqual(self.block, None)
+ def tearDown(self):
+ if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
+ sys.stderr.write('[cleanup unmount] ')
+ shutil.rmtree (self.workdir)
+ def test_zero(self):
+ '''properties of zeroed out device'''
+ self.zero_device()
+ self.assertEqual(self.block.get_property('device'), self.device)
+ self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
+ self.assertEqual(self.block.get_property('hint-system'), True)
+ self.assertEqual(self.block.get_property('id-label'), '')
+ self.assertEqual(self.block.get_property('id-usage'), '')
+ self.assertEqual(self.block.get_property('id-type'), '')
+ self.assertEqual(self.block.get_property('id-uuid'), '')
+ self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
+ obj = self.client.get_object(self.block.get_object_path())
+ self.assertEqual(obj.get_property('filesystem'), None)
+ self.assertEqual(obj.get_property('partition'), None)
+ self.assertEqual(obj.get_property('partition-table'), None)
+ def test_ext2(self):
+ '''fs: ext2'''
+ self._do_fs_check('ext2')
+ def test_ext3(self):
+ '''fs: ext3'''
+ self._do_fs_check('ext3')
+ def test_ext4(self):
+ '''fs: ext4'''
+ self._do_fs_check('ext4')
+ def test_btrfs(self):
+ '''fs: btrfs'''
+ self._do_fs_check('btrfs')
+ def test_minix(self):
+ '''fs: minix'''
+ self._do_fs_check('minix')
+ def test_xfs(self):
+ '''fs: XFS'''
+ self._do_fs_check('xfs')
+ def test_ntfs(self):
+ '''fs: NTFS'''
+ self._do_fs_check('ntfs')
+ def test_vfat(self):
+ '''fs: FAT'''
+ self._do_fs_check('vfat')
+ def test_reiserfs(self):
+ '''fs: reiserfs'''
+ self._do_fs_check('reiserfs')
+ def test_swap(self):
+ '''fs: swap'''
+ self._do_fs_check('swap')
+ def test_nilfs2(self):
+ '''fs: nilfs2'''
+ self._do_fs_check('nilfs2')
+ def test_empty(self):
+ '''fs: empty'''
+ self.mkfs('ext4', 'foo')
+ block = self.udisks_block()
+ self.assertEqual(block.get_property('id-usage'), 'filesystem')
+ self.assertEqual(block.get_property('id-type'), 'ext4')
+ self.assertEqual(block.get_property('id-label'), 'foo')
+ self.assertNotEqual(self.udisks_filesystem(), None)
+ self.fs_create(None, 'empty', GLib.Variant('a{sv}', {}))
+ self.assertEqual(block.get_property('id-usage'), '')
+ self.assertEqual(block.get_property('id-type'), '')
+ self.assertEqual(block.get_property('id-label'), '')
+ self.assertEqual(self.udisks_filesystem(), None)
+ def test_create_fs_unknown_type(self):
+ '''Format() with unknown type'''
+ try:
+ self.fs_create(None, 'bogus', GLib.Variant('a{sv}', {}))
+ self.fail('Expected failure for bogus file system')
+ except GLib.GError as e:
+ self.assertTrue('UDisks.Error.NotSupported' in e.message)
+ self.assertTrue('type bogus' in e.message)
+ def test_create_fs_unsupported_label(self):
+ '''Format() with unsupported label'''
+ options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
+ try:
+ self.fs_create(None, 'minix', options)
+ self.fail('Expected failure for unsupported label')
+ except GLib.GError as e:
+ self.assertTrue('UDisks.Error.NotSupported' in e.message)
+ def test_force_removal(self):
+ '''fs: forced removal'''
+ # create a fs and mount it
+ self.mkfs('ext4', 'udiskstest')
+ fs = self.udisks_filesystem()
+ mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+ self.assertEqual(mount_path, '/media/udiskstest')
+ self.assertTrue(self.is_mountpoint('/media/udiskstest'))
+ dev_t = os.stat(self.devname()).st_rdev
+ # removal should clean up mounts
+ self.remove_device(self.device)
+ self.assertFalse(os.path.exists(mount_path))
+ self.assertEqual(self.client.get_block_for_dev(dev_t), None)
+ # after putting it back, it should be mountable again
+ self.readd_devices()
+ fs = self.udisks_filesystem()
+ self.assertEqual(fs.get_property('mount-points'), [])
+ mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+ self.assertTrue(self.is_mountpoint('/media/udiskstest'))
+ self.assertEqual(mount_path, '/media/udiskstest')
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), ['/media/udiskstest'])
+ self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), [])
+ def _do_fs_check(self, type):
+ '''Run checks for a particular file system.'''
+ if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
+ stdout=subprocess.PIPE) != 0:
+ sys.stderr.write('[no mkfs.%s, skip] ' % type)
+ # check correct D-Bus exception
+ try:
+ self.fs_create(None, type, GLib.Variant('a{sv}', {}))
+ self.fail('Expected failure for missing mkfs.' + type)
+ except GLib.GError as e:
+ self.assertTrue('UDisks.Error.Failed' in e.message)
+ return
+ # do checks with command line tools (mkfs/mount/umount)
+ sys.stderr.write('[cli] ')
+ sys.stderr.flush()
+ self._do_cli_check(type)
+ if type != 'minix':
+ self._do_cli_check(type, 'test%stst' % type)
+ # put a different fs here instead of zeroing, so that we verify that
+ # udisks overrides existing FS (e. g. XFS complains then), and does not
+ # leave traces of other FS around
+ if type == 'ext3':
+ self.mkfs('swap')
+ else:
+ self.mkfs('ext3')
+ # do checks with udisks operations
+ sys.stderr.write('[ud] ')
+ self._do_udisks_check(type)
+ if type != 'minix':
+ self._do_udisks_check(type, 'test%stst' % type)
+ # also test fs_create with an empty label
+ self._do_udisks_check(type, '')
+ def _do_cli_check(self, type, label=None):
+ '''udisks correctly picks up file system changes from command line tools'''
+ self.mkfs(type, label)
+ block = self.udisks_block()
+ self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
+ self.assertEqual(block.get_property('id-type'), type)
+ self.assertEqual(block.get_property('id-label'), label or '')
+ self.assertEqual(block.get_property('hint-name'), '')
+ if type != 'minix':
+ self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
+ obj = self.client.get_object(self.block.get_object_path())
+ self.assertEqual(obj.get_property('partition'), None)
+ self.assertEqual(obj.get_property('partition-table'), None)
+ fs = obj.get_property('filesystem')
+ if type == 'swap':
+ self.assertEqual(fs, None)
+ else:
+ self.assertNotEqual(fs, None)
+ if type == 'swap':
+ return
+ # mount it
+ if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
+ stdout=subprocess.PIPE) == 0:
+ # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
+ # defaults to ntfs-3g if installed); TODO: check other distros
+ mount_prog = 'mount.ntfs-3g'
+ else:
+ mount_prog = 'mount'
+ ret = subprocess.call([mount_prog, self.device, self.workdir])
+ if ret == 32:
+ # missing fs driver
+ sys.stderr.write('[missing kernel driver, skip] ')
+ return
+ self.assertEqual(ret, 0)
+ self.sync()
+ self.assertEqual(fs.get_property('mount-points'), [self.workdir])
+ # unmount it
+ subprocess.call(['umount', self.workdir])
+ self.sync()
+ self.assertEqual(fs.get_property('mount-points'), [])
+ def _do_udisks_check(self, type, label=None):
+ '''udisks API correctly changes file system'''
+ # create fs
+ if label is not None:
+ options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
+ else:
+ options = GLib.Variant('a{sv}', {})
+ self.fs_create(None, type, options)
+ # properties
+ id = self.blkid()
+ self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
+ self.assertEqual(id['ID_FS_TYPE'], type)
+ self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
+ block = self.udisks_block()
+ self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
+ self.assertEqual(block.get_property('id-type'), type)
+ self.assertEqual(block.get_property('id-label'), label or '')
+ if type == 'swap':
+ return
+ obj = self.client.get_object(self.block.get_object_path())
+ self.assertEqual(obj.get_property('partition'), None)
+ self.assertEqual(obj.get_property('partition-table'), None)
+ fs = self.udisks_filesystem()
+ self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
+ self.assertEqual(fs.get_property('mount-points'), [])
+ # mount
+ mount_path = fs.call_mount_sync(GLib.Variant('a{sv}', {}), None)
+ if label:
+ self.assertEqual(mount_path, '/media/' + label)
+ else:
+ self.assertTrue(mount_path.startswith('/media/'))
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), [mount_path])
+ self.assertTrue(self.is_mountpoint(mount_path))
+ # no ownership taken, should be root owned
+ st = os.stat(mount_path)
+ self.assertEqual((st.st_uid, st.st_gid), (0, 0))
+ self._do_file_perms_checks(type, mount_path)
+ # unmount
+ self.retry_busy(fs.call_unmount_sync, GLib.Variant('a{sv}', {}), None)
+ self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+ self.assertEqual(fs.get_property('mount-points'), [mount_path])
+ # create fs with taking ownership (daemon:mail == 1:8)
+ #if supports_unix_owners:
+ # options.append('take_ownership_uid=1')
+ # options.append('take_ownership_gid=8')
+ # self.fs_create(None, type, options)
+ # mount_path = iface.FilesystemMount('', [])
+ # st = os.stat(mount_path)
+ # self.assertEqual((st.st_uid, st.st_gid), (1, 8))
+ # self.retry_busy(self.partition_iface().FilesystemUnmount, [])
+ # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+ # change label
+ supported = True
+ l = 'n"a\m\\"e' + type
+ if type == 'vfat':
+ # VFAT does not support some characters
+ self.assertRaises(GLib.GError, fs.call_set_label_sync, l,
+ GLib.Variant('a{sv}', {}), None)
+ l = "n@a$me"
+ try:
+ fs.call_set_label_sync(l, GLib.Variant('a{sv}', {}), None)
+ except GLib.GError as e:
+ if 'UDisks.Error.NotSupported' in e.message:
+ # these fses are known to not support relabeling
+ self.assertTrue(type in ['minix', 'btrfs'])
+ supported = False
+ if supported:
+ block = self.udisks_block()
+ blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
+ '\\x5c', '\\').replace('\\x24', '$')
+ self.sync_workaround()
+ if type == 'vfat':
+ # EXFAIL: often (but not always) the label appears in all upper case
+ self.assertEqual(blkid_label.upper(), l.upper())
+ self.assertEqual(block.get_property('id-label').upper(), l.upper())
+ else:
+ self.assertEqual(blkid_label, l)
+ self.assertEqual(block.get_property('id-label'), l)
+ # test setting empty label
+ fs.call_set_label_sync('', GLib.Variant('a{sv}', {}), None)
+ self.sync_workaround()
+ self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
+ self.assertEqual(block.get_property('id-label'), '')
+ # check fs - Not implemented in udisks yet
+ #self.assertEqual(iface.FilesystemCheck([]), True)
+ def _do_file_perms_checks(self, type, mount_point):
+ '''Check for permissions for data files and executables.
+ This particularly checks sane and useful permissions on non-Unix file
+ systems like vfat.
+ '''
+ return
+ f = os.path.join(mount_point, 'simpledata.txt')
+ open(f, 'w').close()
+ self.assertTrue(os.access(f, os.R_OK))
+ self.assertTrue(os.access(f, os.W_OK))
+ self.assertFalse(os.access(f, os.X_OK))
+ f = os.path.join(mount_point, 'simple.exe')
+ shutil.copy('/bin/bash', f)
+ self.assertTrue(os.access(f, os.R_OK))
+ self.assertTrue(os.access(f, os.W_OK))
+ self.assertTrue(os.access(f, os.X_OK))
+ os.mkdir(os.path.join(mount_point, 'subdir'))
+ f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
+ open(f, 'w').close()
+ self.assertTrue(os.access(f, os.R_OK))
+ self.assertTrue(os.access(f, os.W_OK))
+ self.assertFalse(os.access(f, os.X_OK))
+ f = os.path.join(mount_point, 'subdir', 'subdir.exe')
+ shutil.copy('/bin/bash', f)
+ self.assertTrue(os.access(f, os.R_OK))
+ self.assertTrue(os.access(f, os.W_OK))
+ self.assertTrue(os.access(f, os.X_OK))
+## ----------------------------------------------------------------------------
+class Smart(UDisksTestCase):
+ '''Check SMART operation.'''
+ def test_sda(self):
+ '''SMART status of first internal hard disk
+ This is a best-effort readonly test.
+ '''
+ hd = '/dev/sda'
+ if not os.path.exists(hd):
+ sys.stderr.write('[skip] ')
+ return
+ has_smart = subprocess.call(['skdump', '--can-smart', hd],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
+ block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
+ self.assertNotEqual(block, None)
+ drive = self.client.get_drive_for_block(block)
+ ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
+ self.assertEqual(ata != None, has_smart)
+ if has_smart:
+ sys.stderr.write('[avail] ')
+ self.assertEqual(ata.get_property('smart-supported'), True)
+ self.assertEqual(ata.get_property('smart-enabled'), True)
+ # wait for SMART data to be read
+ while ata.get_property('smart-updated') == 0:
+ sys.stderr.write('[wait for data] ')
+ time.sleep(0.5)
+ # this is of course not truly correct for a test suite, but let's
+ # consider it a courtesy for developers :-)
+ self.assertEqual(ata.get_property('smart-failing'), False)
+ self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
+ else:
+ sys.stderr.write('[N/A] ')
+# ----------------------------------------------------------------------------
+if __name__ == '__main__':
+ argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
+ argparser.add_argument('-l', '--log-file', dest='logfile',
+ help='write daemon log to a file')
+ argparser.add_argument('-w', '--no-workarounds',
+ action="store_true", default=False,
+ help='Disable workarounds for race conditions in the D-BUS API')
+ argparser.add_argument('testname', nargs='*',
+ help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
+ args = argparser.parse_args()
+ workaround_syncs = not args.no_workarounds
+ UDisksTestCase.init(logfile=args.logfile)
+ if args.testname:
+ tests = unittest.TestLoader().loadTestsFromNames(args.testname,
+ __import__('__main__'))
+ else:
+ tests = unittest.TestLoader().loadTestsFromName('__main__')
+ if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
+ sys.exit(0)
+ else:
+ sys.exit(1)