1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Background tasks for the www_server module.
7 This module has the logic for handling background tasks for the www frontend.
Long-term actions (like periodic tracing) cannot be served synchronously in the
context of an /ajax endpoint request (they would hit the HTTP timeout).
Instead, for such long
10 operations, an instance of |BackgroundTask| is created here and the server
11 returns just its id. The client can later poll the status of the asynchronous
12 task to check for its progress.
14 From a technical viewpoint, each background task is just a python subprocess
15 which communicates its progress updates through a Queue. The messages enqueued
16 are tuples with the following format: (completion_ratio%, 'message string').
import Queue
import datetime
import multiprocessing
import time

from memory_inspector.core import backends
from memory_inspector.data import file_storage
28 _tasks = {} #id (int) -> |BackgroundTask| instance.
def StartTracer(process, storage_path, interval, count, trace_native_heap):
  """Spawns a |BackgroundTask| that periodically traces |process|.

  Args:
    process: the backends.Process instance to trace.
    storage_path: root path of the file_storage archives.
    interval: seconds to wait between consecutive dumps.
    count: total number of dumps to take.
    trace_native_heap: if True, native heap allocations are dumped as well.

  Returns:
    The id of the spawned task (its subprocess pid), which the client can use
    to poll its progress later.
  """
  assert(isinstance(process, backends.Process))
  # |TracerMain_| runs in the child; only picklable kwargs are passed (the
  # backend/device/process are re-looked-up by name/id in the child).
  task = BackgroundTask(
      TracerMain_,
      storage_path=storage_path,
      backend_name=process.device.backend.name,
      device_id=process.device.id,
      pid=process.pid,
      interval=interval,
      count=count,
      trace_native_heap=trace_native_heap)
  # Start before registering: task.pid is None until the subprocess is spawned.
  task.start()
  _tasks[task.pid] = task
  return task.pid
48 return _tasks.get(task_id)
52 for proc in _tasks.itervalues():
def TracerMain_(log, storage_path, backend_name, device_id, pid, interval,
                count, trace_native_heap):
  """Entry point for the background periodic tracer task.

  Runs in a child process. Periodically dumps the memory maps (and optionally
  the native heap) of the target process into a new storage archive, then
  symbolizes the collected heaps.

  Args:
    log: multiprocessing.Queue where (completion%, message) tuples are put.
    storage_path: root path of the file_storage archives.
    backend_name: name of the backends.Backend hosting the device.
    device_id: id of the device hosting the target process.
    pid: pid of the target process on the device.
    interval: seconds to wait between consecutive dumps.
    count: total number of dumps to take.
    trace_native_heap: if True, also dump and symbolize native allocations.

  Returns:
    1 if the traced process died prematurely, None on normal completion.
  """
  storage = file_storage.Storage(storage_path)

  # Initialize the backend, restoring its persisted settings.
  backend = backends.GetBackend(backend_name)
  for k, v in storage.LoadSettings(backend_name).iteritems():
    backend.settings[k] = v

  # Initialize the device, restoring its persisted settings.
  device = backends.GetDevice(backend_name, device_id)
  for k, v in storage.LoadSettings(device_id).iteritems():
    device.settings[k] = v

  # Start periodic tracing.
  process = device.GetProcess(pid)
  log.put((1, 'Starting trace (%d dumps x %s s.). Device: %s, process: %s' % (
      count, interval, device.name, process.name)))
  datetime_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')
  archive_name = '%s - %s - %s' % (datetime_str, device.name, process.name)
  archive = storage.OpenArchive(archive_name, create=True)
  heaps_to_symbolize = []

  for i in xrange(1, count + 1):  # [1, count] range is easier to handle.
    # Re-lookup the process on every iteration: it might have died meanwhile.
    process = device.GetProcess(pid)
    if not process:
      # Report 100% so the polling client stops waiting for further updates.
      log.put((100, 'Process %d died.' % pid))
      return 1
    # Calculate the completion rate proportionally to 80%. We keep the remaining
    # 20% for the final symbolization step (just an approximate estimation).
    completion = 80 * i / count
    log.put((completion, 'Dumping trace %d of %d' % (i, count)))
    archive.StartNewSnapshot()

    if trace_native_heap:
      # Freeze the process, so that the mmaps and the heap dump are consistent.
      # TODO(review): the actual freezing call is not visible in this view of
      # the file — confirm against the backend API.
      nheap = process.DumpNativeHeap()
      log.put((completion,
               'Dumped %d native allocations' % len(nheap.allocations)))

    # TODO(primiano): memdump has the bad habit of sending SIGCONT to the
    # process. Fix that, so we are the only one in charge of controlling it.
    mmaps = process.DumpMemoryMaps()
    log.put((completion, 'Dumped %d memory maps' % len(mmaps)))
    archive.StoreMemMaps(mmaps)

    if trace_native_heap:
      # Make stack frames relative to their mapped region and attribute
      # resident memory before archiving the heap for later symbolization.
      nheap.RelativizeStackFrames(mmaps)
      nheap.CalculateResidentSize(mmaps)
      archive.StoreNativeHeap(nheap)
      heaps_to_symbolize += [nheap]

    # Honor the requested dump period (no need to wait after the last dump).
    if i < count:
      time.sleep(interval)

  if heaps_to_symbolize:
    log.put((90, 'Symbolizing'))
    symbols = backend.ExtractSymbols(
        heaps_to_symbolize, device.settings['native_symbol_paths'] or '')
    # The symbolizer is best-effort: report the hit ratio over the union of
    # all distinct stack frames seen across the collected heaps.
    expected_symbols_count = len(set.union(
        *[set(x.stack_frames.iterkeys()) for x in heaps_to_symbolize]))
    log.put((99, 'Symbolization complete. Got %d symbols (%.1f%%).' % (
        len(symbols), 100.0 * len(symbols) / expected_symbols_count)))
    archive.StoreSymbols(symbols)

  log.put((100, 'Trace complete.'))
class BackgroundTask(multiprocessing.Process):
  """A long-running operation executed in a child process.

  The child publishes progress updates on a multiprocessing.Queue as tuples
  of the form (completion_ratio%, 'message string'); the parent accumulates
  them and exposes the log via |GetProgress|.
  """

  def __init__(self, entry_point, *args, **kwargs):
    """Prepares the child process; the caller is in charge of start()ing it.

    Args:
      entry_point: function executed in the child process. It receives the
          progress queue as its first positional argument, followed by |args|
          and |kwargs|.
    """
    self._log_queue = multiprocessing.Queue()
    self._progress_log = []  # A list of tuples [(50%, 'msg1'), (100%, 'msg2')].
    super(BackgroundTask, self).__init__(
        target=entry_point,
        args=((self._log_queue,) + args),  # Just propagate all args.
        kwargs=kwargs)

  def GetProgress(self):
    """Returns the progress log, a list of tuples [(completion%, message)].

    Drains every update the child has enqueued so far; never blocks.
    """
    try:
      while True:
        self._progress_log += [self._log_queue.get(block=False)]
    except Queue.Empty:
      pass
    # Surface an abnormal child termination as a final 100% failure entry.
    if not self.is_alive() and self.exitcode != 0:
      return self._progress_log + [(100, 'Failed with code %d' % self.exitcode)]
    return self._progress_log