integration-test: Create a read-only fake CD-ROM device
authorMartin Pitt <martinpitt@gnome.org>
Tue, 4 Sep 2012 14:05:59 +0000 (16:05 +0200)
committerMartin Pitt <martinpitt@gnome.org>
Tue, 4 Sep 2012 15:33:05 +0000 (17:33 +0200)
We will need this for further tests, e. g. to reproduce
https://bugs.freedesktop.org/show_bug.cgi?id=53237 or to check required
privileges for a removable device.

scsi_debug is not capable enough to emulate the whole SCSI CD-ROM command set,
thus we need a temporary udev rule (in /run/udev/rules.d/) to ensure that
blkid runs on scsi_debug CD devices.

src/tests/integration-test

index 22bee38..96f5955 100755 (executable)
@@ -119,7 +119,7 @@ class UDisksTestCase(unittest.TestCase):
 
         print('daemon path: ' + daemon_path)
 
-        klass.device = klass.setup_vdev()
+        (klass.device, klass.cd_device) = klass.setup_vdev()
 
         # start polkit and udisks on a private DBus
         klass.dbus = Gio.TestDBus()
@@ -212,35 +212,45 @@ class UDisksTestCase(unittest.TestCase):
         klass.sync()
 
     @classmethod
-    def devname(klass, partition=None):
-        '''Get name of test device or one of its partitions'''
+    def devname(klass, partition=None, cd=False):
+        '''Get name of test device or one of its partitions
 
+        If cd is True, return the CD device, otherwise the hard disk device.
+        '''
+        if cd:
+            dev = klass.cd_device
+        else:
+            dev = klass.device
         if partition:
-            if klass.device[-1].isdigit():
-                return klass.device + 'p' + str(partition)
+            if dev[-1].isdigit():
+                return dev + 'p' + str(partition)
             else:
-                return klass.device + str(partition)
+                return dev + str(partition)
         else:
-            return klass.device
+            return dev
 
     @classmethod
-    def udisks_block(klass, partition=None):
-        '''Get UDisksBlock object for test device or partition'''
+    def udisks_block(klass, partition=None, cd=False):
+        '''Get UDisksBlock object for test device or partition
 
+        If cd is True, return the CD device, otherwise the hard disk device.
+        '''
         assert klass.client
-        devname = klass.devname(partition)
+        devname = klass.devname(partition, cd)
         dev_t = os.stat(devname).st_rdev
         block = klass.client.get_block_for_dev(dev_t)
         assert block, 'did not find an UDisksBlock object for %s' % devname
         return block
 
     @classmethod
-    def udisks_filesystem(klass, partition=None):
+    def udisks_filesystem(klass, partition=None, cd=False):
         '''Get UDisksFilesystem object for test device or partition
         
         Return None if there is no file system on that device.
+
+        If cd is True, return the CD device, otherwise the hard disk device.
         '''
-        block = klass.udisks_block(partition)
+        block = klass.udisks_block(partition, cd)
         return klass.client.get_object(block.get_object_path()).get_property('filesystem')
 
     @classmethod
@@ -364,38 +374,70 @@ class UDisksTestCase(unittest.TestCase):
         
     @classmethod
     def setup_vdev(klass):
-        '''create virtual test device
+        '''create virtual test devices
         
         It is zeroed out initially.
 
-        Return the device path.
+        Return a pair (writable HD device path, readonly CD device path).
         '''
         # ensure that the scsi_debug module is loaded
         if os.path.isdir('/sys/module/scsi_debug'):
             sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
             sys.exit(1)
 
+        # work around scsi_debug not implementing CD-ROM SCSI commands, so that
+        # udev's cdrom_id does not recognize tracks
+        scsi_debug_rules = '/run/udev/rules.d/60-persistent-storage-scsi_debug.rules'
+        if os.path.isdir('/run/udev/rules.d') and not os.path.exists(scsi_debug_rules):
+            with open(scsi_debug_rules, 'w') as f:
+                f.write('''KERNEL=="sr*", ENV{DISK_EJECT_REQUEST}!="?*", ATTRS{model}=="scsi_debug*", ENV{ID_CDROM_MEDIA}=="?*", IMPORT{program}="/sbin/blkid -o udev -p -u noraid $tempnode"
+''')
+            # reload udev
+            subprocess.call('sync; pkill --signal HUP udevd || pkill --signal HUP systemd-udevd',
+                            shell=True)
+
+        # craete a fake SCSI hard drive
         assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
             VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
 
-        # wait until all drives are created
-        dirs = []
-        while len(dirs) < 1:
-            dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+        # wait until drive got created
+        rw_dirs = []
+        while len(rw_dirs) < 1:
+            rw_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
             time.sleep(0.1)
-        assert len(dirs) == 1
+        assert len(rw_dirs) == 1
+
+        # create a fake CD-ROM, too
+        with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
+            f.write('5')  # henceforth, created devices will be CD drives
+        with open('/sys/bus/pseudo/drivers/scsi_debug/add_host', 'w') as f:
+            f.write('1')  # generate a new drive
+        subprocess.call(['udevadm', 'settle'])
+        with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
+            f.write('0')
+
+        ro_dirs = []
+        while len(ro_dirs) < 2:
+            ro_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+            time.sleep(0.1)
+        ro_dirs.remove(rw_dirs[0])
+        assert len(ro_dirs) == 1
 
         # determine the debug block devices
-        devs = os.listdir(dirs[0])
+        devs = os.listdir(ro_dirs[0])
+        assert len(devs) == 1
+        ro_dev = '/dev/' + devs[0]
+        devs = os.listdir(rw_dirs[0])
         assert len(devs) == 1
-        dev = '/dev/' + devs[0]
-        assert os.path.exists(dev)
+        rw_dev = '/dev/' + devs[0]
+        assert os.path.exists(ro_dev)
+        assert os.path.exists(rw_dev)
 
         # let's be 100% sure that we pick a virtual one
-        assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
+        assert open('/sys/block/%s/device/model' % os.path.basename(rw_dev)).read().strip() == 'scsi_debug'
 
-        print('Set up test device: ' + dev)
-        return dev
+        print('Set up test device: r/w: %s, r/o: %s' % (rw_dev, ro_dev))
+        return (rw_dev, ro_dev)
 
     @classmethod
     def teardown_vdev(klass, device):