1 # Copyright 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
9 from telemetry.page import cloud_storage
10 from telemetry.page import page
11 from telemetry.page import page_set_archive_info
14 class MockPage(page.Page):
15 def __init__(self, url, name=None):
16 super(MockPage, self).__init__(url, None, name=name)
19 page1 = MockPage('http://www.foo.com/', 'Foo')
20 page2 = MockPage('http://www.bar.com/', 'Bar')
21 page3 = MockPage('http://www.baz.com/')
22 recording1 = 'data_001.wpr'
23 recording2 = 'data_002.wpr'
24 archive_info_contents = ("""
31 """ % (recording1, page1.display_name, page2.display_name, recording2,
35 class TestPageSetArchiveInfo(unittest.TestCase):
37 self.tmp_dir = tempfile.mkdtemp()
39 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
40 with open(self.page_set_archive_info_file, 'w') as f:
41 f.write(archive_info_contents)
43 # Write the existing .wpr files.
45 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
46 f.write(archive_info_contents)
48 # Create the PageSetArchiveInfo object to be tested.
49 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
50 self.page_set_archive_info_file)
53 shutil.rmtree(self.tmp_dir)
55 def assertCorrectHashFile(self, file_path):
56 self.assertTrue(os.path.exists(file_path + '.sha1'))
57 with open(file_path + '.sha1', 'rb') as f:
58 self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
60 def testReadingArchiveInfo(self):
61 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page1))
62 self.assertEquals(recording1, os.path.basename(
63 self.archive_info.WprFilePathForPage(page1)))
65 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page2))
66 self.assertEquals(recording1, os.path.basename(
67 self.archive_info.WprFilePathForPage(page2)))
69 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page3))
70 self.assertEquals(recording2, os.path.basename(
71 self.archive_info.WprFilePathForPage(page3)))
73 def testModifications(self):
74 recording1_path = os.path.join(self.tmp_dir, recording1)
75 recording2_path = os.path.join(self.tmp_dir, recording2)
77 new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
78 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
79 with open(new_temp_recording, 'w') as f:
82 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
84 self.assertEquals(new_temp_recording,
85 self.archive_info.WprFilePathForPage(page1))
86 self.assertEquals(new_temp_recording,
87 self.archive_info.WprFilePathForPage(page2))
88 self.assertEquals(new_temp_recording,
89 self.archive_info.WprFilePathForPage(page3))
91 self.archive_info.AddRecordedPages([page2.display_name])
93 self.assertTrue(os.path.exists(new_recording1))
94 self.assertFalse(os.path.exists(new_temp_recording))
96 self.assertTrue(os.path.exists(recording1_path))
97 self.assertTrue(os.path.exists(recording2_path))
98 self.assertCorrectHashFile(new_recording1)
100 new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
101 with open(new_temp_recording, 'w') as f:
104 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
105 self.archive_info.AddRecordedPages([page3.display_name])
107 self.assertTrue(os.path.exists(new_recording2))
108 self.assertCorrectHashFile(new_recording2)
109 self.assertFalse(os.path.exists(new_temp_recording))
111 self.assertTrue(os.path.exists(recording1_path))
112 # recording2 is no longer needed, so it was deleted.
113 self.assertFalse(os.path.exists(recording2_path))
115 def testCreatingNewArchiveInfo(self):
116 # Write only the page set without the corresponding metadata file.
117 page_set_contents = ("""
119 archive_data_file": "new_archive_info.json",
127 page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json')
128 with open(page_set_file, 'w') as f:
129 f.write(page_set_contents)
131 self.page_set_archive_info_file = os.path.join(self.tmp_dir,
132 'new_archive_info.json')
134 # Create the PageSetArchiveInfo object to be tested.
135 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
136 self.page_set_archive_info_file)
138 # Add a recording for all the pages.
139 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
140 with open(new_temp_recording, 'w') as f:
143 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
145 self.assertEquals(new_temp_recording,
146 self.archive_info.WprFilePathForPage(page1))
148 self.archive_info.AddRecordedPages([page1.display_name])
150 # Expected name for the recording (decided by PageSetArchiveInfo).
151 new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
153 self.assertTrue(os.path.exists(new_recording))
154 self.assertFalse(os.path.exists(new_temp_recording))
155 self.assertCorrectHashFile(new_recording)
157 # Check that the archive info was written correctly.
158 self.assertTrue(os.path.exists(self.page_set_archive_info_file))
159 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
160 self.page_set_archive_info_file)
161 self.assertEquals(new_recording,
162 read_archive_info.WprFilePathForPage(page1))