1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
9 from telemetry.page import cloud_storage
10 from telemetry.page import page_set_archive_info
13 class MockPage(object):
14 def __init__(self, url):
18 url1 = 'http://www.foo.com/'
19 url2 = 'http://www.bar.com/'
20 url3 = 'http://www.baz.com/'
21 recording1 = 'data_001.wpr'
22 recording2 = 'data_002.wpr'
23 archive_info_contents = ("""
30 """ % (recording1, url1, url2, recording2, url3))
31 page1 = MockPage(url1)
32 page2 = MockPage(url2)
33 page3 = MockPage(url3)
36 class TestPageSetArchiveInfo(unittest.TestCase):
38 self.tmp_dir = tempfile.mkdtemp()
40 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
41 with open(self.page_set_archive_info_file, 'w') as f:
42 f.write(archive_info_contents)
44 # Write the existing .wpr files.
46 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
47 f.write(archive_info_contents)
49 # Create the PageSetArchiveInfo object to be tested.
50 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
51 self.page_set_archive_info_file)
54 shutil.rmtree(self.tmp_dir)
def assertCorrectHashFile(self, file_path):
    """Assert that `file_path` has a sibling `.sha1` file holding its hash.

    Args:
        file_path: Path of the file to check; its hash file is expected at
            `file_path + '.sha1'`.

    The test fails if the hash file does not exist, or if its contents
    differ from `cloud_storage.GetHash(file_path)`.
    """
    hash_path = file_path + '.sha1'
    self.assertTrue(os.path.exists(hash_path))
    with open(hash_path, 'rb') as f:
        # assertEqual is the supported spelling; the assertEquals alias is
        # deprecated and no longer exists in recent unittest versions.
        self.assertEqual(cloud_storage.GetHash(file_path), f.read())
def testReadingArchiveInfo(self):
    """Check which recording each page resolves to in the loaded info.

    page1 and page2 are expected to resolve to recording1, and page3 to
    recording2. Only the file name (basename) of the returned path is
    compared.
    """
    expected_recordings = (
        (page1, recording1),
        (page2, recording1),
        (page3, recording2),
    )
    for page, recording in expected_recordings:
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(
            recording,
            os.path.basename(self.archive_info.WprFilePathForPage(page)))
69 def testModifications(self):
70 recording1_path = os.path.join(self.tmp_dir, recording1)
71 recording2_path = os.path.join(self.tmp_dir, recording2)
73 new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
74 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
75 with open(new_temp_recording, 'w') as f:
78 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
80 self.assertEquals(new_temp_recording,
81 self.archive_info.WprFilePathForPage(page1))
82 self.assertEquals(new_temp_recording,
83 self.archive_info.WprFilePathForPage(page2))
84 self.assertEquals(new_temp_recording,
85 self.archive_info.WprFilePathForPage(page3))
87 self.archive_info.AddRecordedPages([page2.url])
89 self.assertTrue(os.path.exists(new_recording1))
90 self.assertFalse(os.path.exists(new_temp_recording))
92 self.assertTrue(os.path.exists(recording1_path))
93 self.assertTrue(os.path.exists(recording2_path))
94 self.assertCorrectHashFile(new_recording1)
96 new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
97 with open(new_temp_recording, 'w') as f:
100 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
101 self.archive_info.AddRecordedPages([page3.url])
103 self.assertTrue(os.path.exists(new_recording2))
104 self.assertCorrectHashFile(new_recording2)
105 self.assertFalse(os.path.exists(new_temp_recording))
107 self.assertTrue(os.path.exists(recording1_path))
108 # recording2 is no longer needed, so it was deleted.
109 self.assertFalse(os.path.exists(recording2_path))
111 def testCreatingNewArchiveInfo(self):
112 # Write only the page set without the corresponding metadata file.
113 page_set_contents = ("""
115 archive_data_file": "new_archive_info.json",
123 page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json')
124 with open(page_set_file, 'w') as f:
125 f.write(page_set_contents)
127 self.page_set_archive_info_file = os.path.join(self.tmp_dir,
128 'new_archive_info.json')
130 # Create the PageSetArchiveInfo object to be tested.
131 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
132 self.page_set_archive_info_file)
134 # Add a recording for all the pages.
135 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
136 with open(new_temp_recording, 'w') as f:
139 self.archive_info.AddNewTemporaryRecording(new_temp_recording)
141 self.assertEquals(new_temp_recording,
142 self.archive_info.WprFilePathForPage(page1))
144 self.archive_info.AddRecordedPages([page1.url])
146 # Expected name for the recording (decided by PageSetArchiveInfo).
147 new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
149 self.assertTrue(os.path.exists(new_recording))
150 self.assertFalse(os.path.exists(new_temp_recording))
151 self.assertCorrectHashFile(new_recording)
153 # Check that the archive info was written correctly.
154 self.assertTrue(os.path.exists(self.page_set_archive_info_file))
155 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
156 self.page_set_archive_info_file)
157 self.assertEquals(new_recording,
158 read_archive_info.WprFilePathForPage(page1))