f060c74a857893b93911411643383462b600a9ac
[platform/framework/web/crosswalk.git] / src / tools / telemetry / telemetry / page / page_set_archive_info_unittest.py
1 # Copyright 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 import os
5 import shutil
6 import tempfile
7 import unittest
8
9 from telemetry.page import cloud_storage
10 from telemetry.page import page
11 from telemetry.page import page_set_archive_info
12
13
14 class MockPage(page.Page):
15   def __init__(self, url, name=None):
16     super(MockPage, self).__init__(url, None, name=name)
17
18
19 page1 = MockPage('http://www.foo.com/', 'Foo')
20 page2 = MockPage('http://www.bar.com/', 'Bar')
21 page3 = MockPage('http://www.baz.com/')
22 recording1 = 'data_001.wpr'
23 recording2 = 'data_002.wpr'
24 archive_info_contents = ("""
25 {
26 "archives": {
27   "%s": ["%s", "%s"],
28   "%s": ["%s"]
29 }
30 }
31 """ % (recording1, page1.display_name, page2.display_name, recording2,
32        page3.display_name))
33
34
35 class TestPageSetArchiveInfo(unittest.TestCase):
36   def setUp(self):
37     self.tmp_dir = tempfile.mkdtemp()
38     # Write the metadata.
39     self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
40     with open(self.page_set_archive_info_file, 'w') as f:
41       f.write(archive_info_contents)
42
43     # Write the existing .wpr files.
44     for i in [1, 2]:
45       with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
46         f.write(archive_info_contents)
47
48     # Create the PageSetArchiveInfo object to be tested.
49     self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
50         self.page_set_archive_info_file)
51
52   def tearDown(self):
53     shutil.rmtree(self.tmp_dir)
54
55   def assertCorrectHashFile(self, file_path):
56     self.assertTrue(os.path.exists(file_path + '.sha1'))
57     with open(file_path + '.sha1', 'rb') as f:
58       self.assertEquals(cloud_storage.CalculateHash(file_path), f.read())
59
60   def testReadingArchiveInfo(self):
61     self.assertIsNotNone(self.archive_info.WprFilePathForPage(page1))
62     self.assertEquals(recording1, os.path.basename(
63         self.archive_info.WprFilePathForPage(page1)))
64
65     self.assertIsNotNone(self.archive_info.WprFilePathForPage(page2))
66     self.assertEquals(recording1, os.path.basename(
67         self.archive_info.WprFilePathForPage(page2)))
68
69     self.assertIsNotNone(self.archive_info.WprFilePathForPage(page3))
70     self.assertEquals(recording2, os.path.basename(
71         self.archive_info.WprFilePathForPage(page3)))
72
73   def testModifications(self):
74     recording1_path = os.path.join(self.tmp_dir, recording1)
75     recording2_path = os.path.join(self.tmp_dir, recording2)
76
77     new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
78     new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
79     with open(new_temp_recording, 'w') as f:
80       f.write('wpr data')
81
82     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
83
84     self.assertEquals(new_temp_recording,
85                       self.archive_info.WprFilePathForPage(page1))
86     self.assertEquals(new_temp_recording,
87                       self.archive_info.WprFilePathForPage(page2))
88     self.assertEquals(new_temp_recording,
89                       self.archive_info.WprFilePathForPage(page3))
90
91     self.archive_info.AddRecordedPages([page2.display_name])
92
93     self.assertTrue(os.path.exists(new_recording1))
94     self.assertFalse(os.path.exists(new_temp_recording))
95
96     self.assertTrue(os.path.exists(recording1_path))
97     self.assertTrue(os.path.exists(recording2_path))
98     self.assertCorrectHashFile(new_recording1)
99
100     new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
101     with open(new_temp_recording, 'w') as f:
102       f.write('wpr data')
103
104     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
105     self.archive_info.AddRecordedPages([page3.display_name])
106
107     self.assertTrue(os.path.exists(new_recording2))
108     self.assertCorrectHashFile(new_recording2)
109     self.assertFalse(os.path.exists(new_temp_recording))
110
111     self.assertTrue(os.path.exists(recording1_path))
112     # recording2 is no longer needed, so it was deleted.
113     self.assertFalse(os.path.exists(recording2_path))
114
115   def testCreatingNewArchiveInfo(self):
116     # Write only the page set without the corresponding metadata file.
117     page_set_contents = ("""
118     {
119         archive_data_file": "new_archive_info.json",
120         "pages": [
121             {
122                 "url": "%s",
123             }
124         ]
125     }""" % page1.url)
126
127     page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json')
128     with open(page_set_file, 'w') as f:
129       f.write(page_set_contents)
130
131     self.page_set_archive_info_file = os.path.join(self.tmp_dir,
132                                                    'new_archive_info.json')
133
134     # Create the PageSetArchiveInfo object to be tested.
135     self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
136         self.page_set_archive_info_file)
137
138     # Add a recording for all the pages.
139     new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
140     with open(new_temp_recording, 'w') as f:
141       f.write('wpr data')
142
143     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
144
145     self.assertEquals(new_temp_recording,
146                       self.archive_info.WprFilePathForPage(page1))
147
148     self.archive_info.AddRecordedPages([page1.display_name])
149
150     # Expected name for the recording (decided by PageSetArchiveInfo).
151     new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
152
153     self.assertTrue(os.path.exists(new_recording))
154     self.assertFalse(os.path.exists(new_temp_recording))
155     self.assertCorrectHashFile(new_recording)
156
157     # Check that the archive info was written correctly.
158     self.assertTrue(os.path.exists(self.page_set_archive_info_file))
159     read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
160         self.page_set_archive_info_file)
161     self.assertEquals(new_recording,
162                       read_archive_info.WprFilePathForPage(page1))