1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
9 from telemetry.page import cloud_storage
10 from telemetry.page import page_set_archive_info
# Minimal stand-in for a page object used by the tests in this file. It is
# constructed from a URL, and the tests read the instance's `url` attribute
# (page1.url, page2.url, page3.url).
# NOTE(review): the body of __init__ is not visible in this excerpt;
# presumably it stores `url` on the instance -- confirm against the full file.
13 class MockPage(object):
14 def __init__(self, url):
# Shared fixtures: three page URLs and the two recording file names that the
# tests expect to be present from the start.
18 url1 = 'http://www.foo.com/'
19 url2 = 'http://www.bar.com/'
20 url3 = 'http://www.baz.com/'
21 recording1 = 'data_001.wpr'
22 recording2 = 'data_002.wpr'
# Text written to the archive metadata file by the test fixture. The template
# is filled, in order, with recording1, url1, url2, recording2 and url3.
# NOTE(review): the template body is not visible in this excerpt.
# testReadingArchiveInfo expects url1 and url2 to resolve to recording1 and
# url3 to recording2 -- confirm the mapping against the full file.
23 archive_info_contents = ("""
30 """ % (recording1, url1, url2, recording2, url3))
# One mock page per URL.
31 page1 = MockPage(url1)
32 page2 = MockPage(url2)
33 page3 = MockPage(url3)
# Tests for page_set_archive_info.PageSetArchiveInfo, run against files
# written into a temporary directory.
# NOTE(review): the `def` lines of the fixture methods and the loop header
# that binds `i` are not visible in this excerpt.
36 class TestPageSetArchiveInfo(unittest.TestCase):
# Temporary directory that holds the files created for the test.
38 self.tmp_dir = tempfile.mkdtemp()
# Write the archive metadata text (archive_info_contents) to info.json.
40 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
41 with open(self.page_set_archive_info_file, 'w') as f:
42 f.write(archive_info_contents)
44 # Write the existing .wpr files.
# The metadata text is reused as the file content; presumably only the
# existence of these files matters to the tests -- confirm.
46 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
47 f.write(archive_info_contents)
49 # Create the PageSetArchiveInfo object to be tested.
# The second argument is a page-set path under the system temp directory; no
# visible line creates that file.
50 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
51 self.page_set_archive_info_file,
52 os.path.join(tempfile.gettempdir(), 'pageset.json'))
# Remove the temporary directory and everything in it.
55 shutil.rmtree(self.tmp_dir)
def assertCorrectHashFile(self, file_path):
  """Assert that file_path has a '.sha1' companion file holding its hash.

  Args:
    file_path: Path of the file whose hash file, located at
        file_path + '.sha1', is checked.

  The test fails if the hash file does not exist or if its contents differ
  from cloud_storage.GetHash(file_path).
  """
  hash_path = file_path + '.sha1'
  self.assertTrue(os.path.exists(hash_path))
  # NOTE(review): the hash file is read in binary mode; under Python 3 the
  # comparison below only succeeds if GetHash returns bytes -- confirm.
  with open(hash_path, 'rb') as f:
    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(cloud_storage.GetHash(file_path), f.read())
def testReadingArchiveInfo(self):
  """Check that each page resolves to the recording expected for it.

  page1 and page2 are expected to share recording1 and page3 to use
  recording2. Only the base name of the returned path is compared.
  """
  # assertEqual instead of the deprecated assertEquals alias, which was
  # removed from unittest in Python 3.12.
  self.assertEqual(recording1, os.path.basename(
      self.archive_info.WprFilePathForPage(page1)))
  self.assertEqual(recording1, os.path.basename(
      self.archive_info.WprFilePathForPage(page2)))
  self.assertEqual(recording2, os.path.basename(
      self.archive_info.WprFilePathForPage(page3)))
