import StringIO
import sys
import tempfile
-import threading
import unittest
import urllib
import zlib
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-ROOT_DIR = os.path.dirname(TEST_DIR)
-sys.path.insert(0, ROOT_DIR)
-sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party'))
+# net_utils adjusts sys.path.
+import net_utils
from depot_tools import auto_stub
+import isolated_format
import isolateserver
import test_utils
from utils import threading_utils
ALGO = hashlib.sha1
-# Tests here assume ALGO is used for default namespaces, check this assumption.
-assert isolateserver.get_hash_algo('default') is ALGO
-assert isolateserver.get_hash_algo('default-gzip') is ALGO
-
-class TestCase(auto_stub.TestCase):
+class TestCase(net_utils.TestCase):
"""Mocks out url_open() calls and sys.stdout/stderr."""
def setUp(self):
super(TestCase, self).setUp()
self.mock(isolateserver.auth, 'ensure_logged_in', lambda _: None)
- self.mock(isolateserver.net, 'url_open', self._url_open)
- self.mock(isolateserver.net, 'sleep_before_retry', lambda *_: None)
- self._lock = threading.Lock()
- self._requests = []
self.mock(sys, 'stdout', StringIO.StringIO())
self.mock(sys, 'stderr', StringIO.StringIO())
def tearDown(self):
try:
- self.assertEqual([], self._requests)
- self.checkOutput('', '')
+ if not self.has_failed():
+ self.checkOutput('', '')
finally:
super(TestCase, self).tearDown()
self.mock(sys, 'stdout', StringIO.StringIO())
self.mock(sys, 'stderr', StringIO.StringIO())
- def _url_open(self, url, **kwargs):
- logging.warn('url_open(%s, %s)', url[:500], str(kwargs)[:500])
- with self._lock:
- if not self._requests:
- return None
- # Ignore 'stream' argument, it's not important for these tests.
- kwargs.pop('stream', None)
- for i, n in enumerate(self._requests):
- if n[0] == url:
- _, expected_kwargs, result, headers = self._requests.pop(i)
- self.assertEqual(expected_kwargs, kwargs)
- if result is not None:
- return isolateserver.net.HttpResponse.get_fake_response(
- result, url, headers)
- return None
- self.fail('Unknown request %s' % url)
-
class TestZipCompression(TestCase):
"""Test zip_compress and zip_decompress generators."""
with self.assertRaises(IOError):
channel.pull()
# First initial attempt + all retries.
- attempts = 1 + isolateserver.WorkerPool.RETRIES
+ attempts = 1 + storage.net_thread_pool.RETRIES
# Single push attempt call arguments.
expected_push = (
item, 'push_state', item.zipped if use_zip else item.data)
}
return (
server + '/content-gs/handshake',
- {
- 'content_type': 'application/json',
- 'method': 'POST',
- 'data': json.dumps(handshake_request, separators=(',', ':')),
- },
- json.dumps(handshake_response),
- None,
+ {'data': handshake_request},
+ handshake_response,
)
@staticmethod
def mock_contains_request(server, namespace, token, request, response):
url = server + '/content-gs/pre-upload/%s?token=%s' % (
namespace, urllib.quote(token))
- return (
- url,
- {
- 'data': json.dumps(request, separators=(',', ':')),
- 'content_type': 'application/json',
- 'method': 'POST',
- },
- json.dumps(response),
- None,
- )
+ return (url, {'data': request}, response)
def test_server_capabilities_success(self):
server = 'http://example.com'
namespace = 'default'
access_token = 'fake token'
- self._requests = [
- self.mock_handshake_request(server, access_token),
- ]
+ self.expected_requests([self.mock_handshake_request(server, access_token)])
storage = isolateserver.IsolateServer(server, namespace)
caps = storage._server_capabilities
self.assertEqual(access_token, caps['access_token'])
def test_server_capabilities_network_failure(self):
self.mock(isolateserver.net, 'url_open', lambda *_args, **_kwargs: None)
- with self.assertRaises(isolateserver.MappingError):
+ with self.assertRaises(isolated_format.MappingError):
storage = isolateserver.IsolateServer('http://example.com', 'default')
_ = storage._server_capabilities
server = 'http://example.com'
namespace = 'default'
handshake_req = self.mock_handshake_request(server)
- self._requests = [
- (handshake_req[0], handshake_req[1], 'Im a bad response', None),
- ]
+ self.expected_requests(
+ [(handshake_req[0], handshake_req[1], 'Im a bad response')])
storage = isolateserver.IsolateServer(server, namespace)
- with self.assertRaises(isolateserver.MappingError):
+ with self.assertRaises(isolated_format.MappingError):
_ = storage._server_capabilities
def test_server_capabilities_respects_error(self):
server = 'http://example.com'
namespace = 'default'
error = 'Im sorry, Dave. Im afraid I cant do that.'
- self._requests = [
- self.mock_handshake_request(server, error=error)
- ]
+ self.expected_requests([self.mock_handshake_request(server, error=error)])
storage = isolateserver.IsolateServer(server, namespace)
- with self.assertRaises(isolateserver.MappingError) as context:
+ with self.assertRaises(isolated_format.MappingError) as context:
_ = storage._server_capabilities
# Server error message should be reported to user.
self.assertIn(error, str(context.exception))
namespace = 'default'
data = ''.join(str(x) for x in xrange(1000))
item = ALGO(data).hexdigest()
- self._requests = [
- self.mock_fetch_request(server, namespace, item, data),
- ]
+ self.expected_requests(
+ [self.mock_fetch_request(server, namespace, item, data)])
storage = isolateserver.IsolateServer(server, namespace)
fetched = ''.join(storage.fetch(item))
self.assertEqual(data, fetched)
server = 'http://example.com'
namespace = 'default'
item = ALGO('something').hexdigest()
- self._requests = [
- self.mock_fetch_request(server, namespace, item, None),
- ]
+ self.expected_requests(
+ [self.mock_fetch_request(server, namespace, item, None)])
storage = isolateserver.IsolateServer(server, namespace)
with self.assertRaises(IOError):
_ = ''.join(storage.fetch(item))
]
for content_range_header in good_content_range_headers:
- self._requests = [
- self.mock_fetch_request(
- server, namespace, item, data[offset:],
- request_headers={'Range': 'bytes=%d-' % offset},
- response_headers={'Content-Range': content_range_header}),
- ]
+ self.expected_requests(
+ [
+ self.mock_fetch_request(
+ server, namespace, item, data[offset:],
+ request_headers={'Range': 'bytes=%d-' % offset},
+ response_headers={'Content-Range': content_range_header}),
+ ])
storage = isolateserver.IsolateServer(server, namespace)
fetched = ''.join(storage.fetch(item, offset))
self.assertEqual(data[offset:], fetched)
]
for content_range_header in bad_content_range_headers:
- self._requests = [
- self.mock_fetch_request(
- server, namespace, item, data[offset:],
- request_headers={'Range': 'bytes=%d-' % offset},
- response_headers={'Content-Range': content_range_header}),
- ]
+ self.expected_requests(
+ [
+ self.mock_fetch_request(
+ server, namespace, item, data[offset:],
+ request_headers={'Range': 'bytes=%d-' % offset},
+ response_headers={'Content-Range': content_range_header}),
+ ])
storage = isolateserver.IsolateServer(server, namespace)
with self.assertRaises(IOError):
_ = ''.join(storage.fetch(item, offset))
-
def test_push_success(self):
server = 'http://example.com'
namespace = 'default'
push_urls = (server + '/push_here', server + '/call_this')
contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
contains_response = [push_urls]
- self._requests = [
+ requests = [
self.mock_handshake_request(server, token),
self.mock_contains_request(
server, namespace, token, contains_request, contains_response),
),
(
push_urls[1],
- {
- 'data': '',
- 'content_type': 'application/json',
- 'method': 'POST',
- },
+ {'content_type': 'application/json', 'data': '', 'method': 'POST'},
'',
None,
),
]
+ self.expected_requests(requests)
storage = isolateserver.IsolateServer(server, namespace)
missing = storage.contains([item])
self.assertEqual([item], missing.keys())
push_urls = (server + '/push_here', server + '/call_this')
contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
contains_response = [push_urls]
- self._requests = [
+ requests = [
self.mock_handshake_request(server, token),
self.mock_contains_request(
server, namespace, token, contains_request, contains_response),
None,
),
]
+ self.expected_requests(requests)
storage = isolateserver.IsolateServer(server, namespace)
missing = storage.contains([item])
self.assertEqual([item], missing.keys())
push_urls = (server + '/push_here', server + '/call_this')
contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
contains_response = [push_urls]
- self._requests = [
+ requests = [
self.mock_handshake_request(server, token),
self.mock_contains_request(
server, namespace, token, contains_request, contains_response),
),
(
push_urls[1],
- {
- 'data': '',
- 'content_type': 'application/json',
- 'method': 'POST',
- },
+ {'content_type': 'application/json', 'data': '', 'method': 'POST'},
None,
None,
),
]
+ self.expected_requests(requests)
storage = isolateserver.IsolateServer(server, namespace)
missing = storage.contains([item])
self.assertEqual([item], missing.keys())
namespace = 'default'
token = 'fake token'
req = self.mock_contains_request(server, namespace, token, [], [])
- self._requests = [
- self.mock_handshake_request(server, token),
- (req[0], req[1], None, None),
- ]
+ self.expected_requests(
+ [
+ self.mock_handshake_request(server, token),
+ (req[0], req[1], None),
+ ])
storage = isolateserver.IsolateServer(server, namespace)
- with self.assertRaises(isolateserver.MappingError):
+ with self.assertRaises(isolated_format.MappingError):
storage.contains([])
def test_contains_format_failure(self):
server = 'http://example.com'
namespace = 'default'
token = 'fake token'
- self._requests = [
- self.mock_handshake_request(server, token),
- self.mock_contains_request(server, namespace, token, [], [1, 2, 3])
- ]
+ self.expected_requests(
+ [
+ self.mock_handshake_request(server, token),
+ self.mock_contains_request(server, namespace, token, [], [1, 2, 3]),
+ ])
storage = isolateserver.IsolateServer(server, namespace)
- with self.assertRaises(isolateserver.MappingError):
+ with self.assertRaises(isolated_format.MappingError):
storage.contains([])
actual[key] = ''.join(generator)
self.mock(isolateserver, 'file_write', out)
server = 'http://example.com'
- self._requests = [
+ requests = [
(
server + '/content-gs/retrieve/default-gzip/sha-1',
{'read_timeout': 60, 'headers': None},
None,
),
]
+ self.expected_requests(requests)
cmd = [
'download',
'--isolate-server', server,
- '--target', ROOT_DIR,
+ '--target', net_utils.ROOT_DIR,
'--file', 'sha-1', 'path/to/a',
'--file', 'sha-2', 'path/to/b',
]
self.assertEqual(0, isolateserver.main(cmd))
expected = {
- os.path.join(ROOT_DIR, 'path/to/a'): 'Coucou',
- os.path.join(ROOT_DIR, 'path/to/b'): 'Bye Bye',
+ os.path.join(net_utils.ROOT_DIR, 'path/to/a'): 'Coucou',
+ os.path.join(net_utils.ROOT_DIR, 'path/to/b'): 'Bye Bye',
}
self.assertEqual(expected, actual)
'files': dict(
(k, {'h': ALGO(v).hexdigest(), 's': len(v)})
for k, v in files.iteritems()),
- 'version': isolateserver.ISOLATED_FILE_VERSION,
+ 'version': isolated_format.ISOLATED_FILE_VERSION,
}
isolated_data = json.dumps(isolated, sort_keys=True, separators=(',',':'))
isolated_hash = ALGO(isolated_data).hexdigest()
requests = [(v['h'], files[k]) for k, v in isolated['files'].iteritems()]
requests.append((isolated_hash, isolated_data))
- self._requests = [
+ requests = [
(
server + '/content-gs/retrieve/default-gzip/' + h,
{
'--target', self.tempdir,
'--isolated', isolated_hash,
]
+ self.expected_requests(requests)
self.assertEqual(0, isolateserver.main(cmd))
expected = dict(
(os.path.join(self.tempdir, k), v) for k, v in files.iteritems())
class TestIsolated(auto_stub.TestCase):
def test_load_isolated_empty(self):
- m = isolateserver.load_isolated('{}', ALGO)
+ m = isolated_format.load_isolated('{}', ALGO)
self.assertEqual({}, m)
def test_load_isolated_good(self):
u'includes': [u'0123456789abcdef0123456789abcdef01234567'],
u'read_only': 1,
u'relative_cwd': u'somewhere_else',
- u'version': isolateserver.ISOLATED_FILE_VERSION,
+ u'version': isolated_format.ISOLATED_FILE_VERSION,
}
- m = isolateserver.load_isolated(json.dumps(data), ALGO)
+ m = isolated_format.load_isolated(json.dumps(data), ALGO)
self.assertEqual(data, m)
def test_load_isolated_bad(self):
u'h': u'0123456789abcdef0123456789abcdef01234567'
}
},
- u'version': isolateserver.ISOLATED_FILE_VERSION,
+ u'version': isolated_format.ISOLATED_FILE_VERSION,
}
- try:
- isolateserver.load_isolated(json.dumps(data), ALGO)
- self.fail()
- except isolateserver.ConfigError:
- pass
+ with self.assertRaises(isolated_format.IsolatedError):
+ isolated_format.load_isolated(json.dumps(data), ALGO)
def test_load_isolated_os_only(self):
# Tolerate 'os' on older version.
u'os': 'HP/UX',
u'version': '1.3',
}
- m = isolateserver.load_isolated(json.dumps(data), ALGO)
+ m = isolated_format.load_isolated(json.dumps(data), ALGO)
self.assertEqual(data, m)
def test_load_isolated_os_only_bad(self):
data = {
u'os': 'HP/UX',
- u'version': isolateserver.ISOLATED_FILE_VERSION,
+ u'version': isolated_format.ISOLATED_FILE_VERSION,
}
- with self.assertRaises(isolateserver.ConfigError):
- isolateserver.load_isolated(json.dumps(data), ALGO)
+ with self.assertRaises(isolated_format.IsolatedError):
+ isolated_format.load_isolated(json.dumps(data), ALGO)
def test_load_isolated_path(self):
# Automatically convert the path case.
},
},
u'relative_cwd': path_sep.join(('somewhere', 'else')),
- u'version': isolateserver.ISOLATED_FILE_VERSION,
+ u'version': isolated_format.ISOLATED_FILE_VERSION,
}
data = gen_data(wrong_path_sep)
- actual = isolateserver.load_isolated(json.dumps(data), ALGO)
+ actual = isolated_format.load_isolated(json.dumps(data), ALGO)
expected = gen_data(os.path.sep)
self.assertEqual(expected, actual)
}
},
}
- m = isolateserver.save_isolated('foo', data)
+ m = isolated_format.save_isolated('foo', data)
self.assertEqual([], m)
self.assertEqual([('foo', data, True)], calls)
-class SymlinkTest(unittest.TestCase):
- def setUp(self):
- super(SymlinkTest, self).setUp()
- self.old_cwd = os.getcwd()
- self.cwd = tempfile.mkdtemp(prefix='isolate_')
- # Everything should work even from another directory.
- os.chdir(self.cwd)
-
- def tearDown(self):
- try:
- os.chdir(self.old_cwd)
- shutil.rmtree(self.cwd)
- finally:
- super(SymlinkTest, self).tearDown()
-
- if sys.platform == 'darwin':
- def test_expand_symlinks_path_case(self):
- # Ensures that the resulting path case is fixed on case insensitive file
- # system.
- os.symlink('dest', os.path.join(self.cwd, 'link'))
- os.mkdir(os.path.join(self.cwd, 'Dest'))
- open(os.path.join(self.cwd, 'Dest', 'file.txt'), 'w').close()
-
- result = isolateserver.expand_symlinks(unicode(self.cwd), 'link')
- self.assertEqual((u'Dest', [u'link']), result)
- result = isolateserver.expand_symlinks(unicode(self.cwd), 'link/File.txt')
- self.assertEqual((u'Dest/file.txt', [u'link']), result)
-
- def test_expand_directories_and_symlinks_path_case(self):
- # Ensures that the resulting path case is fixed on case insensitive file
- # system. A superset of test_expand_symlinks_path_case.
- # Create *all* the paths with the wrong path case.
- basedir = os.path.join(self.cwd, 'baseDir')
- os.mkdir(basedir.lower())
- subdir = os.path.join(basedir, 'subDir')
- os.mkdir(subdir.lower())
- open(os.path.join(subdir, 'Foo.txt'), 'w').close()
- os.symlink('subDir', os.path.join(basedir, 'linkdir'))
- actual = isolateserver.expand_directories_and_symlinks(
- unicode(self.cwd), [u'baseDir/'], lambda _: None, True, False)
- expected = [
- u'basedir/linkdir',
- u'basedir/subdir/Foo.txt',
- u'basedir/subdir/Foo.txt',
- ]
- self.assertEqual(expected, actual)
-
- def test_process_input_path_case_simple(self):
- # Ensure the symlink dest is saved in the right path case.
- subdir = os.path.join(self.cwd, 'subdir')
- os.mkdir(subdir)
- linkdir = os.path.join(self.cwd, 'linkdir')
- os.symlink('subDir', linkdir)
- actual = isolateserver.process_input(
- unicode(linkdir.upper()), {}, True, ALGO)
- expected = {'l': u'subdir', 'm': 360, 't': int(os.stat(linkdir).st_mtime)}
- self.assertEqual(expected, actual)
-
- def test_process_input_path_case_complex(self):
- # Ensure the symlink dest is saved in the right path case. This includes 2
- # layers of symlinks.
- basedir = os.path.join(self.cwd, 'basebir')
- os.mkdir(basedir)
-
- linkeddir2 = os.path.join(self.cwd, 'linkeddir2')
- os.mkdir(linkeddir2)
-
- linkeddir1 = os.path.join(basedir, 'linkeddir1')
- os.symlink('../linkedDir2', linkeddir1)
-
- subsymlinkdir = os.path.join(basedir, 'symlinkdir')
- os.symlink('linkedDir1', subsymlinkdir)
-
- actual = isolateserver.process_input(
- unicode(subsymlinkdir.upper()), {}, True, ALGO)
- expected = {
- 'l': u'linkeddir1', 'm': 360, 't': int(os.stat(subsymlinkdir).st_mtime),
- }
- self.assertEqual(expected, actual)
-
- actual = isolateserver.process_input(
- unicode(linkeddir1.upper()), {}, True, ALGO)
- expected = {
- 'l': u'../linkeddir2', 'm': 360, 't': int(os.stat(linkeddir1).st_mtime),
- }
- self.assertEqual(expected, actual)
-
- if sys.platform != 'win32':
- def test_symlink_input_absolute_path(self):
- # A symlink is outside of the checkout, it should be treated as a normal
- # directory.
- # .../src
- # .../src/out -> .../tmp/foo
- # .../tmp
- # .../tmp/foo
- src = os.path.join(self.cwd, u'src')
- src_out = os.path.join(src, 'out')
- tmp = os.path.join(self.cwd, 'tmp')
- tmp_foo = os.path.join(tmp, 'foo')
- os.mkdir(src)
- os.mkdir(tmp)
- os.mkdir(tmp_foo)
- # The problem was that it's an absolute path, so it must be considered a
- # normal directory.
- os.symlink(tmp, src_out)
- open(os.path.join(tmp_foo, 'bar.txt'), 'w').close()
- actual = isolateserver.expand_symlinks(src, u'out/foo/bar.txt')
- self.assertEqual((u'out/foo/bar.txt', []), actual)
-
-
def get_storage(_isolate_server, namespace):
class StorageFake(object):
def __enter__(self, *_):
@property
def hash_algo(self): # pylint: disable=R0201
- return isolateserver.get_hash_algo(namespace)
+ return isolated_format.get_hash_algo(namespace)
@staticmethod
def upload_items(items):
def test_archive_files(self):
old_cwd = os.getcwd()
try:
- os.chdir(os.path.join(TEST_DIR, 'isolateserver'))
+ os.chdir(os.path.join(net_utils.TEST_DIR, 'isolateserver'))
self.mock(isolateserver, 'get_storage', get_storage)
f = ['empty_file.txt', 'small_file.txt']
isolateserver.main(
def help_test_archive(self, cmd_line_prefix):
old_cwd = os.getcwd()
try:
- os.chdir(ROOT_DIR)
+ os.chdir(net_utils.ROOT_DIR)
self.mock(isolateserver, 'get_storage', get_storage)
- p = os.path.join(TEST_DIR, 'isolateserver')
+ p = os.path.join(net_utils.TEST_DIR, 'isolateserver')
isolateserver.main(cmd_line_prefix + [p])
# TODO(maruel): The problem here is that the test depends on the file mode
# of the files in this directory.
# Fix is to copy the files in a temporary directory with known file modes.
#
- # If you modify isolateserver.ISOLATED_FILE_VERSION, you'll have to update
- # the hash below. Sorry about that.
+ # If you modify isolated_format.ISOLATED_FILE_VERSION, you'll have to
+ # update the hash below. Sorry about that.
self.checkOutput(
'1501166255279df1509408567340798d1cf089e7 %s\n' % p,
'')
def test_indir(self):
data = [
(['-I', 'http://foo.com/'], ('http://foo.com', None)),
- (['--indir', ROOT_DIR], ('', ROOT_DIR)),
+ (['--indir', net_utils.ROOT_DIR], ('', net_utils.ROOT_DIR)),
]
for value, (expected_isolate_server, expected_indir) in data:
parser = isolateserver.OptionParserIsolateServer()