Upstream version 5.34.92.0
[platform/framework/web/crosswalk.git] / src / tools / telemetry / telemetry / page / page_set_archive_info_unittest.py
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 import os
5 import shutil
6 import tempfile
7 import unittest
8
9 from telemetry.page import cloud_storage
10 from telemetry.page import page_set_archive_info
11
12
13 class MockPage(object):
14   def __init__(self, url):
15     self.url = url
16
17
18 url1 = 'http://www.foo.com/'
19 url2 = 'http://www.bar.com/'
20 url3 = 'http://www.baz.com/'
21 recording1 = 'data_001.wpr'
22 recording2 = 'data_002.wpr'
23 archive_info_contents = ("""
24 {
25 "archives": {
26   "%s": ["%s", "%s"],
27   "%s": ["%s"]
28 }
29 }
30 """ % (recording1, url1, url2, recording2, url3))
31 page1 = MockPage(url1)
32 page2 = MockPage(url2)
33 page3 = MockPage(url3)
34
35
36 class TestPageSetArchiveInfo(unittest.TestCase):
37   def setUp(self):
38     self.tmp_dir = tempfile.mkdtemp()
39     # Write the metadata.
40     self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
41     with open(self.page_set_archive_info_file, 'w') as f:
42       f.write(archive_info_contents)
43
44     # Write the existing .wpr files.
45     for i in [1, 2]:
46       with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f:
47         f.write(archive_info_contents)
48
49     # Create the PageSetArchiveInfo object to be tested.
50     self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
51         self.page_set_archive_info_file)
52
53   def tearDown(self):
54     shutil.rmtree(self.tmp_dir)
55
56   def assertCorrectHashFile(self, file_path):
57     self.assertTrue(os.path.exists(file_path + '.sha1'))
58     with open(file_path + '.sha1', 'rb') as f:
59       self.assertEquals(cloud_storage.GetHash(file_path), f.read())
60
61   def testReadingArchiveInfo(self):
62     self.assertEquals(recording1, os.path.basename(
63         self.archive_info.WprFilePathForPage(page1)))
64     self.assertEquals(recording1, os.path.basename(
65         self.archive_info.WprFilePathForPage(page2)))
66     self.assertEquals(recording2, os.path.basename(
67         self.archive_info.WprFilePathForPage(page3)))
68
69   def testModifications(self):
70     recording1_path = os.path.join(self.tmp_dir, recording1)
71     recording2_path = os.path.join(self.tmp_dir, recording2)
72
73     new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
74     new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
75     with open(new_temp_recording, 'w') as f:
76       f.write('wpr data')
77
78     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
79
80     self.assertEquals(new_temp_recording,
81                       self.archive_info.WprFilePathForPage(page1))
82     self.assertEquals(new_temp_recording,
83                       self.archive_info.WprFilePathForPage(page2))
84     self.assertEquals(new_temp_recording,
85                       self.archive_info.WprFilePathForPage(page3))
86
87     self.archive_info.AddRecordedPages([page2.url])
88
89     self.assertTrue(os.path.exists(new_recording1))
90     self.assertFalse(os.path.exists(new_temp_recording))
91
92     self.assertTrue(os.path.exists(recording1_path))
93     self.assertTrue(os.path.exists(recording2_path))
94     self.assertCorrectHashFile(new_recording1)
95
96     new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
97     with open(new_temp_recording, 'w') as f:
98       f.write('wpr data')
99
100     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
101     self.archive_info.AddRecordedPages([page3.url])
102
103     self.assertTrue(os.path.exists(new_recording2))
104     self.assertCorrectHashFile(new_recording2)
105     self.assertFalse(os.path.exists(new_temp_recording))
106
107     self.assertTrue(os.path.exists(recording1_path))
108     # recording2 is no longer needed, so it was deleted.
109     self.assertFalse(os.path.exists(recording2_path))
110
111   def testCreatingNewArchiveInfo(self):
112     # Write only the page set without the corresponding metadata file.
113     page_set_contents = ("""
114     {
115         archive_data_file": "new_archive_info.json",
116         "pages": [
117             {
118                 "url": "%s",
119             }
120         ]
121     }""" % url1)
122
123     page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json')
124     with open(page_set_file, 'w') as f:
125       f.write(page_set_contents)
126
127     self.page_set_archive_info_file = os.path.join(self.tmp_dir,
128                                                    'new_archive_info.json')
129
130     # Create the PageSetArchiveInfo object to be tested.
131     self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
132         self.page_set_archive_info_file)
133
134     # Add a recording for all the pages.
135     new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
136     with open(new_temp_recording, 'w') as f:
137       f.write('wpr data')
138
139     self.archive_info.AddNewTemporaryRecording(new_temp_recording)
140
141     self.assertEquals(new_temp_recording,
142                       self.archive_info.WprFilePathForPage(page1))
143
144     self.archive_info.AddRecordedPages([page1.url])
145
146     # Expected name for the recording (decided by PageSetArchiveInfo).
147     new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')
148
149     self.assertTrue(os.path.exists(new_recording))
150     self.assertFalse(os.path.exists(new_temp_recording))
151     self.assertCorrectHashFile(new_recording)
152
153     # Check that the archive info was written correctly.
154     self.assertTrue(os.path.exists(self.page_set_archive_info_file))
155     read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
156         self.page_set_archive_info_file)
157     self.assertEquals(new_recording,
158                       read_archive_info.WprFilePathForPage(page1))