1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
12 from telemetry.page import cloud_storage
15 class PageSetArchiveInfo(object):
# Constructor. Stores the archive data file path and its directory, creates
# that directory when it does not exist, calls cloud_storage.GetIfChanged for
# every archive named in data['archives'], and builds the lookup maps between
# .wpr file names and page urls.
#
# Args:
#   archive_data_file_path: path of the archive metadata file; its directory
#     is the base against which .wpr file names are resolved.
#   page_set_file_path: path of the page set file, kept as a back pointer.
#   data: mapping with an 'archives' key that maps each relative .wpr file
#     name to the urls it supports (per the comments further down).
#
# NOTE(review): this excerpt has lost its indentation and several short lines
# (for example the `try:` that pairs with the `except` below, the tail of the
# logging.error call, and the statement that binds `url`). Consult the
# complete file before changing any logic here.
16 def __init__(self, archive_data_file_path, page_set_file_path, data):
17 self._archive_data_file_path = archive_data_file_path
18 self._archive_data_file_dir = os.path.dirname(archive_data_file_path)
20 # Ensure directory exists.
# NOTE(review): os.path.dirname returns '' for a bare file name, and
# os.makedirs('') raises -- confirm callers always pass a path that has a
# directory component.
21 if not os.path.exists(self._archive_data_file_dir):
22 os.makedirs(self._archive_data_file_dir)
24 # Back pointer to the page set file.
25 self._page_set_file_path = page_set_file_path
27 # Download all .wpr files.
28 for archive_path in data['archives']:
# The loop variable is rebound from the metadata-relative name to an absolute
# path before it is handed to cloud storage.
29 archive_path = self._WprFileNameToPath(archive_path)
31 cloud_storage.GetIfChanged(cloud_storage.INTERNAL_BUCKET, archive_path)
32 except (cloud_storage.CredentialsError,
33 cloud_storage.PermissionError) as e:
34 if os.path.exists(archive_path):
35 # If the archive exists, assume the user recorded their own and
37 logging.warning('Could not download WPR archive: %s', archive_path)
39 # If the archive doesn't exist, this is fatal.
40 logging.error('Can not run without required WPR archive: %s. '
41 'If you believe you have credentials, follow the '
42 'instructions below. If you do not have credentials, '
43 'you may use record_wpr to make your own recording or '
44 'run against live sites with --allow-live-sites.',
49 # Map from the relative path (as it appears in the metadata file) of the
50 # .wpr file to a list of urls it supports.
# This is the caller's data['archives'] object itself, not a copy, so later
# in-place changes to the map are visible through `data` as well.
51 self._wpr_file_to_urls = data['archives']
53 # Map from the page url to a relative path (as it appears in the metadata
54 # file) of the .wpr file.
55 self._url_to_wpr_file = dict()
56 # Find out the wpr file names for each page.
57 for wpr_file in data['archives']:
58 page_urls = data['archives'][wpr_file]
# NOTE(review): `url` is presumably bound by a loop over page_urls on a line
# that is not visible in this excerpt.
60 self._url_to_wpr_file[url] = wpr_file
# Stays None until AddNewTemporaryRecording supplies a path.
61 self.temp_target_wpr_file_path = None
# Alternate constructor. Builds an instance from the metadata file at
# file_path when that file exists; otherwise builds one from an empty
# {'archives': {}} mapping.
#
# Args:
#   file_path: path of the archive metadata file (may not exist yet).
#   page_set_file_path: path of the page set file, forwarded unchanged.
#
# NOTE(review): the first parameter is `cls`, so this is presumably decorated
# with @classmethod on a line not visible in this excerpt. The assignment of
# `data` (presumably parsed from `f`) is not visible either.
64 def FromFile(cls, file_path, page_set_file_path):
65 if os.path.exists(file_path):
66 with open(file_path, 'r') as f:
68 return cls(file_path, page_set_file_path, data)
# No metadata file yet: start with no archives.
69 return cls(file_path, page_set_file_path, {'archives': {}})
# Returns the archive path to use for `page`. A pending temporary recording
# path takes precedence over the recorded metadata; otherwise page.url is
# looked up in the url -> .wpr file map and the file name is converted to an
# absolute path.
#
# NOTE(review): the lines that handle a url without a recorded archive
# (wpr_file is None) are not visible in this excerpt -- confirm what is
# returned in that case.
71 def WprFilePathForPage(self, page):
72 if self.temp_target_wpr_file_path:
73 return self.temp_target_wpr_file_path
74 wpr_file = self._url_to_wpr_file.get(page.url, None)
76 return self._WprFileNameToPath(wpr_file)
def AddNewTemporaryRecording(self, temp_target_wpr_file_path):
    """Remember where a freshly made temporary recording lives.

    While this path is set, WprFilePathForPage returns it for every page.

    Args:
        temp_target_wpr_file_path: Location of the temporary .wpr file.
    """
    self.temp_target_wpr_file_path = temp_target_wpr_file_path
# Turns the pending temporary recording into the archive for the given urls:
# obtains the next archive file name, points urls at it through
# _SetWprFileForPage, moves the temporary file to the new location, writes
# the archive's hash to a sibling '.sha1' file, and finally deletes archives
# that are no longer referenced.
#
# Args:
#   urls: the page urls covered by the new recording.
#
# NOTE(review): `url` is used below but the statement that binds it
# (presumably a loop over `urls`) is not visible in this excerpt; other short
# lines of this method are missing as well.
82 def AddRecordedPages(self, urls):
83 (target_wpr_file, target_wpr_file_path) = self._NextWprFileName()
85 self._SetWprFileForPage(url, target_wpr_file)
# temp_target_wpr_file_path starts out as None, so AddNewTemporaryRecording
# must have been called before this move can succeed.
86 shutil.move(self.temp_target_wpr_file_path, target_wpr_file_path)
88 # Update the hash file.
# NOTE(review): the file is opened in binary mode; this assumes
# cloud_storage.GetHash returns a byte string -- confirm.
89 with open(target_wpr_file_path + '.sha1', 'wb') as f:
90 f.write(cloud_storage.GetHash(target_wpr_file_path))
94 self._DeleteAbandonedWprFiles()
# Removes every abandoned archive from the in-memory file -> urls map and
# deletes the corresponding file on disk.
#
# NOTE(review): the inline comment says deletion is best effort, but the
# try/except lines around os.remove are not visible in this excerpt.
96 def _DeleteAbandonedWprFiles(self):
97 # Update the metadata so that the abandoned wpr files don't have empty url
# The abandoned names are collected into a separate list first, so the
# deletions below do not modify the dict while it is being iterated.
99 abandoned_wpr_files = self._AbandonedWprFiles()
100 for wpr_file in abandoned_wpr_files:
101 del self._wpr_file_to_urls[wpr_file]
102 # Don't fail if we're unable to delete some of the files.
103 wpr_file_path = self._WprFileNameToPath(wpr_file)
105 os.remove(wpr_file_path)
# NOTE(review): the message is %-formatted eagerly; passing wpr_file_path as
# a separate logging argument would defer the formatting.
107 logging.warning('Failed to delete file: %s' % wpr_file_path)
# Returns a list of the archive file names that are considered abandoned.
#
# NOTE(review): the condition that guards the append is on a line not visible
# in this excerpt (presumably "this archive has no urls left"); as shown,
# `urls` is never read.
109 def _AbandonedWprFiles(self):
110 abandoned_wpr_files = []
# NOTE(review): dict.iteritems() exists only on Python 2.
111 for wpr_file, urls in self._wpr_file_to_urls.iteritems():
113 abandoned_wpr_files.append(wpr_file)
114 return abandoned_wpr_files
# Serializes the archive metadata (description, page set pointer and the
# archives map without abandoned entries) as JSON into the archive data file
# given to the constructor.
#
# NOTE(review): the statement that creates `metadata` is not visible in this
# excerpt, and other short lines may be missing too.
116 def _WriteToFile(self):
117 """Writes the metadata into the file passed as constructor parameter."""
119 metadata['description'] = (
120 'Describes the Web Page Replay archives for a page set. Don\'t edit by '
121 'hand! Use record_wpr for updating.')
122 # Pointer from the metadata to the page set .json file.
# Stored relative to the directory of the archive data file.
123 metadata['page_set'] = os.path.relpath(self._page_set_file_path,
124 self._archive_data_file_dir)
# Shallow copy: the key deletions below affect only what is written, not
# self._wpr_file_to_urls.
125 metadata['archives'] = self._wpr_file_to_urls.copy()
126 # Don't write data for abandoned archives.
127 abandoned_wpr_files = self._AbandonedWprFiles()
128 for wpr_file in abandoned_wpr_files:
129 del metadata['archives'][wpr_file]
131 with open(self._archive_data_file_path, 'w') as f:
132 json.dump(metadata, f, indent=4)
135 def _WprFileNameToPath(self, wpr_file):
136 return os.path.abspath(os.path.join(self._archive_data_file_dir, wpr_file))
# Picks the name for the next archive file and returns a
# (file name, absolute path) tuple. The new name is '<base>_<number>.wpr',
# where the number is one above the highest number among the known archives,
# zero-padded to three digits.
#
# Raises:
#   Exception: for an illegal archive file name, including one whose base
#     differs from the base of the other archives.
#
# NOTE(review): the initial values of `highest_number` and `base`, and the
# conditions guarding the first raise and the fallback base, are on lines not
# visible in this excerpt.
138 def _NextWprFileName(self):
139 """Creates a new file name for a wpr archive file."""
140 # The names are of the format "some_thing_number.wpr". Read the numbers.
143 for wpr_file in self._wpr_file_to_urls:
# NOTE(review): re.match anchors only at the start and the pattern has no
# trailing '$', so a name with extra text after '.wpr' also matches.
144 match = re.match(r'(?P<BASE>.*)_(?P<NUMBER>[0-9]+)\.wpr', wpr_file)
146 raise Exception('Illegal wpr file name ' + wpr_file)
147 highest_number = max(int(match.groupdict()['NUMBER']), highest_number)
# Every archive of a page set must share the same base name.
148 if base and match.groupdict()['BASE'] != base:
149 raise Exception('Illegal wpr file name ' + wpr_file +
150 ', doesn\'t begin with ' + base)
151 base = match.groupdict()['BASE']
153 # If we're creating a completely new info file, use the base name of the
# Fallback base: the page set file name without directory and extension.
155 base = os.path.splitext(os.path.basename(self._page_set_file_path))[0]
156 new_filename = '%s_%03d.wpr' % (base, highest_number + 1)
157 return new_filename, self._WprFileNameToPath(new_filename)
# Re-points `url` at `wpr_file` in both lookup maps: removes the url from the
# list of the archive it previously belonged to, records the new archive for
# the url, and appends the url to the new archive's list (creating that list
# when the archive is not known yet).
#
# Args:
#   url: the page url being (re)recorded.
#   wpr_file: the metadata-relative name of the archive that now serves it.
#
# NOTE(review): the guard in front of the .remove() call (presumably "url had
# an archive before") is not visible in this excerpt, and the method may
# continue past the last line shown.
159 def _SetWprFileForPage(self, url, wpr_file):
160 """For modifying the metadata when we're going to record a new archive."""
161 old_wpr_file = self._url_to_wpr_file.get(url, None)
163 self._wpr_file_to_urls[old_wpr_file].remove(url)
164 self._url_to_wpr_file[url] = wpr_file
165 if wpr_file not in self._wpr_file_to_urls:
166 self._wpr_file_to_urls[wpr_file] = []
167 self._wpr_file_to_urls[wpr_file].append(url)