# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.
+import StringIO
import functools
import hashlib
import json
sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party'))
import run_isolated
-
+import test_utils
from depot_tools import auto_stub
+ALGO = hashlib.sha1
+
def write_content(filepath, content):
with open(filepath, 'wb') as f:
f.write(content)
+def json_dumps(data):
+ return json.dumps(data, sort_keys=True, separators=(',', ':'))
+
+
class StorageFake(object):
def __init__(self, files):
self._files = files.copy()
shutil.rmtree(self.tempdir)
super(RunIsolatedTest, self).tearDown()
- def assertFileMode(self, filepath, mlinux, mosx, mwin):
- # Note that it depends on umask.
+ def assertFileMode(self, filepath, mode, umask=None):
+ umask = test_utils.umask() if umask is None else umask
actual = os.stat(filepath).st_mode
- if sys.platform == 'win32':
- expected = mwin
- elif sys.platform == 'darwin':
- expected = mosx
- else:
- expected = mlinux
- self.assertEqual(expected, actual, (filepath, oct(expected), oct(actual)))
+ expected = mode & ~umask
+ self.assertEqual(
+ expected,
+ actual,
+ (filepath, oct(expected), oct(actual), oct(umask)))
+
+ def assertMaskedFileMode(self, filepath, mode):
+ """It's usually when the file was first marked read only."""
+ self.assertFileMode(filepath, mode, 0 if sys.platform == 'win32' else 077)
@property
def run_test_temp_dir(self):
"""Shortcut for joining path with self.run_test_temp_dir."""
return os.path.join(self.run_test_temp_dir, *args)
- def test_umask(self):
- # Check assumptions about umask. Anyone can override umask so this test is
- # bound to be brittle. In practice if it fails, it means assertFileMode()
- # will have to be updated accordingly.
- umask = os.umask(0)
- os.umask(umask)
- if sys.platform == 'darwin':
- self.assertEqual(oct(022), oct(umask))
- elif sys.platform == 'win32':
- self.assertEqual(oct(0), oct(umask))
- else:
- self.assertEqual(oct(02), oct(umask))
-
def test_delete_wd_rf(self):
# Confirms that a RO file in a RW directory can be deleted on non-Windows.
dir_foo = os.path.join(self.tempdir, 'foo')
write_content(file_bar, 'bar')
run_isolated.set_read_only(dir_foo, False)
run_isolated.set_read_only(file_bar, True)
- self.assertFileMode(dir_foo, 040775, 040755, 040777)
- self.assertFileMode(file_bar, 0100400, 0100400, 0100444)
+ self.assertFileMode(dir_foo, 040777)
+ self.assertMaskedFileMode(file_bar, 0100444)
if sys.platform == 'win32':
# On Windows, a read-only file can't be deleted.
with self.assertRaises(OSError):
write_content(file_bar, 'bar')
run_isolated.set_read_only(dir_foo, True)
run_isolated.set_read_only(file_bar, False)
- self.assertFileMode(dir_foo, 040500, 040500, 040555)
- self.assertFileMode(file_bar, 0100664, 0100644, 0100666)
+ self.assertMaskedFileMode(dir_foo, 040555)
+ self.assertFileMode(file_bar, 0100666)
if sys.platform == 'win32':
# A read-only directory has a convoluted meaning on Windows, it means that
# the directory is "personalized". This is used as a signal by Windows
write_content(file_bar, 'bar')
run_isolated.set_read_only(dir_foo, True)
run_isolated.set_read_only(file_bar, True)
- self.assertFileMode(dir_foo, 040500, 040500, 040555)
- self.assertFileMode(file_bar, 0100400, 0100400, 0100444)
+ self.assertMaskedFileMode(dir_foo, 040555)
+ self.assertMaskedFileMode(file_bar, 0100444)
with self.assertRaises(OSError):
# It fails for different reason depending on the OS. See the test cases
# above.
os.mkdir(dir_foo, 0777)
write_content(file_bar, 'bar')
run_isolated.hardlink(file_bar, file_link)
- self.assertFileMode(file_bar, 0100664, 0100644, 0100666)
- self.assertFileMode(file_link, 0100664, 0100644, 0100666)
+ self.assertFileMode(file_bar, 0100666)
+ self.assertFileMode(file_link, 0100666)
run_isolated.set_read_only(file_bar, True)
- self.assertFileMode(file_bar, 0100400, 0100400, 0100444)
- self.assertFileMode(file_link, 0100400, 0100400, 0100444)
+ self.assertMaskedFileMode(file_bar, 0100444)
+ self.assertMaskedFileMode(file_link, 0100444)
# This is bad news for Windows; on Windows, the file must be writeable to be
# deleted, but the file node is modified. This means that every hard links
# must be reset to be read-only after deleting one of the hard link
calls.append(command)
return 0
self.mock(run_isolated.subprocess, 'call', call)
- isolated = json.dumps(
+ isolated = json_dumps(
{
'command': ['foo.exe', 'cmd with space'],
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
def get_storage(_isolate_server, _namespace):
return StorageFake({isolated_hash:isolated})
self.mock(run_isolated.isolateserver, 'get_storage', get_storage)
calls.append(command)
return 0
self.mock(run_isolated.subprocess, 'call', call)
- isolated = json.dumps(
+ isolated = json_dumps(
{
'command': ['foo.exe', 'cmd with space'],
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
def get_storage(_isolate_server, _namespace):
return StorageFake({isolated_hash:isolated})
self.mock(run_isolated.isolateserver, 'get_storage', get_storage)
run_isolated.subprocess, 'call',
lambda *x, **y: subprocess_call.append((x, y)) or 0)
- outdir = self.fake_make_temp_dir()
- try:
- ret = run_isolated.run_tha_test(
- isolated_hash,
- StorageFake(files),
- run_isolated.isolateserver.MemoryCache(),
- run_isolated.isolateserver.get_hash_algo('default-deflate'),
- outdir,
- [])
- finally:
- if os.path.isdir(outdir):
- run_isolated.rmtree(outdir)
- self.fail('Temporary directory should have been deleted')
+ ret = run_isolated.run_tha_test(
+ isolated_hash,
+ StorageFake(files),
+ run_isolated.isolateserver.MemoryCache(),
+ run_isolated.isolateserver.get_hash_algo('default-deflate'),
+ [])
self.assertEqual(0, ret)
return subprocess_call, make_tree_call
def test_run_tha_test_naked(self):
- isolated = json.dumps({'command': ['invalid', 'command']})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated = json_dumps({'command': ['invalid', 'command']})
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, make_tree_call = self._run_tha_test(isolated_hash, files)
self.assertEqual(
- ['make_tree_writeable', 'make_tree_deleteable'], make_tree_call)
+ ['make_tree_writeable', 'make_tree_deleteable', 'make_tree_deleteable'],
+ make_tree_call)
self.assertEqual(1, len(subprocess_call))
self.assertTrue(subprocess_call[0][1].pop('cwd'))
self.assertTrue(subprocess_call[0][1].pop('env'))
[(([self.temp_join(u'invalid'), u'command'],), {})], subprocess_call)
def test_run_tha_test_naked_read_only_0(self):
- isolated = json.dumps(
+ isolated = json_dumps(
{
'command': ['invalid', 'command'],
'read_only': 0,
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, make_tree_call = self._run_tha_test(isolated_hash, files)
self.assertEqual(
- ['make_tree_writeable', 'make_tree_deleteable'], make_tree_call)
+ ['make_tree_writeable', 'make_tree_deleteable', 'make_tree_deleteable'],
+ make_tree_call)
self.assertEqual(1, len(subprocess_call))
self.assertTrue(subprocess_call[0][1].pop('cwd'))
self.assertTrue(subprocess_call[0][1].pop('env'))
[(([self.temp_join(u'invalid'), u'command'],), {})], subprocess_call)
def test_run_tha_test_naked_read_only_1(self):
- isolated = json.dumps(
+ isolated = json_dumps(
{
'command': ['invalid', 'command'],
'read_only': 1,
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, make_tree_call = self._run_tha_test(isolated_hash, files)
self.assertEqual(
- ['make_tree_files_read_only', 'make_tree_deleteable'], make_tree_call)
+ [
+ 'make_tree_files_read_only', 'make_tree_deleteable',
+ 'make_tree_deleteable',
+ ],
+ make_tree_call)
self.assertEqual(1, len(subprocess_call))
self.assertTrue(subprocess_call[0][1].pop('cwd'))
self.assertTrue(subprocess_call[0][1].pop('env'))
[(([self.temp_join(u'invalid'), u'command'],), {})], subprocess_call)
def test_run_tha_test_naked_read_only_2(self):
- isolated = json.dumps(
+ isolated = json_dumps(
{
'command': ['invalid', 'command'],
'read_only': 2,
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, make_tree_call = self._run_tha_test(isolated_hash, files)
self.assertEqual(
- ['make_tree_read_only', 'make_tree_deleteable'], make_tree_call)
+ ['make_tree_read_only', 'make_tree_deleteable', 'make_tree_deleteable'],
+ make_tree_call)
self.assertEqual(1, len(subprocess_call))
self.assertTrue(subprocess_call[0][1].pop('cwd'))
self.assertTrue(subprocess_call[0][1].pop('env'))
def test_main_naked(self):
# The most naked .isolated file that can exist.
self.mock(run_isolated.tools, 'disable_buffering', lambda: None)
- isolated = json.dumps({'command': ['invalid', 'command']})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated = json_dumps({'command': ['invalid', 'command']})
+ isolated_hash = ALGO(isolated).hexdigest()
def get_storage(_isolate_server, _namespace):
return StorageFake({isolated_hash:isolated})
self.mock(run_isolated.isolateserver, 'get_storage', get_storage)
[(([self.temp_join(u'invalid'), u'command'],), {})], subprocess_call)
def test_modified_cwd(self):
- isolated = json.dumps({
+ isolated = json_dumps({
'command': ['../out/some.exe', 'arg'],
'relative_cwd': 'some',
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, _ = self._run_tha_test(isolated_hash, files)
self.assertEqual(1, len(subprocess_call))
subprocess_call)
def test_python_cmd(self):
- isolated = json.dumps({
+ isolated = json_dumps({
'command': ['../out/cmd.py', 'arg'],
'relative_cwd': 'some',
})
- isolated_hash = hashlib.sha1(isolated).hexdigest()
+ isolated_hash = ALGO(isolated).hexdigest()
files = {isolated_hash:isolated}
subprocess_call, _ = self._run_tha_test(isolated_hash, files)
self.assertEqual(1, len(subprocess_call))
[(([sys.executable, os.path.join('..', 'out', 'cmd.py'), 'arg'],), {})],
subprocess_call)
+ def test_output(self):
+ script = (
+ 'import sys\n'
+ 'open(sys.argv[1], "w").write("bar")\n')
+ script_hash = ALGO(script).hexdigest()
+ isolated = json_dumps(
+ {
+ 'algo': 'sha-1',
+ 'command': ['cmd.py', '${ISOLATED_OUTDIR}/foo'],
+ 'files': {
+ 'cmd.py': {
+ 'h': script_hash,
+ 'm': 0700,
+ 's': len(script),
+ },
+ },
+ 'version': run_isolated.isolateserver.ISOLATED_FILE_VERSION,
+ })
+ isolated_hash = ALGO(isolated).hexdigest()
+ contents = {
+ isolated_hash: isolated,
+ script_hash: script,
+ }
+
+ path = os.path.join(self.tempdir, 'store')
+ os.mkdir(path)
+ for h, c in contents.iteritems():
+ write_content(os.path.join(path, h), c)
+ store = run_isolated.isolateserver.get_storage(path, 'default-store')
+
+ self.mock(sys, 'stdout', StringIO.StringIO())
+ ret = run_isolated.run_tha_test(
+ isolated_hash,
+ store,
+ run_isolated.isolateserver.MemoryCache(),
+ run_isolated.isolateserver.get_hash_algo('default-store'),
+ [])
+ self.assertEqual(0, ret)
+
+ # It uploaded back. Assert the store has a new item containing foo.
+ hashes = set(contents)
+ output_hash = ALGO('bar').hexdigest()
+ hashes.add(output_hash)
+ uploaded = json_dumps(
+ {
+ 'algo': 'sha-1',
+ 'files': {
+ 'foo': {
+ 'h': output_hash,
+ # TODO(maruel): Handle umask.
+ 'm': 0640,
+ 's': 3,
+ },
+ },
+ 'version': run_isolated.isolateserver.ISOLATED_FILE_VERSION,
+ })
+ uploaded_hash = ALGO(uploaded).hexdigest()
+ hashes.add(uploaded_hash)
+ self.assertEqual(hashes, set(os.listdir(path)))
+ self.assertEqual(
+ 'run_isolated output: %s\n' % uploaded_hash, sys.stdout.getvalue())
+
if __name__ == '__main__':
logging.basicConfig(