# Runs two rounds of "register a temporary recording, then commit it for one
# page" and checks which files exist after each round.
# NOTE(review): the bodies of the two `with open(new_temp_recording, 'w')`
# statements are not visible in this excerpt.
70 def testModifications(self):
71 recording1_path = os.path.join(self.tmp_dir, recording1)
72 recording2_path = os.path.join(self.tmp_dir, recording2)
# Round 1: the committed recording is expected to be named data_003.wpr.
74 new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
75 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
76 with open(new_temp_recording, 'w') as f:
79 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
# While the temporary recording is registered, every page resolves to it.
81 self.assertEquals(new_temp_recording,
82 self.archive_info.WprFilePathForPage(page1))
83 self.assertEquals(new_temp_recording,
84 self.archive_info.WprFilePathForPage(page2))
85 self.assertEquals(new_temp_recording,
86 self.archive_info.WprFilePathForPage(page3))
# Commit the temporary recording for page2 only.
88 self.archive_info.AddRecordedPages([page2.url])
# The temporary file is expected to be gone and data_003.wpr to exist.
90 self.assertTrue(os.path.exists(new_recording1))
91 self.assertFalse(os.path.exists(new_temp_recording))
# Both original recordings are expected to still be present.
93 self.assertTrue(os.path.exists(recording1_path))
94 self.assertTrue(os.path.exists(recording2_path))
95 self.assertCorrectHashFile(new_recording1)
# Round 2: same flow for page3, expecting data_004.wpr.
97 new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
98 with open(new_temp_recording, 'w') as f:
101 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
102 self.archive_info.AddRecordedPages([page3.url])
104 self.assertTrue(os.path.exists(new_recording2))
105 self.assertCorrectHashFile(new_recording2)
106 self.assertFalse(os.path.exists(new_temp_recording))
# recording1 is expected to survive the second round.
108 self.assertTrue(os.path.exists(recording1_path))
109 # recording2 is no longer needed, so it was deleted.
110 self.assertFalse(os.path.exists(recording2_path))
# Starts from a page set whose archive metadata file has not been written,
# records page1, and checks that the recording (expected name new_000.wpr),
# its hash file and the metadata file all exist afterwards, and that the
# metadata can be read back to resolve page1 to the new recording.
# NOTE(review): several lines of this test are not visible in this excerpt:
# the rest of the page_set_contents literal, the tail of the os.path.join
# call that assigns self.page_set_archive_info_file, and the body of the
# `with open(new_temp_recording, 'w')` statement.
112 def testCreatingNewArchiveInfo(self):
113 # Write only the page set without the corresponding metadata file.
114 page_set_contents = ("""
116 archive_data_file": "new-metadata.json",
124 page_set_file = os.path.join(self.tmp_dir, 'new.json')
125 with open(page_set_file, 'w') as f:
126 f.write(page_set_contents)
128 self.page_set_archive_info_file = os.path.join(self.tmp_dir,
131 # Create the PageSetArchiveInfo object to be tested.
132 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
133 self.page_set_archive_info_file, page_set_file)
135 # Add a recording for all the pages.
136 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
137 with open(new_temp_recording, 'w') as f:
140 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
142 self.assertEquals(new_temp_recording,
143 self.archive_info.WprFilePathForPage(page1))
145 self.archive_info.AddRecordedPages([page1.url])
147 # Expected name for the recording (decided by PageSetArchiveInfo).
148 new_recording = os.path.join(self.tmp_dir, 'new_000.wpr')
150 self.assertTrue(os.path.exists(new_recording))
151 self.assertFalse(os.path.exists(new_temp_recording))
152 self.assertCorrectHashFile(new_recording)
154 # Check that the archive info was written correctly.
155 self.assertTrue(os.path.exists(self.page_set_archive_info_file))
156 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
157 self.page_set_archive_info_file,
158 os.path.join(tempfile.gettempdir(), 'pageset.json'))
159 self.assertEquals(new_recording,
160 read_archive_info.WprFilePathForPage(page1))