2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
13 from telemetry import test
14 from telemetry.core import browser_options
15 from telemetry.core import discover
16 from telemetry.core import util
17 from telemetry.core import wpr_modes
18 from telemetry.page import page_measurement
19 from telemetry.page import page_measurement_results
20 from telemetry.page import page_runner
21 from telemetry.page import page_set
22 from telemetry.page import page_test
23 from telemetry.page import profile_creator
24 from telemetry.page import test_expectations
class RecordPage(page_test.PageTest):
  """Page test that exercises pages so their network traffic can be recorded.

  Before each navigation the tab cache is cleared so resources are fetched
  from the network. If a measurement is attached through the `test`
  attribute, the navigation hooks are forwarded to it and it is run against
  a throwaway results object. Afterwards every compound action named by the
  known measurements (and defined on the page) is executed.
  """

  # Seconds to wait after load when no measurement is attached.
  # NOTE(review): the concrete duration was not visible in the reviewed
  # text; 3 seconds is a placeholder to be confirmed.
  _POST_LOAD_SETTLE_SECONDS = 3

  def __init__(self, measurements):
    """Collects the action names declared by the given measurements.

    Args:
      measurements: dict whose values are measurement classes. Each class is
          instantiated once with no arguments to read its
          `action_name_to_run`; empty names are ignored.
    """
    # This class overwrites PageTest.Run, so that the test method name is not
    # really used (except for throwing an exception if it doesn't exist).
    super(RecordPage, self).__init__('Run')
    self._action_names = set()
    for measurement in measurements.values():
      # Instantiate once per measurement rather than once for the filter and
      # once more for the value.
      action_name = measurement().action_name_to_run
      if action_name:
        self._action_names.add(action_name)
    # Optional measurement to forward to; assigned by the caller. It stays
    # None when recording a bare page set or URL, so every use is guarded.
    self.test = None

  def CanRunForPage(self, page):
    """Returns True only for pages whose URL starts with 'http'."""
    return page.url.startswith('http')

  def CustomizeBrowserOptionsForPageSet(self, pset, options):
    """Lets every action of every page in `pset` customize `options`."""
    for page in pset:
      for compound_action in self._CompoundActionsForPage(page, options):
        for action in compound_action:
          action.CustomizeBrowserOptionsForPageSet(options)

  def WillNavigateToPage(self, page, tab):
    """Override to ensure all resources are fetched from network."""
    tab.ClearCache(force=False)
    if self.test:
      self.test.WillNavigateToPage(page, tab)

  def DidNavigateToPage(self, page, tab):
    """Forward the call to the test."""
    if self.test:
      self.test.DidNavigateToPage(page, tab)

  def Run(self, options, page, tab, results):
    """Loads `page` in `tab` and runs its compound actions.

    Args:
      options: passed through to _CompoundActionsForPage.
      page: the page being recorded.
      tab: the browser tab the page is loaded in.
      results: unused; measurement output goes to a throwaway results object.
    """
    tab.WaitForDocumentReadyStateToBeComplete()

    if self.test:
      # The measurement output is irrelevant while recording, so it is
      # written to a results object that is discarded.
      dummy_results = page_measurement_results.PageMeasurementResults()
      dummy_results.WillMeasurePage(page)
      self.test.MeasurePage(page, tab, dummy_results)
      dummy_results.DidMeasurePage()
    else:
      # When recording, sleep to catch any resources that load post-onload.
      # TODO(tonyg): This should probably monitor resource timing for activity
      # and sleep until 2s since the last network event with some timeout like
      # 20s. We could wrap this up as WaitForNetworkIdle() and share it.
      import time  # Local import: the file's import block is not fully visible.
      time.sleep(self._POST_LOAD_SETTLE_SECONDS)

    # Run the actions for all measurements. Reload the page between actions,
    # i.e. before every action except the first one.
    needs_reload = False
    for compound_action in self._CompoundActionsForPage(page, options):
      if needs_reload:
        self.RunNavigateSteps(page, tab)
      self._RunCompoundAction(page, tab, compound_action)
      needs_reload = True

  def _CompoundActionsForPage(self, page, options):
    """Returns the compound actions that `page` defines.

    Only action names collected in __init__ that exist as attributes of
    `page` are considered.

    Args:
      page: the page to inspect.
      options: may be falsy; otherwise its `interactive` attribute is passed
          on to page_test.GetCompoundActionFromPage.

    Returns:
      A list with one compound action per matching action name.
    """
    actions = []
    for action_name in self._action_names:
      if not hasattr(page, action_name):
        continue
      interactive = options and options.interactive
      actions.append(page_test.GetCompoundActionFromPage(
          page, action_name, interactive))
    return actions
