import signal
import optparse
import re
+from glob import glob
-NUM_VDEV = 3 # number of virtual test devices that we need
-VDEV_SIZE = 300000000 # size of virtual test devices
-test_md_dev = '/dev/md125'
+VDEV_SIZE = 300000000 # size of virtual test device
# Those file systems are known to have a broken handling of permissions, in
# particular the executable bit
print 'daemon path:', daemon_path
- klass.setup_vdevs()
-
- assert test_md_dev not in open('/proc/mdstat').read(), test_md_dev + 'is already in use'
- klass.device = test_md_dev
- assert subprocess.call(['mdadm', '--create', test_md_dev, '--force', '-n',
- '1', '-l', 'raid0', klass.test_vdev[0]]) == 0
-
- # start with a clean slate: zero out device
- klass.zero_device()
+ klass.device = klass.setup_vdev()
# inhibit GNOME automounting/nautilus pop ups
subprocess.call(['killall', '-STOP', 'gvfs-gdu-volume-monitor'])
def cleanup(klass):
'''stop daemon again and clean up test environment'''
- subprocess.call(['umount', test_md_dev], stderr=subprocess.PIPE) # if a test failed
- subprocess.call(['mdadm', '-S', test_md_dev])
- klass.device = None
+ subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
os.kill(klass.daemon.pid, signal.SIGTERM)
os.wait()
klass.daemon = None
- # release loop devices
- for vdev in klass.test_vdev:
- subprocess.call(['losetup', '-d', vdev])
+ klass.teardown_vdev(klass.device)
+ klass.device = None
# resume GNOME automounting/nautilus pop ups
subprocess.call(['killall', '-CONT', 'gvfs-gdu-volume-monitor'])
@classmethod
def zero_device(klass):
- subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device],
+ subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
stderr=subprocess.PIPE)
klass.sync()
'''Get name of test device or one of its partitions'''
if partition:
- return klass.device + 'p' + str(partition)
+ if klass.device[-1].isdigit():
+ return klass.device + 'p' + str(partition)
+ else:
+ return klass.device + str(partition)
else:
return klass.device
label_opt = { 'vfat': '-n',
'reiserfs': '-l',
}
- extra_opt = { 'vfat': [ '-F', '32'],
+ extra_opt = { 'vfat': [ '-I', '-F', '32'],
'swap': ['-f'],
- 'xfs': ['-f'], # XFS complains if there's an existing FS, so --force
+ 'xfs': ['-f'], # XFS complains if there's an existing FS, so force
+ 'ext2': ['-F'], # ext* complains about using entire device, so force
+ 'ext3': ['-F'],
+ 'ext4': ['-F'],
+ 'ntfs': ['-F'],
}
cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
sys.exit(0)
@classmethod
- def setup_vdevs(klass):
- '''create virtual test devices
+ def setup_vdev(klass):
+ '''create virtual test device
- This creates an array klass.test_vdev with NUM_VDEV temporary loop
- devices.
+ It is zeroed out initially.
+
+ Return the device path.
'''
- klass.test_vdev = []
- for i in range(NUM_VDEV):
- # find a free loop device
- losetup = subprocess.Popen(['losetup', '--find'],
- stdout=subprocess.PIPE)
- loopdev = losetup.communicate()[0].strip()
- assert losetup.returncode == 0, 'losetup failed to find a free loop device'
- klass.test_vdev.append(loopdev)
+ # ensure that the scsi_debug module is loaded
+ if os.path.isdir('/sys/module/scsi_debug'):
+ print >> sys.stderr, 'The scsi_debug module is already loaded; please remove before running this test.'
+ sys.exit(1)
- # create a backing store for it
- blob = tempfile.NamedTemporaryFile()
- blob.truncate(VDEV_SIZE)
+ assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
+ VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
- # set up loop device
- assert subprocess.call(['losetup', loopdev, blob.name]) == 0
+ # wait until all drives are created
+ dirs = []
+ while len(dirs) < 1:
+ dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+ time.sleep(0.1)
+ assert len(dirs) == 1
- # now we can close this, so that the file will disappear again and
- # the device can actually be written to
- blob.close()
+ # determine the debug block devices
+ devs = os.listdir(dirs[0])
+ assert len(devs) == 1
+ dev = '/dev/' + devs[0]
+ assert os.path.exists(dev)
- print 'Set up temporary loop devices:', ' '.join(klass.test_vdev)
+ # let's be 100% sure that we pick a virtual one
+ assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
+
+ print 'Set up test device:', dev
+ return dev
+
+ @classmethod
+ def teardown_vdev(klass, device):
+ '''release and remove virtual test device'''
+
+ # delete block devices
+ device = device.split('/')[-1]
+ f = open('/sys/block/%s/device/delete' % device, 'w')
+ f.write('1')
+ f.close()
+
+ klass.sync()
+
+ assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
+ 'Failure to rmmod scsi_debug'
# ----------------------------------------------------------------------------
shutil.rmtree (self.workdir)
def test_zero(self):
- '''properties of zeroed out md device'''
+ '''properties of zeroed out device'''
self.zero_device()
info = self.get_info()
self.assertEqual(info['presentation name'], '')
self.assertEqual(info['usage'], '')
self.assertEqual(info['type'], '')
- self.assertEqual(len(info['md_uuid']), 35)
self.assertEqual(info['uuid'], '')
self.assertEqual(info['label'], '')
'''Check partition operations.'''
def setUp(self):
- self.zero_device()
+ self.partition_iface().PartitionTableCreate('none', [])
self.assertEqual(self.get_partitions(), [])
info = self.get_info()
def setUp(self):
'''Create a VG "udtest".
- This uses two virtual disks as PV.
+ This uses two virtual disk partitions as PV.
'''
if subprocess.call(['which', 'pvcreate'], stdout=subprocess.PIPE) != 0:
self.fail('lvm tools not installed')
return
- self.assertEqual(subprocess.call(['pvcreate', self.test_vdev[1]],
+
+ partsize = VDEV_SIZE/2 * 95 / 100
+ self.partition_iface().PartitionTableCreate('mbr', [])
+ p1 = self.partition_iface().PartitionCreate(0, partsize,
+ '0x82', '', [], [], '', [])
+ p1 = self.partition_iface().PartitionCreate(partsize+1, partsize,
+ '0x82', '', [], [], '', [])
+
+ self.vgname = 'udtest'
+ self.assertEqual(subprocess.call(['pvcreate', self.devname(1)],
stdout=subprocess.PIPE), 0)
- self.assertEqual(subprocess.call(['pvcreate', self.test_vdev[2]],
+ self.assertEqual(subprocess.call(['pvcreate', self.devname(2)],
stdout=subprocess.PIPE), 0)
- self.vgname = 'udtest'
self.assertEqual(subprocess.call(['vgcreate', self.vgname,
- self.test_vdev[1], self.test_vdev[2]], stdout=subprocess.PIPE), 0)
+ self.devname(1), self.devname(2)], stdout=subprocess.PIPE), 0)
def tearDown(self):
'''Remove udtest VG.'''
self.assertEqual(p1_p.Get(I_D, 'PartitionType'), '0x0e')
self.assertEqual(p1_p.Get(I_D, 'PartitionSlave'), dev_objpath)
self.assertEqual(p1_p.Get(I_D, 'PartitionNumber'), 1)
- self.assert_(p1_p.Get(I_D, 'PartitionOffset') > 1000000)
+ self.assert_(p1_p.Get(I_D, 'PartitionOffset') > 10000)
self.assert_(p1_p.Get(I_D, 'PartitionSize') > 8000000)
# partition 2 properties
# ----------------------------------------------------------------------------
-class Loop(UDisksTestCase):
- '''Test loop device detection.'''
-
- def test_properties(self):
- '''Loop: presence and properties'''
-
- # our MD test partition is not a loop device
- p = self.partition_props()
- self.assertEqual(p.Get(I_D, 'DeviceIsLinuxLoop'), False)
- self.assertEqual(p.Get(I_D, 'LinuxLoopFilename'), '')
-
- for i in range(NUM_VDEV):
- vdev_p = dbus.Interface(dbus.SystemBus().get_object(
- 'org.freedesktop.UDisks',
- self.manager_iface.FindDeviceByDeviceFile(self.test_vdev[i])),
- dbus.PROPERTIES_IFACE)
- self.assertEqual(vdev_p.Get(I_D, 'DeviceIsLinuxLoop'), True)
- self.assert_(vdev_p.Get(I_D, 'LinuxLoopFilename').startswith('/tmp'))
-
-# ----------------------------------------------------------------------------
-
class GlobalOps(UDisksTestCase):
'''Check various global operations.'''