- add sources.
[platform/framework/web/crosswalk.git] / src / tools / telemetry / telemetry / page / page_set_archive_info_unittest.py
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 import os
5 import shutil
6 import tempfile
7 import unittest
8
9 from telemetry.page import cloud_storage
10 from telemetry.page import page_set_archive_info
11
12
class MockPage(object):
  """Minimal stand-in for a page: an object that only carries a `url`."""

  def __init__(self, url):
    # The URL is stored unchanged and exposed as a plain attribute.
    self.url = url
16
17
# URLs of the three pages used throughout the tests.
url1, url2, url3 = (
    'http://www.foo.com/',
    'http://www.bar.com/',
    'http://www.baz.com/',
)

# File names of the recordings that already exist when a test starts.
recording1, recording2 = 'data_001.wpr', 'data_002.wpr'

# Archive metadata: recording1 serves url1 and url2, recording2 serves url3.
archive_info_contents = (
    '\n'
    '{\n'
    '"archives": {\n'
    '  "%s": ["%s", "%s"],\n'
    '  "%s": ["%s"]\n'
    '}\n'
    '}\n'
) % (recording1, url1, url2, recording2, url3)

# One mock page per URL, in the same order as the URLs above.
page1, page2, page3 = [MockPage(u) for u in (url1, url2, url3)]
34
35
class TestPageSetArchiveInfo(unittest.TestCase):
  """Exercises PageSetArchiveInfo against files in a scratch directory."""

  def setUp(self):
    """Builds the scratch directory and the PageSetArchiveInfo under test."""
    self.tmp_dir = tempfile.mkdtemp()

    # Write the metadata.
    self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
    with open(self.page_set_archive_info_file, 'w') as f:
      f.write(archive_info_contents)

    # Write the existing .wpr files. The names are taken from the same
    # constants that were formatted into the metadata so they cannot drift
    # apart; the file content itself is only a placeholder.
    for recording in (recording1, recording2):
      with open(os.path.join(self.tmp_dir, recording), 'w') as f:
        f.write(archive_info_contents)

    # Create the PageSetArchiveInfo object to be tested. No file is written
    # at the page set path passed here; only the path itself is supplied.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file,
        os.path.join(tempfile.gettempdir(), 'pageset.json'))

  def tearDown(self):
    """Removes the scratch directory and everything the test wrote into it."""
    shutil.rmtree(self.tmp_dir)

  def assertCorrectHashFile(self, file_path):
    """Asserts that `file_path + '.sha1'` exists and holds the file's hash.

    Args:
      file_path: Path of the recording whose hash file is checked.
    """
    hash_path = file_path + '.sha1'
    self.assertTrue(os.path.exists(hash_path))
    with open(hash_path, 'rb') as f:
      self.assertEqual(cloud_storage.GetHash(file_path), f.read())

  def testReadingArchiveInfo(self):
    """Each page resolves to the recording listed for its URL."""
    self.assertEqual(recording1, os.path.basename(
        self.archive_info.WprFilePathForPage(page1)))
    self.assertEqual(recording1, os.path.basename(
        self.archive_info.WprFilePathForPage(page2)))
    self.assertEqual(recording2, os.path.basename(
        self.archive_info.WprFilePathForPage(page3)))

  def testModifications(self):
    """Re-recording pages creates new archives and drops unused old ones."""
    recording1_path = os.path.join(self.tmp_dir, recording1)
    recording2_path = os.path.join(self.tmp_dir, recording2)

    new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    # While a temporary recording is registered, every page maps to it.
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page1))
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page2))
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page3))

    self.archive_info.AddRecordedPages([page2.url])

    # The temporary file is gone and data_003.wpr exists in its place.
    self.assertTrue(os.path.exists(new_recording1))
    self.assertFalse(os.path.exists(new_temp_recording))

    # Both original recordings are still present at this point.
    self.assertTrue(os.path.exists(recording1_path))
    self.assertTrue(os.path.exists(recording2_path))
    self.assertCorrectHashFile(new_recording1)

    new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)
    self.archive_info.AddRecordedPages([page3.url])

    self.assertTrue(os.path.exists(new_recording2))
    self.assertCorrectHashFile(new_recording2)
    self.assertFalse(os.path.exists(new_temp_recording))

    self.assertTrue(os.path.exists(recording1_path))
    # recording2 is no longer needed, so it was deleted.
    self.assertFalse(os.path.exists(recording2_path))

  def testCreatingNewArchiveInfo(self):
    """Recording without an existing metadata file creates one on disk."""
    # Write only the page set without the corresponding metadata file.
    # The fixture is kept as well-formed JSON (quoted keys, no trailing
    # comma) so that it stays usable if the page set file is ever parsed.
    page_set_contents = ("""
    {
        "archive_data_file": "new-metadata.json",
        "pages": [
            {
                "url": "%s"
            }
        ]
    }""" % url1)

    page_set_file = os.path.join(self.tmp_dir, 'new.json')
    with open(page_set_file, 'w') as f:
      f.write(page_set_contents)

    self.page_set_archive_info_file = os.path.join(self.tmp_dir,
                                                   'new-metadata.json')

    # Create the PageSetArchiveInfo object to be tested.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file, page_set_file)

    # Add a recording for all the pages.
    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page1))

    self.archive_info.AddRecordedPages([page1.url])

    # Expected name for the recording (decided by PageSetArchiveInfo).
    new_recording = os.path.join(self.tmp_dir, 'new_000.wpr')

    self.assertTrue(os.path.exists(new_recording))
    self.assertFalse(os.path.exists(new_temp_recording))
    self.assertCorrectHashFile(new_recording)

    # Check that the archive info was written correctly.
    self.assertTrue(os.path.exists(self.page_set_archive_info_file))
    read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file,
        os.path.join(tempfile.gettempdir(), 'pageset.json'))
    self.assertEqual(new_recording,
                     read_archive_info.WprFilePathForPage(page1))