def _CreatePageSetForUrl(url):
  """Writes a page set JSON file for `url` and loads it.

  The file is named '<hostname>.json' and is written to the 'page_sets'
  directory under util.GetBaseDir(). Its archive data file entry points at
  '../data/<hostname>.json'.

  Args:
    url: the URL to build a page set for.

  Returns:
    The result of page_set.PageSet.FromFile() for the written file.

  Raises:
    ValueError: if no hostname can be extracted from `url`.
  """
  hostname = urlparse.urlparse(url).hostname
  if not hostname:
    # hostname is None for URLs without a network location; fail with a
    # clear message instead of a TypeError from `None + '.json'`.
    raise ValueError('Cannot derive a page set name from URL: %r' % url)
  ps_name = hostname + '.json'
  ps_path = os.path.join(util.GetBaseDir(), 'page_sets', ps_name)
  ps = {
      'archive_data_file': '../data/%s' % ps_name,
      # NOTE(review): the page list was not visible in the reviewed text;
      # this single-entry form is reconstructed -- confirm it matches the
      # schema expected by PageSet.FromFile.
      'pages': [{'url': url}],
  }
  with open(ps_path, 'w') as f:
    f.write(json.dumps(ps))
  # Single-argument parenthesised form prints identically on Python 2 and 3.
  print('Created new page set %s' % ps_path)
  return page_set.PageSet.FromFile(ps_path)
def Main(base_dir):
  """Records the pages selected by the single positional command-line target.

  NOTE(review): the `def` line was missing from the reviewed text; the name
  and the `base_dir` parameter are reconstructed from the body -- confirm.

  The target is resolved, in order, as a discovered test name, a discovered
  measurement name, a path ending in '.json', or a URL starting with 'http'.

  Args:
    base_dir: directory searched for measurement and test classes.

  Returns:
    The number of failed pages, capped at 255.
  """
  measurements = {
      n: cls for n, cls in discover.DiscoverClasses(
          base_dir, base_dir, page_measurement.PageMeasurement).items()
      # Filter out unneeded ProfileCreators (crbug.com/319573).
      if not issubclass(cls, profile_creator.ProfileCreator)
  }
  tests = discover.DiscoverClasses(base_dir, base_dir, test.Test,
                                   index_by_class_name=True)

  options = browser_options.BrowserFinderOptions()
  parser = options.CreateParser('%prog <PageSet|Measurement|Test|URL>')
  page_runner.AddCommandLineOptions(parser)

  recorder = RecordPage(measurements)
  recorder.AddCommandLineOptions(parser)

  # Exactly one non-flag argument is required; it decides which extra
  # options must be registered before the command line is parsed.
  quick_args = [a for a in sys.argv[1:] if not a.startswith('-')]
  if len(quick_args) != 1:
    # NOTE(review): assumes the parser offers optparse-style print_usage().
    parser.print_usage()
    sys.exit(1)
  target = quick_args[0]

  if target in tests:
    recorder.test = tests[target]().test()
    recorder.test.AddCommandLineOptions(parser)
    parser.parse_args()
    ps = tests[target]().CreatePageSet(options)
  elif target in measurements:
    recorder.test = measurements[target]()
    recorder.test.AddCommandLineOptions(parser)
    _, args = parser.parse_args()
    ps = recorder.test.CreatePageSet(args, options)
  elif target.endswith('.json'):
    parser.parse_args()
    ps = page_set.PageSet.FromFile(target)
  elif target.startswith('http'):
    parser.parse_args()
    ps = _CreatePageSetForUrl(target)
  else:
    parser.print_usage()
    sys.exit(1)

  expectations = test_expectations.TestExpectations()

  # Set the archive path to something temporary. mkstemp() returns an open
  # descriptor along with the path; only the path is needed, so close the
  # descriptor right away instead of leaking it.
  temp_fd, temp_target_wpr_file_path = tempfile.mkstemp()
  os.close(temp_fd)
  ps.wpr_archive_info.AddNewTemporaryRecording(temp_target_wpr_file_path)

  # Do the actual recording.
  options.browser_options.wpr_mode = wpr_modes.WPR_RECORD
  options.browser_options.no_proxy_server = True
  recorder.CustomizeBrowserOptions(options)
  results = page_runner.Run(recorder, ps, expectations, options)

  if results.errors or results.failures:
    logging.warning('Some pages failed. The recording has not been updated for '
                    'these pages.')
    # The first element of each error/failure entry is used as its label.
    failed = [entry[0] for entry in results.errors + results.failures]
    logging.warning('Failed pages:\n%s', '\n'.join(failed))

  if results.skipped:
    logging.warning('Some pages were skipped. The recording has not been '
                    'updated for these pages.')
    skipped = [entry[0] for entry in results.skipped]
    logging.warning('Skipped pages:\n%s', '\n'.join(skipped))

  if results.successes:
    # Update the metadata for the pages which were recorded.
    ps.wpr_archive_info.AddRecordedPages(results.successes)

  # Discard the scratch archive if it is still on disk.
  # NOTE(review): presumably AddRecordedPages relocates the temporary
  # recording, in which case nothing is left to delete -- confirm.
  if os.path.exists(temp_target_wpr_file_path):
    os.remove(temp_target_wpr_file_path)

  return min(255, len(results.failures))