klass.device = klass.setup_vdev()
+ # use a D-BUS config which permits root and nobody
+ klass.dbus_conf = tempfile.NamedTemporaryFile()
+ klass.dbus_conf.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
+ "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
+<busconfig>
+ <type>test</type>
+ <listen>unix:tmpdir=/tmp</listen>
+ <policy context="default">
+ <allow send_destination="*" eavesdrop="true"/>
+ <allow eavesdrop="true"/>
+ <allow own="*"/>
+ <allow user="root"/>
+ <allow user="nobody"/>
+ </policy>
+</busconfig>
+''')
+ klass.dbus_conf.flush()
+
# start polkit and udisks on a private DBus
- klass.dbus = Gio.TestDBus()
- klass.dbus.up()
- os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = klass.dbus.get_bus_address()
+ dbus = subprocess.Popen(['dbus-launch', '--config-file=' + klass.dbus_conf.name],
+ stdout=subprocess.PIPE, universal_newlines=True)
+ dbus_out = dbus.communicate()[0]
+ for l in dbus_out.splitlines():
+ k, v = l.split('=', 1)
+ if k == 'DBUS_SESSION_BUS_PID':
+ klass.dbus_pid = int(v)
+ if k == 'DBUS_SESSION_BUS_ADDRESS':
+ os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = v
# do not try to communicate with the current desktop session; this will
# confuse it, as it cannot see this D-BUS instance
try:
klass.teardown_vdev(klass.device)
klass.device = None
+ klass.dbus_conf.close()
del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
- klass.dbus.down()
+ os.kill(klass.dbus_pid, signal.SIGTERM)
@classmethod
def sync(klass):
self.assertEqual(block.get_property('id-type'), 'ext4')
self.assertEqual(block.get_property('id-label'), 'polkitext4')
+ def test_internal_fs_nobody(self):
+ '''Try to create FS on internal drive as nobody'''
+
+ # ensure we have a mountable file system
+ self.fs_create(None, 'ext4', no_options)
+
+ nobody = pwd.getpwnam('nobody')
+ assert nobody.pw_uid > 0
+
+ # we cannot just change euid and do the call, as polkit will remember
+ # our process which started out as root; so call the external tool
+ tool_mount = subprocess.Popen([self.tool_path, 'mount',
+ '--no-user-interaction', '-b', self.device],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True,
+ preexec_fn=lambda: (os.setgid(nobody.pw_gid), os.setuid(nobody.pw_uid)))
+ out, err = tool_mount.communicate()
+ self.assertTrue('Error.NotAuthorized' in err, err)
+
# ----------------------------------------------------------------------------
if __name__ == '__main__':