Upstream version 8.36.161.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / scripts / parallel_emerge.py
1 #!/usr/bin/python
2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Program to run emerge in parallel, for significant speedup.
7
8 Usage:
9  ./parallel_emerge [--board=BOARD] [--workon=PKGS]
10                    [--force-remote-binary=PKGS] [emerge args] package
11
12 This script runs multiple emerge processes in parallel, using appropriate
13 Portage APIs. It is faster than standard emerge because it has a
14 multiprocess model instead of an asynchronous model.
15 """
16
17 import codecs
18 import copy
19 import errno
20 import gc
21 import heapq
22 import multiprocessing
23 import os
24 try:
25   import Queue
26 except ImportError:
27   # Python-3 renamed to "queue".  We still use Queue to avoid collisions
28   # with naming variables as "queue".  Maybe we'll transition at some point.
29   # pylint: disable=F0401
30   import queue as Queue
31 import signal
32 import sys
33 import tempfile
34 import threading
35 import time
36 import traceback
37
38 from chromite.lib import cros_build_lib
39
40 # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
41 # Chromium OS, the default "portage" user doesn't have the necessary
42 # permissions. It'd be easier if we could default to $USERNAME, but $USERNAME
43 # is "root" here because we get called through sudo.
44 #
45 # We need to set this before importing any portage modules, because portage
46 # looks up "PORTAGE_USERNAME" at import time.
47 #
48 # NOTE: .bashrc sets PORTAGE_USERNAME = $USERNAME, so most people won't
49 # encounter this case unless they have an old chroot or blow away the
50 # environment by running sudo without the -E specifier.
51 if "PORTAGE_USERNAME" not in os.environ:
52   homedir = os.environ.get("HOME")
53   if homedir:
54     os.environ["PORTAGE_USERNAME"] = os.path.basename(homedir)
55
56 # Portage doesn't expose dependency trees in its public API, so we have to
57 # make use of some private APIs here. These modules are found under
58 # /usr/lib/portage/pym/.
59 #
60 # TODO(davidjames): Update Portage to expose public APIs for these features.
61 # pylint: disable=F0401
62 from _emerge.actions import adjust_configs
63 from _emerge.actions import load_emerge_config
64 from _emerge.create_depgraph_params import create_depgraph_params
65 from _emerge.depgraph import backtrack_depgraph
66 try:
67   from _emerge.main import clean_logs
68 except ImportError:
69   # Older portage versions did not provide clean_logs, so stub it.
70   # We need this if running in an older chroot that hasn't yet upgraded
71   # the portage version.
72   clean_logs = lambda x: None
73 from _emerge.main import emerge_main
74 from _emerge.main import parse_opts
75 from _emerge.Package import Package
76 from _emerge.Scheduler import Scheduler
77 from _emerge.stdout_spinner import stdout_spinner
78 from portage._global_updates import _global_updates
79 import portage
80 import portage.debug
81 from portage.versions import vercmp
82 # pylint: enable=F0401
83
84
85 def Usage():
86   """Print usage."""
87   print "Usage:"
88   print " ./parallel_emerge [--board=BOARD] [--workon=PKGS]"
89   print "                   [--rebuild] [emerge args] package"
90   print
91   print "Packages specified as workon packages are always built from source."
92   print
93   print "The --workon argument is mainly useful when you want to build and"
94   print "install packages that you are working on unconditionally, but do not"
95   print "to have to rev the package to indicate you want to build it from"
96   print "source. The build_packages script will automatically supply the"
97   print "workon argument to emerge, ensuring that packages selected using"
98   print "cros-workon are rebuilt."
99   print
100   print "The --rebuild option rebuilds packages whenever their dependencies"
101   print "are changed. This ensures that your build is correct."
102
103
104 # Global start time
105 GLOBAL_START = time.time()
106
107 # Whether process has been killed by a signal.
108 KILLED = multiprocessing.Event()
109
110
111 class EmergeData(object):
112   """This simple struct holds various emerge variables.
113
114   This struct helps us easily pass emerge variables around as a unit.
115   These variables are used for calculating dependencies and installing
116   packages.
117   """
118
119   __slots__ = ["action", "cmdline_packages", "depgraph", "favorites",
120                "mtimedb", "opts", "root_config", "scheduler_graph",
121                "settings", "spinner", "trees"]
122
123   def __init__(self):
124     # The action the user requested. If the user is installing packages, this
125     # is None. If the user is doing anything other than installing packages,
126     # this will contain the action name, which will map exactly to the
127     # long-form name of the associated emerge option.
128     #
129     # Example: If you call parallel_emerge --unmerge package, the action name
130     #          will be "unmerge"
131     self.action = None
132
133     # The list of packages the user passed on the command-line.
134     self.cmdline_packages = None
135
136     # The emerge dependency graph. It'll contain all the packages involved in
137     # this merge, along with their versions.
138     self.depgraph = None
139
140     # The list of candidates to add to the world file.
141     self.favorites = None
142
143     # A dict of the options passed to emerge. This dict has been cleaned up
144     # a bit by parse_opts, so that it's a bit easier for the emerge code to
145     # look at the options.
146     #
147     # Emerge takes a few shortcuts in its cleanup process to make parsing of
148     # the options dict easier. For example, if you pass in "--usepkg=n", the
149     # "--usepkg" flag is just left out of the dictionary altogether. Because
150     # --usepkg=n is the default, this makes parsing easier, because emerge
151     # can just assume that if "--usepkg" is in the dictionary, it's enabled.
152     #
153     # These cleanup processes aren't applied to all options. For example, the
154     # --with-bdeps flag is passed in as-is.  For a full list of the cleanups
155     # applied by emerge, see the parse_opts function in the _emerge.main
156     # package.
157     self.opts = None
158
159     # A dictionary used by portage to maintain global state. This state is
160     # loaded from disk when portage starts up, and saved to disk whenever we
161     # call mtimedb.commit().
162     #
163     # This database contains information about global updates (i.e., what
164     # version of portage we have) and what we're currently doing. Portage
165     # saves what it is currently doing in this database so that it can be
166     # resumed when you call it with the --resume option.
167     #
168     # parallel_emerge does not save what it is currently doing in the mtimedb,
169     # so we do not support the --resume option.
170     self.mtimedb = None
171
172     # The portage configuration for our current root. This contains the portage
173     # settings (see below) and the three portage trees for our current root.
174     # (The three portage trees are explained below, in the documentation for
175     #  the "trees" member.)
176     self.root_config = None
177
178     # The scheduler graph is used by emerge to calculate what packages to
179     # install. We don't actually install any deps, so this isn't really used,
180     # but we pass it in to the Scheduler object anyway.
181     self.scheduler_graph = None
182
183     # Portage settings for our current session. Most of these settings are set
184     # in make.conf inside our current install root.
185     self.settings = None
186
187     # The spinner, which spews stuff to stdout to indicate that portage is
188     # doing something. We maintain our own spinner, so we set the portage
189     # spinner to "silent" mode.
190     self.spinner = None
191
192     # The portage trees. There are separate portage trees for each root. To get
193     # the portage tree for the current root, you can look in self.trees[root],
194     # where root = self.settings["ROOT"].
195     #
196     # In each root, there are three trees: vartree, porttree, and bintree.
197     #  - vartree: A database of the currently-installed packages.
198     #  - porttree: A database of ebuilds, that can be used to build packages.
199     #  - bintree: A database of binary packages.
200     self.trees = None
201
202
203 class DepGraphGenerator(object):
204   """Grab dependency information about packages from portage.
205
206   Typical usage:
207     deps = DepGraphGenerator()
208     deps.Initialize(sys.argv[1:])
209     deps_tree, deps_info = deps.GenDependencyTree()
210     deps_graph = deps.GenDependencyGraph(deps_tree, deps_info)
211     deps.PrintTree(deps_tree)
212     PrintDepsMap(deps_graph)
213   """
214
215   __slots__ = ["board", "emerge", "package_db", "show_output", "unpack_only"]
216
217   def __init__(self):
218     self.board = None
219     self.emerge = EmergeData()
220     self.package_db = {}
221     self.show_output = False
222     self.unpack_only = False
223
224   def ParseParallelEmergeArgs(self, argv):
225     """Read the parallel emerge arguments from the command-line.
226
227     We need to be compatible with emerge arg format.  We scrape arguments that
228     are specific to parallel_emerge, and pass through the rest directly to
229     emerge.
230
231     Args:
232       argv: arguments list
233
234     Returns:
235       Arguments that don't belong to parallel_emerge
236     """
237     emerge_args = []
238     for arg in argv:
239       # Specifically match arguments that are specific to parallel_emerge, and
240       # pass through the rest.
241       if arg.startswith("--board="):
242         self.board = arg.replace("--board=", "")
243       elif arg.startswith("--workon="):
244         workon_str = arg.replace("--workon=", "")
245         emerge_args.append("--reinstall-atoms=%s" % workon_str)
246         emerge_args.append("--usepkg-exclude=%s" % workon_str)
247       elif arg.startswith("--force-remote-binary="):
248         force_remote_binary = arg.replace("--force-remote-binary=", "")
249         emerge_args.append("--useoldpkg-atoms=%s" % force_remote_binary)
250       elif arg == "--show-output":
251         self.show_output = True
252       elif arg == "--rebuild":
253         emerge_args.append("--rebuild-if-unbuilt")
254       elif arg == "--unpackonly":
255         emerge_args.append("--fetchonly")
256         self.unpack_only = True
257       else:
258         # Not one of our options, so pass through to emerge.
259         emerge_args.append(arg)
260
261     # These packages take a really long time to build, so, for expediency, we
262     # are blacklisting them from automatic rebuilds because one of their
263     # dependencies needs to be recompiled.
264     for pkg in ("chromeos-base/chromeos-chrome",):
265       emerge_args.append("--rebuild-exclude=%s" % pkg)
266
267     return emerge_args
268
269   def Initialize(self, args):
270     """Initializer. Parses arguments and sets up portage state."""
271
272     # Parse and strip out args that are just intended for parallel_emerge.
273     emerge_args = self.ParseParallelEmergeArgs(args)
274
275     # Setup various environment variables based on our current board. These
276     # variables are normally setup inside emerge-${BOARD}, but since we don't
277     # call that script, we have to set it up here. These variables serve to
278     # point our tools at /build/BOARD and to setup cross compiles to the
279     # appropriate board as configured in toolchain.conf.
280     if self.board:
281       sysroot = cros_build_lib.GetSysroot(board=self.board)
282       os.environ["PORTAGE_CONFIGROOT"] = sysroot
283       os.environ["PORTAGE_SYSROOT"] = sysroot
284       os.environ["SYSROOT"] = sysroot
285
286       # Although CHROMEOS_ROOT isn't specific to boards, it's normally setup
287       # inside emerge-${BOARD}, so we set it up here for compatibility. It
288       # will be going away soon as we migrate to CROS_WORKON_SRCROOT.
289       os.environ.setdefault("CHROMEOS_ROOT", os.environ["HOME"] + "/trunk")
290
291     # Turn off interactive delays
292     os.environ["EBEEP_IGNORE"] = "1"
293     os.environ["EPAUSE_IGNORE"] = "1"
294     os.environ["CLEAN_DELAY"] = "0"
295
296     # Parse the emerge options.
297     action, opts, cmdline_packages = parse_opts(emerge_args, silent=True)
298
299     # Set environment variables based on options. Portage normally sets these
300     # environment variables in emerge_main, but we can't use that function,
301     # because it also does a bunch of other stuff that we don't want.
302     # TODO(davidjames): Patch portage to move this logic into a function we can
303     # reuse here.
304     if "--debug" in opts:
305       os.environ["PORTAGE_DEBUG"] = "1"
306     if "--config-root" in opts:
307       os.environ["PORTAGE_CONFIGROOT"] = opts["--config-root"]
308     if "--root" in opts:
309       os.environ["ROOT"] = opts["--root"]
310     if "--accept-properties" in opts:
311       os.environ["ACCEPT_PROPERTIES"] = opts["--accept-properties"]
312
313     # If we're installing packages to the board, we can disable vardb locks.
314     # This is safe because we only run up to one instance of parallel_emerge in
315     # parallel.
316     # TODO(davidjames): Enable this for the host too.
317     if self.board:
318       os.environ.setdefault("PORTAGE_LOCKS", "false")
319
320     # Now that we've setup the necessary environment variables, we can load the
321     # emerge config from disk.
322     settings, trees, mtimedb = load_emerge_config()
323
324     # Add in EMERGE_DEFAULT_OPTS, if specified.
325     tmpcmdline = []
326     if "--ignore-default-opts" not in opts:
327       tmpcmdline.extend(settings["EMERGE_DEFAULT_OPTS"].split())
328     tmpcmdline.extend(emerge_args)
329     action, opts, cmdline_packages = parse_opts(tmpcmdline)
330
331     # If we're installing to the board, we want the --root-deps option so that
332     # portage will install the build dependencies to that location as well.
333     if self.board:
334       opts.setdefault("--root-deps", True)
335
336     # Check whether our portage tree is out of date. Typically, this happens
337     # when you're setting up a new portage tree, such as in setup_board and
338     # make_chroot. In that case, portage applies a bunch of global updates
339     # here. Once the updates are finished, we need to commit any changes
340     # that the global update made to our mtimedb, and reload the config.
341     #
342     # Portage normally handles this logic in emerge_main, but again, we can't
343     # use that function here.
344     if _global_updates(trees, mtimedb["updates"]):
345       mtimedb.commit()
346       settings, trees, mtimedb = load_emerge_config(trees=trees)
347
348     # Setup implied options. Portage normally handles this logic in
349     # emerge_main.
350     if "--buildpkgonly" in opts or "buildpkg" in settings.features:
351       opts.setdefault("--buildpkg", True)
352     if "--getbinpkgonly" in opts:
353       opts.setdefault("--usepkgonly", True)
354       opts.setdefault("--getbinpkg", True)
355     if "getbinpkg" in settings.features:
356       # Per emerge_main, FEATURES=getbinpkg overrides --getbinpkg=n
357       opts["--getbinpkg"] = True
358     if "--getbinpkg" in opts or "--usepkgonly" in opts:
359       opts.setdefault("--usepkg", True)
360     if "--fetch-all-uri" in opts:
361       opts.setdefault("--fetchonly", True)
362     if "--skipfirst" in opts:
363       opts.setdefault("--resume", True)
364     if "--buildpkgonly" in opts:
365       # --buildpkgonly will not merge anything, so it overrides all binary
366       # package options.
367       for opt in ("--getbinpkg", "--getbinpkgonly",
368                   "--usepkg", "--usepkgonly"):
369         opts.pop(opt, None)
370     if (settings.get("PORTAGE_DEBUG", "") == "1" and
371         "python-trace" in settings.features):
372       portage.debug.set_trace(True)
373
374     # Complain about unsupported options
375     for opt in ("--ask", "--ask-enter-invalid", "--resume", "--skipfirst"):
376       if opt in opts:
377         print "%s is not supported by parallel_emerge" % opt
378         sys.exit(1)
379
380     # Make emerge specific adjustments to the config (e.g. colors!)
381     adjust_configs(opts, trees)
382
383     # Save our configuration so far in the emerge object
384     emerge = self.emerge
385     emerge.action, emerge.opts = action, opts
386     emerge.settings, emerge.trees, emerge.mtimedb = settings, trees, mtimedb
387     emerge.cmdline_packages = cmdline_packages
388     root = settings["ROOT"]
389     emerge.root_config = trees[root]["root_config"]
390
391     if "--usepkg" in opts:
392       emerge.trees[root]["bintree"].populate("--getbinpkg" in opts)
393
394   def CreateDepgraph(self, emerge, packages):
395     """Create an emerge depgraph object."""
396     # Setup emerge options.
397     emerge_opts = emerge.opts.copy()
398
399     # Ask portage to build a dependency graph. with the options we specified
400     # above.
401     params = create_depgraph_params(emerge_opts, emerge.action)
402     success, depgraph, favorites = backtrack_depgraph(
403         emerge.settings, emerge.trees, emerge_opts, params, emerge.action,
404         packages, emerge.spinner)
405     emerge.depgraph = depgraph
406
407     # Is it impossible to honor the user's request? Bail!
408     if not success:
409       depgraph.display_problems()
410       sys.exit(1)
411
412     emerge.depgraph = depgraph
413     emerge.favorites = favorites
414
415     # Prime and flush emerge caches.
416     root = emerge.settings["ROOT"]
417     vardb = emerge.trees[root]["vartree"].dbapi
418     if "--pretend" not in emerge.opts:
419       vardb.counter_tick()
420     vardb.flush_cache()
421
422   def GenDependencyTree(self):
423     """Get dependency tree info from emerge.
424
425     Returns:
426       Dependency tree
427     """
428     start = time.time()
429
430     emerge = self.emerge
431
432     # Create a list of packages to merge
433     packages = set(emerge.cmdline_packages[:])
434
435     # Tell emerge to be quiet. We print plenty of info ourselves so we don't
436     # need any extra output from portage.
437     portage.util.noiselimit = -1
438
439     # My favorite feature: The silent spinner. It doesn't spin. Ever.
440     # I'd disable the colors by default too, but they look kind of cool.
441     emerge.spinner = stdout_spinner()
442     emerge.spinner.update = emerge.spinner.update_quiet
443
444     if "--quiet" not in emerge.opts:
445       print "Calculating deps..."
446
447     self.CreateDepgraph(emerge, packages)
448     depgraph = emerge.depgraph
449
450     # Build our own tree from the emerge digraph.
451     deps_tree = {}
452     # pylint: disable=W0212
453     digraph = depgraph._dynamic_config.digraph
454     root = emerge.settings["ROOT"]
455     final_db = depgraph._dynamic_config.mydbapi[root]
456     for node, node_deps in digraph.nodes.items():
457       # Calculate dependency packages that need to be installed first. Each
458       # child on the digraph is a dependency. The "operation" field specifies
459       # what we're doing (e.g. merge, uninstall, etc.). The "priorities" array
460       # contains the type of dependency (e.g. build, runtime, runtime_post,
461       # etc.)
462       #
463       # Portage refers to the identifiers for packages as a CPV. This acronym
464       # stands for Component/Path/Version.
465       #
466       # Here's an example CPV: chromeos-base/power_manager-0.0.1-r1
467       # Split up, this CPV would be:
468       #   C -- Component: chromeos-base
469       #   P -- Path:      power_manager
470       #   V -- Version:   0.0.1-r1
471       #
472       # We just refer to CPVs as packages here because it's easier.
473       deps = {}
474       for child, priorities in node_deps[0].items():
475         if isinstance(child, Package) and child.root == root:
476           cpv = str(child.cpv)
477           action = str(child.operation)
478
479           # If we're uninstalling a package, check whether Portage is
480           # installing a replacement. If so, just depend on the installation
481           # of the new package, because the old package will automatically
482           # be uninstalled at that time.
483           if action == "uninstall":
484             for pkg in final_db.match_pkgs(child.slot_atom):
485               cpv = str(pkg.cpv)
486               action = "merge"
487               break
488
489           deps[cpv] = dict(action=action,
490                            deptypes=[str(x) for x in priorities],
491                            deps={})
492
493       # We've built our list of deps, so we can add our package to the tree.
494       if isinstance(node, Package) and node.root == root:
495         deps_tree[str(node.cpv)] = dict(action=str(node.operation),
496                                         deps=deps)
497
498     # Ask portage for its install plan, so that we can only throw out
499     # dependencies that portage throws out.
500     deps_info = {}
501     for pkg in depgraph.altlist():
502       if isinstance(pkg, Package):
503         assert pkg.root == root
504         self.package_db[pkg.cpv] = pkg
505
506         # Save off info about the package
507         deps_info[str(pkg.cpv)] = {"idx": len(deps_info)}
508
509     seconds = time.time() - start
510     if "--quiet" not in emerge.opts:
511       print "Deps calculated in %dm%.1fs" % (seconds / 60, seconds % 60)
512
513     return deps_tree, deps_info
514
515   def PrintTree(self, deps, depth=""):
516     """Print the deps we have seen in the emerge output.
517
518     Args:
519      deps: Dependency tree structure.
520      depth: Allows printing the tree recursively, with indentation.
521     """
522     for entry in sorted(deps):
523       action = deps[entry]["action"]
524       print "%s %s (%s)" % (depth, entry, action)
525       self.PrintTree(deps[entry]["deps"], depth=depth + "  ")
526
527   def GenDependencyGraph(self, deps_tree, deps_info):
528     """Generate a doubly linked dependency graph.
529
530     Args:
531       deps_tree: Dependency tree structure.
532       deps_info: More details on the dependencies.
533
534     Returns:
535       Deps graph in the form of a dict of packages, with each package
536       specifying a "needs" list and "provides" list.
537     """
538     emerge = self.emerge
539
540     # deps_map is the actual dependency graph.
541     #
542     # Each package specifies a "needs" list and a "provides" list. The "needs"
543     # list indicates which packages we depend on. The "provides" list
544     # indicates the reverse dependencies -- what packages need us.
545     #
546     # We also provide some other information in the dependency graph:
547     #  - action: What we're planning on doing with this package. Generally,
548     #            "merge", "nomerge", or "uninstall"
549     deps_map = {}
550
551     def ReverseTree(packages):
552       """Convert tree to digraph.
553
554       Take the tree of package -> requirements and reverse it to a digraph of
555       buildable packages -> packages they unblock.
556
557       Args:
558         packages: Tree(s) of dependencies.
559
560       Returns:
561         Unsanitized digraph.
562       """
563       binpkg_phases = set(["setup", "preinst", "postinst"])
564       needed_dep_types = set(["blocker", "buildtime", "runtime"])
565       for pkg in packages:
566
567         # Create an entry for the package
568         action = packages[pkg]["action"]
569         default_pkg = {"needs": {}, "provides": set(), "action": action,
570                        "nodeps": False, "binary": False}
571         this_pkg = deps_map.setdefault(pkg, default_pkg)
572
573         if pkg in deps_info:
574           this_pkg["idx"] = deps_info[pkg]["idx"]
575
576         # If a package doesn't have any defined phases that might use the
577         # dependent packages (i.e. pkg_setup, pkg_preinst, or pkg_postinst),
578         # we can install this package before its deps are ready.
579         emerge_pkg = self.package_db.get(pkg)
580         if emerge_pkg and emerge_pkg.type_name == "binary":
581           this_pkg["binary"] = True
582           if 0 <= vercmp(portage.VERSION, "2.1.11.50"):
583             defined_phases = emerge_pkg.defined_phases
584           else:
585             defined_phases = emerge_pkg.metadata.defined_phases
586           defined_binpkg_phases = binpkg_phases.intersection(defined_phases)
587           if not defined_binpkg_phases:
588             this_pkg["nodeps"] = True
589
590         # Create entries for dependencies of this package first.
591         ReverseTree(packages[pkg]["deps"])
592
593         # Add dependencies to this package.
594         for dep, dep_item in packages[pkg]["deps"].iteritems():
595           # We only need to enforce strict ordering of dependencies if the
596           # dependency is a blocker, or is a buildtime or runtime dependency.
597           # (I.e., ignored, optional, and runtime_post dependencies don't
598           # depend on ordering.)
599           dep_types = dep_item["deptypes"]
600           if needed_dep_types.intersection(dep_types):
601             deps_map[dep]["provides"].add(pkg)
602             this_pkg["needs"][dep] = "/".join(dep_types)
603
604           # If there's a blocker, Portage may need to move files from one
605           # package to another, which requires editing the CONTENTS files of
606           # both packages. To avoid race conditions while editing this file,
607           # the two packages must not be installed in parallel, so we can't
608           # safely ignore dependencies. See http://crosbug.com/19328
609           if "blocker" in dep_types:
610             this_pkg["nodeps"] = False
611
612     def FindCycles():
613       """Find cycles in the dependency tree.
614
615       Returns:
616         A dict mapping cyclic packages to a dict of the deps that cause
617         cycles. For each dep that causes cycles, it returns an example
618         traversal of the graph that shows the cycle.
619       """
620
621       def FindCyclesAtNode(pkg, cycles, unresolved, resolved):
622         """Find cycles in cyclic dependencies starting at specified package.
623
624         Args:
625           pkg: Package identifier.
626           cycles: A dict mapping cyclic packages to a dict of the deps that
627                   cause cycles. For each dep that causes cycles, it returns an
628                   example traversal of the graph that shows the cycle.
629           unresolved: Nodes that have been visited but are not fully processed.
630           resolved: Nodes that have been visited and are fully processed.
631         """
632         pkg_cycles = cycles.get(pkg)
633         if pkg in resolved and not pkg_cycles:
634           # If we already looked at this package, and found no cyclic
635           # dependencies, we can stop now.
636           return
637         unresolved.append(pkg)
638         for dep in deps_map[pkg]["needs"]:
639           if dep in unresolved:
640             idx = unresolved.index(dep)
641             mycycle = unresolved[idx:] + [dep]
642             for i in xrange(len(mycycle) - 1):
643               pkg1, pkg2 = mycycle[i], mycycle[i+1]
644               cycles.setdefault(pkg1, {}).setdefault(pkg2, mycycle)
645           elif not pkg_cycles or dep not in pkg_cycles:
646             # Looks like we haven't seen this edge before.
647             FindCyclesAtNode(dep, cycles, unresolved, resolved)
648         unresolved.pop()
649         resolved.add(pkg)
650
651       cycles, unresolved, resolved = {}, [], set()
652       for pkg in deps_map:
653         FindCyclesAtNode(pkg, cycles, unresolved, resolved)
654       return cycles
655
656     def RemoveUnusedPackages():
657       """Remove installed packages, propagating dependencies."""
658       # Schedule packages that aren't on the install list for removal
659       rm_pkgs = set(deps_map.keys()) - set(deps_info.keys())
660
661       # Remove the packages we don't want, simplifying the graph and making
662       # it easier for us to crack cycles.
663       for pkg in sorted(rm_pkgs):
664         this_pkg = deps_map[pkg]
665         needs = this_pkg["needs"]
666         provides = this_pkg["provides"]
667         for dep in needs:
668           dep_provides = deps_map[dep]["provides"]
669           dep_provides.update(provides)
670           dep_provides.discard(pkg)
671           dep_provides.discard(dep)
672         for target in provides:
673           target_needs = deps_map[target]["needs"]
674           target_needs.update(needs)
675           target_needs.pop(pkg, None)
676           target_needs.pop(target, None)
677         del deps_map[pkg]
678
679     def PrintCycleBreak(basedep, dep, mycycle):
680       """Print details about a cycle that we are planning on breaking.
681
682       We are breaking a cycle where dep needs basedep. mycycle is an
683       example cycle which contains dep -> basedep.
684       """
685
686       needs = deps_map[dep]["needs"]
687       depinfo = needs.get(basedep, "deleted")
688
689       # It's OK to swap install order for blockers, as long as the two
690       # packages aren't installed in parallel. If there is a cycle, then
691       # we know the packages depend on each other already, so we can drop the
692       # blocker safely without printing a warning.
693       if depinfo == "blocker":
694         return
695
696       # Notify the user that we're breaking a cycle.
697       print "Breaking %s -> %s (%s)" % (dep, basedep, depinfo)
698
699       # Show cycle.
700       for i in xrange(len(mycycle) - 1):
701         pkg1, pkg2 = mycycle[i], mycycle[i+1]
702         needs = deps_map[pkg1]["needs"]
703         depinfo = needs.get(pkg2, "deleted")
704         if pkg1 == dep and pkg2 == basedep:
705           depinfo = depinfo + ", deleting"
706         print "  %s -> %s (%s)" % (pkg1, pkg2, depinfo)
707
708     def SanitizeTree():
709       """Remove circular dependencies.
710
711       We prune all dependencies involved in cycles that go against the emerge
712       ordering. This has a nice property: we're guaranteed to merge
713       dependencies in the same order that portage does.
714
715       Because we don't treat any dependencies as "soft" unless they're killed
716       by a cycle, we pay attention to a larger number of dependencies when
717       merging. This hurts performance a bit, but helps reliability.
718       """
719       start = time.time()
720       cycles = FindCycles()
721       while cycles:
722         for dep, mycycles in cycles.iteritems():
723           for basedep, mycycle in mycycles.iteritems():
724             if deps_info[basedep]["idx"] >= deps_info[dep]["idx"]:
725               if "--quiet" not in emerge.opts:
726                 PrintCycleBreak(basedep, dep, mycycle)
727               del deps_map[dep]["needs"][basedep]
728               deps_map[basedep]["provides"].remove(dep)
729         cycles = FindCycles()
730       seconds = time.time() - start
731       if "--quiet" not in emerge.opts and seconds >= 0.1:
732         print "Tree sanitized in %dm%.1fs" % (seconds / 60, seconds % 60)
733
734     def FindRecursiveProvides(pkg, seen):
735       """Find all nodes that require a particular package.
736
737       Assumes that graph is acyclic.
738
739       Args:
740         pkg: Package identifier.
741         seen: Nodes that have been visited so far.
742       """
743       if pkg in seen:
744         return
745       seen.add(pkg)
746       info = deps_map[pkg]
747       info["tprovides"] = info["provides"].copy()
748       for dep in info["provides"]:
749         FindRecursiveProvides(dep, seen)
750         info["tprovides"].update(deps_map[dep]["tprovides"])
751
752     ReverseTree(deps_tree)
753
754     # We need to remove unused packages so that we can use the dependency
755     # ordering of the install process to show us what cycles to crack.
756     RemoveUnusedPackages()
757     SanitizeTree()
758     seen = set()
759     for pkg in deps_map:
760       FindRecursiveProvides(pkg, seen)
761     return deps_map
762
763   def PrintInstallPlan(self, deps_map):
764     """Print an emerge-style install plan.
765
766     The install plan lists what packages we're installing, in order.
767     It's useful for understanding what parallel_emerge is doing.
768
769     Args:
770       deps_map: The dependency graph.
771     """
772
773     def InstallPlanAtNode(target, deps_map):
774       nodes = []
775       nodes.append(target)
776       for dep in deps_map[target]["provides"]:
777         del deps_map[dep]["needs"][target]
778         if not deps_map[dep]["needs"]:
779           nodes.extend(InstallPlanAtNode(dep, deps_map))
780       return nodes
781
782     deps_map = copy.deepcopy(deps_map)
783     install_plan = []
784     plan = set()
785     for target, info in deps_map.iteritems():
786       if not info["needs"] and target not in plan:
787         for item in InstallPlanAtNode(target, deps_map):
788           plan.add(item)
789           install_plan.append(self.package_db[item])
790
791     for pkg in plan:
792       del deps_map[pkg]
793
794     if deps_map:
795       print "Cyclic dependencies:", " ".join(deps_map)
796       PrintDepsMap(deps_map)
797       sys.exit(1)
798
799     self.emerge.depgraph.display(install_plan)
800
801
802 def PrintDepsMap(deps_map):
803   """Print dependency graph, for each package list it's prerequisites."""
804   for i in sorted(deps_map):
805     print "%s: (%s) needs" % (i, deps_map[i]["action"])
806     needs = deps_map[i]["needs"]
807     for j in sorted(needs):
808       print "    %s" % (j)
809     if not needs:
810       print "    no dependencies"
811
812
813 class EmergeJobState(object):
814   """Structure describing the EmergeJobState."""
815
816   __slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
817                "last_output_timestamp", "pkgname", "retcode", "start_timestamp",
818                "target", "fetch_only", "unpack_only"]
819
820   def __init__(self, target, pkgname, done, filename, start_timestamp,
821                retcode=None, fetch_only=False, unpack_only=False):
822
823     # The full name of the target we're building (e.g.
824     # virtual/target-os-1-r60)
825     self.target = target
826
827     # The short name of the target we're building (e.g. target-os-1-r60)
828     self.pkgname = pkgname
829
830     # Whether the job is done. (True if the job is done; false otherwise.)
831     self.done = done
832
833     # The filename where output is currently stored.
834     self.filename = filename
835
836     # The timestamp of the last time we printed the name of the log file. We
837     # print this at the beginning of the job, so this starts at
838     # start_timestamp.
839     self.last_notify_timestamp = start_timestamp
840
841     # The location (in bytes) of the end of the last complete line we printed.
842     # This starts off at zero. We use this to jump to the right place when we
843     # print output from the same ebuild multiple times.
844     self.last_output_seek = 0
845
846     # The timestamp of the last time we printed output. Since we haven't
847     # printed output yet, this starts at zero.
848     self.last_output_timestamp = 0
849
850     # The return code of our job, if the job is actually finished.
851     self.retcode = retcode
852
853     # Was this just a fetch job?
854     self.fetch_only = fetch_only
855
856     # The timestamp when our job started.
857     self.start_timestamp = start_timestamp
858
859     # No emerge, only unpack packages.
860     self.unpack_only = unpack_only
861
862
863 def KillHandler(_signum, _frame):
864   # Kill self and all subprocesses.
865   os.killpg(0, signal.SIGKILL)
866
867 def SetupWorkerSignals():
868   def ExitHandler(_signum, _frame):
869     # Set KILLED flag.
870     KILLED.set()
871
872     # Remove our signal handlers so we don't get called recursively.
873     signal.signal(signal.SIGINT, KillHandler)
874     signal.signal(signal.SIGTERM, KillHandler)
875
876   # Ensure that we exit quietly and cleanly, if possible, when we receive
877   # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all
878   # of the child processes will print details about KeyboardInterrupt
879   # exceptions, which isn't very helpful.
880   signal.signal(signal.SIGINT, ExitHandler)
881   signal.signal(signal.SIGTERM, ExitHandler)
882
883 def EmergeProcess(output, *args, **kwargs):
884   """Merge a package in a subprocess.
885
886   Args:
887     output: Temporary file to write output.
888     *args: Arguments to pass to Scheduler constructor.
889     **kwargs: Keyword arguments to pass to Scheduler constructor.
890
891   Returns:
892     The exit code returned by the subprocess.
893   """
894   pid = os.fork()
895   if pid == 0:
896     try:
897       # Sanity checks.
898       if sys.stdout.fileno() != 1:
899         raise Exception("sys.stdout.fileno() != 1")
900       if sys.stderr.fileno() != 2:
901         raise Exception("sys.stderr.fileno() != 2")
902
903       # - Redirect 1 (stdout) and 2 (stderr) at our temporary file.
904       # - Redirect 0 to point at sys.stdin. In this case, sys.stdin
905       #   points at a file reading os.devnull, because multiprocessing mucks
906       #   with sys.stdin.
907       # - Leave the sys.stdin and output filehandles alone.
908       fd_pipes = {0: sys.stdin.fileno(),
909                   1: output.fileno(),
910                   2: output.fileno(),
911                   sys.stdin.fileno(): sys.stdin.fileno(),
912                   output.fileno(): output.fileno()}
913       if 0 <= vercmp(portage.VERSION, "2.1.11.50"):
914         # pylint: disable=W0212
915         portage.process._setup_pipes(fd_pipes, close_fds=False)
916       else:
917         # pylint: disable=W0212
918         portage.process._setup_pipes(fd_pipes)
919
920       # Portage doesn't like when sys.stdin.fileno() != 0, so point sys.stdin
921       # at the filehandle we just created in _setup_pipes.
922       if sys.stdin.fileno() != 0:
923         sys.__stdin__ = sys.stdin = os.fdopen(0, "r")
924
925       scheduler = Scheduler(*args, **kwargs)
926
927       # Enable blocker handling even though we're in --nodeps mode. This
928       # allows us to unmerge the blocker after we've merged the replacement.
929       scheduler._opts_ignore_blockers = frozenset()
930
931       # Actually do the merge.
932       retval = scheduler.merge()
933
934     # We catch all exceptions here (including SystemExit, KeyboardInterrupt,
935     # etc) so as to ensure that we don't confuse the multiprocessing module,
936     # which expects that all forked children exit with os._exit().
937     # pylint: disable=W0702
938     except:
939       traceback.print_exc(file=output)
940       retval = 1
941     sys.stdout.flush()
942     sys.stderr.flush()
943     output.flush()
944     # pylint: disable=W0212
945     os._exit(retval)
946   else:
947     # Return the exit code of the subprocess.
948     return os.waitpid(pid, 0)[1]
949
950
951 def UnpackPackage(pkg_state):
952   """Unpacks package described by pkg_state.
953
954   Args:
955     pkg_state: EmergeJobState object describing target.
956
957   Returns:
958     Exit code returned by subprocess.
959   """
960   pkgdir = os.environ.get("PKGDIR",
961                           os.path.join(os.environ["SYSROOT"], "packages"))
962   root = os.environ.get("ROOT", os.environ["SYSROOT"])
963   path = os.path.join(pkgdir, pkg_state.target + ".tbz2")
964   comp = cros_build_lib.FindCompressor(cros_build_lib.COMP_BZIP2)
965   cmd = [comp, "-dc"]
966   if comp.endswith("pbzip2"):
967     cmd.append("--ignore-trailing-garbage=1")
968   cmd.append(path)
969
970   result = cros_build_lib.RunCommand(cmd, cwd=root, stdout_to_pipe=True,
971                                      print_cmd=False, error_code_ok=True)
972
973   # If we were not successful, return now and don't attempt untar.
974   if result.returncode:
975     return result.returncode
976
977   cmd = ["sudo", "tar", "-xf", "-", "-C", root]
978   result = cros_build_lib.RunCommand(cmd, cwd=root, input=result.output,
979                                      print_cmd=False, error_code_ok=True)
980
981   return result.returncode
982
983
984 def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False,
985                  unpack_only=False):
986   """This worker emerges any packages given to it on the task_queue.
987
988   Args:
989     task_queue: The queue of tasks for this worker to do.
990     job_queue: The queue of results from the worker.
991     emerge: An EmergeData() object.
992     package_db: A dict, mapping package ids to portage Package objects.
993     fetch_only: A bool, indicating if we should just fetch the target.
994     unpack_only: A bool, indicating if we should just unpack the target.
995
996   It expects package identifiers to be passed to it via task_queue. When
997   a task is started, it pushes the (target, filename) to the started_queue.
998   The output is stored in filename. When a merge starts or finishes, we push
999   EmergeJobState objects to the job_queue.
1000   """
1001
1002   SetupWorkerSignals()
1003   settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
1004
1005   # Disable flushing of caches to save on I/O.
1006   root = emerge.settings["ROOT"]
1007   vardb = emerge.trees[root]["vartree"].dbapi
1008   vardb._flush_cache_enabled = False
1009   bindb = emerge.trees[root]["bintree"].dbapi
1010   # Might be a set, might be a list, might be None; no clue, just use shallow
1011   # copy to ensure we can roll it back.
1012   # pylint: disable=W0212
1013   original_remotepkgs = copy.copy(bindb.bintree._remotepkgs)
1014
1015   opts, spinner = emerge.opts, emerge.spinner
1016   opts["--nodeps"] = True
1017   if fetch_only:
1018     opts["--fetchonly"] = True
1019
1020   while True:
1021     # Wait for a new item to show up on the queue. This is a blocking wait,
1022     # so if there's nothing to do, we just sit here.
1023     pkg_state = task_queue.get()
1024     if pkg_state is None:
1025       # If target is None, this means that the main thread wants us to quit.
1026       # The other workers need to exit too, so we'll push the message back on
1027       # to the queue so they'll get it too.
1028       task_queue.put(None)
1029       return
1030     if KILLED.is_set():
1031       return
1032
1033     target = pkg_state.target
1034
1035     db_pkg = package_db[target]
1036
1037     if db_pkg.type_name == "binary":
1038       if not fetch_only and pkg_state.fetched_successfully:
1039         # Ensure portage doesn't think our pkg is remote- else it'll force
1040         # a redownload of it (even if the on-disk file is fine).  In-memory
1041         # caching basically, implemented dumbly.
1042         bindb.bintree._remotepkgs = None
1043     else:
1044       bindb.bintree_remotepkgs = original_remotepkgs
1045
1046     db_pkg.root_config = emerge.root_config
1047     install_list = [db_pkg]
1048     pkgname = db_pkg.pf
1049     output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
1050     os.chmod(output.name, 644)
1051     start_timestamp = time.time()
1052     job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
1053                          fetch_only=fetch_only, unpack_only=unpack_only)
1054     job_queue.put(job)
1055     if "--pretend" in opts:
1056       retcode = 0
1057     else:
1058       try:
1059         emerge.scheduler_graph.mergelist = install_list
1060         if unpack_only:
1061           retcode = UnpackPackage(pkg_state)
1062         else:
1063           retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
1064                                   spinner, favorites=emerge.favorites,
1065                                   graph_config=emerge.scheduler_graph)
1066       except Exception:
1067         traceback.print_exc(file=output)
1068         retcode = 1
1069       output.close()
1070
1071     if KILLED.is_set():
1072       return
1073
1074     job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
1075                          retcode, fetch_only=fetch_only,
1076                          unpack_only=unpack_only)
1077     job_queue.put(job)
1078
1079
1080 class LinePrinter(object):
1081   """Helper object to print a single line."""
1082
1083   def __init__(self, line):
1084     self.line = line
1085
1086   def Print(self, _seek_locations):
1087     print self.line
1088
1089
1090 class JobPrinter(object):
1091   """Helper object to print output of a job."""
1092
1093   def __init__(self, job, unlink=False):
1094     """Print output of job.
1095
1096     If unlink is True, unlink the job output file when done.
1097     """
1098     self.current_time = time.time()
1099     self.job = job
1100     self.unlink = unlink
1101
1102   def Print(self, seek_locations):
1103
1104     job = self.job
1105
1106     # Calculate how long the job has been running.
1107     seconds = self.current_time - job.start_timestamp
1108
1109     # Note that we've printed out the job so far.
1110     job.last_output_timestamp = self.current_time
1111
1112     # Note that we're starting the job
1113     info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60)
1114     last_output_seek = seek_locations.get(job.filename, 0)
1115     if last_output_seek:
1116       print "=== Continue output for %s ===" % info
1117     else:
1118       print "=== Start output for %s ===" % info
1119
1120     # Print actual output from job
1121     f = codecs.open(job.filename, encoding='utf-8', errors='replace')
1122     f.seek(last_output_seek)
1123     prefix = job.pkgname + ":"
1124     for line in f:
1125
1126       # Save off our position in the file
1127       if line and line[-1] == "\n":
1128         last_output_seek = f.tell()
1129         line = line[:-1]
1130
1131       # Print our line
1132       print prefix, line.encode('utf-8', 'replace')
1133     f.close()
1134
1135     # Save our last spot in the file so that we don't print out the same
1136     # location twice.
1137     seek_locations[job.filename] = last_output_seek
1138
1139     # Note end of output section
1140     if job.done:
1141       print "=== Complete: %s ===" % info
1142     else:
1143       print "=== Still running: %s ===" % info
1144
1145     if self.unlink:
1146       os.unlink(job.filename)
1147
1148
1149 def PrintWorker(queue):
1150   """A worker that prints stuff to the screen as requested."""
1151
1152   def ExitHandler(_signum, _frame):
1153     # Set KILLED flag.
1154     KILLED.set()
1155
1156     # Switch to default signal handlers so that we'll die after two signals.
1157     signal.signal(signal.SIGINT, KillHandler)
1158     signal.signal(signal.SIGTERM, KillHandler)
1159
1160   # Don't exit on the first SIGINT / SIGTERM, because the parent worker will
1161   # handle it and tell us when we need to exit.
1162   signal.signal(signal.SIGINT, ExitHandler)
1163   signal.signal(signal.SIGTERM, ExitHandler)
1164
1165   # seek_locations is a map indicating the position we are at in each file.
1166   # It starts off empty, but is set by the various Print jobs as we go along
1167   # to indicate where we left off in each file.
1168   seek_locations = {}
1169   while True:
1170     try:
1171       job = queue.get()
1172       if job:
1173         job.Print(seek_locations)
1174         sys.stdout.flush()
1175       else:
1176         break
1177     except IOError as ex:
1178       if ex.errno == errno.EINTR:
1179         # Looks like we received a signal. Keep printing.
1180         continue
1181       raise
1182
1183
1184 class TargetState(object):
1185   """Structure descriting the TargetState."""
1186
1187   __slots__ = ("target", "info", "score", "prefetched", "fetched_successfully")
1188
1189   def __init__(self, target, info):
1190     self.target, self.info = target, info
1191     self.fetched_successfully = False
1192     self.prefetched = False
1193     self.score = None
1194     self.update_score()
1195
1196   def __cmp__(self, other):
1197     return cmp(self.score, other.score)
1198
1199   def update_score(self):
1200     self.score = (
1201         -len(self.info["tprovides"]),
1202         len(self.info["needs"]),
1203         not self.info["binary"],
1204         -len(self.info["provides"]),
1205         self.info["idx"],
1206         self.target,
1207         )
1208
1209
1210 class ScoredHeap(object):
1211   """Implementation of a general purpose scored heap."""
1212
1213   __slots__ = ("heap", "_heap_set")
1214
1215   def __init__(self, initial=()):
1216     self.heap = list()
1217     self._heap_set = set()
1218     if initial:
1219       self.multi_put(initial)
1220
1221   def get(self):
1222     item = heapq.heappop(self.heap)
1223     self._heap_set.remove(item.target)
1224     return item
1225
1226   def put(self, item):
1227     if not isinstance(item, TargetState):
1228       raise ValueError("Item %r isn't a TargetState" % (item,))
1229     heapq.heappush(self.heap, item)
1230     self._heap_set.add(item.target)
1231
1232   def multi_put(self, sequence):
1233     sequence = list(sequence)
1234     self.heap.extend(sequence)
1235     self._heap_set.update(x.target for x in sequence)
1236     self.sort()
1237
1238   def sort(self):
1239     heapq.heapify(self.heap)
1240
1241   def __contains__(self, target):
1242     return target in self._heap_set
1243
1244   def __nonzero__(self):
1245     return bool(self.heap)
1246
1247   def __len__(self):
1248     return len(self.heap)
1249
1250
1251 class EmergeQueue(object):
1252   """Class to schedule emerge jobs according to a dependency graph."""
1253
1254   def __init__(self, deps_map, emerge, package_db, show_output, unpack_only):
1255     # Store the dependency graph.
1256     self._deps_map = deps_map
1257     self._state_map = {}
1258     # Initialize the running queue to empty
1259     self._build_jobs = {}
1260     self._build_ready = ScoredHeap()
1261     self._fetch_jobs = {}
1262     self._fetch_ready = ScoredHeap()
1263     self._unpack_jobs = {}
1264     self._unpack_ready = ScoredHeap()
1265     # List of total package installs represented in deps_map.
1266     install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
1267     self._total_jobs = len(install_jobs)
1268     self._show_output = show_output
1269     self._unpack_only = unpack_only
1270
1271     if "--pretend" in emerge.opts:
1272       print "Skipping merge because of --pretend mode."
1273       sys.exit(0)
1274
1275     # Set a process group so we can easily terminate all children.
1276     os.setsid()
1277
1278     # Setup scheduler graph object. This is used by the child processes
1279     # to help schedule jobs.
1280     emerge.scheduler_graph = emerge.depgraph.schedulerGraph()
1281
1282     # Calculate how many jobs we can run in parallel. We don't want to pass
1283     # the --jobs flag over to emerge itself, because that'll tell emerge to
1284     # hide its output, and said output is quite useful for debugging hung
1285     # jobs.
1286     procs = min(self._total_jobs,
1287                 emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
1288     self._build_procs = self._unpack_procs = self._fetch_procs = max(1, procs)
1289     self._load_avg = emerge.opts.pop("--load-average", None)
1290     self._job_queue = multiprocessing.Queue()
1291     self._print_queue = multiprocessing.Queue()
1292
1293     self._fetch_queue = multiprocessing.Queue()
1294     args = (self._fetch_queue, self._job_queue, emerge, package_db, True)
1295     self._fetch_pool = multiprocessing.Pool(self._fetch_procs, EmergeWorker,
1296                                             args)
1297
1298     self._build_queue = multiprocessing.Queue()
1299     args = (self._build_queue, self._job_queue, emerge, package_db)
1300     self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
1301                                             args)
1302
1303     if self._unpack_only:
1304       # Unpack pool only required on unpack_only jobs.
1305       self._unpack_queue = multiprocessing.Queue()
1306       args = (self._unpack_queue, self._job_queue, emerge, package_db, False,
1307               True)
1308       self._unpack_pool = multiprocessing.Pool(self._unpack_procs, EmergeWorker,
1309                                                args)
1310
1311     self._print_worker = multiprocessing.Process(target=PrintWorker,
1312                                                  args=[self._print_queue])
1313     self._print_worker.start()
1314
1315     # Initialize the failed queue to empty.
1316     self._retry_queue = []
1317     self._failed = set()
1318
1319     # Setup an exit handler so that we print nice messages if we are
1320     # terminated.
1321     self._SetupExitHandler()
1322
1323     # Schedule our jobs.
1324     self._state_map.update(
1325         (pkg, TargetState(pkg, data)) for pkg, data in deps_map.iteritems())
1326     self._fetch_ready.multi_put(self._state_map.itervalues())
1327
1328   def _SetupExitHandler(self):
1329
1330     def ExitHandler(signum, _frame):
1331       # Set KILLED flag.
1332       KILLED.set()
1333
1334       # Kill our signal handlers so we don't get called recursively
1335       signal.signal(signal.SIGINT, KillHandler)
1336       signal.signal(signal.SIGTERM, KillHandler)
1337
1338       # Print our current job status
1339       for job in self._build_jobs.itervalues():
1340         if job:
1341           self._print_queue.put(JobPrinter(job, unlink=True))
1342
1343       # Notify the user that we are exiting
1344       self._Print("Exiting on signal %s" % signum)
1345       self._print_queue.put(None)
1346       self._print_worker.join()
1347
1348       # Kill child threads, then exit.
1349       os.killpg(0, signal.SIGKILL)
1350       sys.exit(1)
1351
1352     # Print out job status when we are killed
1353     signal.signal(signal.SIGINT, ExitHandler)
1354     signal.signal(signal.SIGTERM, ExitHandler)
1355
1356   def _ScheduleUnpack(self, pkg_state):
1357     self._unpack_jobs[pkg_state.target] = None
1358     self._unpack_queue.put(pkg_state)
1359
1360   def _Schedule(self, pkg_state):
1361     # We maintain a tree of all deps, if this doesn't need
1362     # to be installed just free up its children and continue.
1363     # It is possible to reinstall deps of deps, without reinstalling
1364     # first level deps, like so:
1365     # virtual/target-os (merge) -> eselect (nomerge) -> python (merge)
1366     this_pkg = pkg_state.info
1367     target = pkg_state.target
1368     if pkg_state.info is not None:
1369       if this_pkg["action"] == "nomerge":
1370         self._Finish(target)
1371       elif target not in self._build_jobs:
1372         # Kick off the build if it's marked to be built.
1373         self._build_jobs[target] = None
1374         self._build_queue.put(pkg_state)
1375         return True
1376
1377   def _ScheduleLoop(self, unpack_only=False):
1378     if unpack_only:
1379       ready_queue = self._unpack_ready
1380       jobs_queue = self._unpack_jobs
1381       procs = self._unpack_procs
1382     else:
1383       ready_queue = self._build_ready
1384       jobs_queue = self._build_jobs
1385       procs = self._build_procs
1386
1387     # If the current load exceeds our desired load average, don't schedule
1388     # more than one job.
1389     if self._load_avg and os.getloadavg()[0] > self._load_avg:
1390       needed_jobs = 1
1391     else:
1392       needed_jobs = procs
1393
1394     # Schedule more jobs.
1395     while ready_queue and len(jobs_queue) < needed_jobs:
1396       state = ready_queue.get()
1397       if unpack_only:
1398         self._ScheduleUnpack(state)
1399       else:
1400         if state.target not in self._failed:
1401           self._Schedule(state)
1402
1403   def _Print(self, line):
1404     """Print a single line."""
1405     self._print_queue.put(LinePrinter(line))
1406
1407   def _Status(self):
1408     """Print status."""
1409     current_time = time.time()
1410     no_output = True
1411
1412     # Print interim output every minute if --show-output is used. Otherwise,
1413     # print notifications about running packages every 2 minutes, and print
1414     # full output for jobs that have been running for 60 minutes or more.
1415     if self._show_output:
1416       interval = 60
1417       notify_interval = 0
1418     else:
1419       interval = 60 * 60
1420       notify_interval = 60 * 2
1421     for job in self._build_jobs.itervalues():
1422       if job:
1423         last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
1424         if last_timestamp + interval < current_time:
1425           self._print_queue.put(JobPrinter(job))
1426           job.last_output_timestamp = current_time
1427           no_output = False
1428         elif (notify_interval and
1429               job.last_notify_timestamp + notify_interval < current_time):
1430           job_seconds = current_time - job.start_timestamp
1431           args = (job.pkgname, job_seconds / 60, job_seconds % 60, job.filename)
1432           info = "Still building %s (%dm%.1fs). Logs in %s" % args
1433           job.last_notify_timestamp = current_time
1434           self._Print(info)
1435           no_output = False
1436
1437     # If we haven't printed any messages yet, print a general status message
1438     # here.
1439     if no_output:
1440       seconds = current_time - GLOBAL_START
1441       fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
1442       ujobs, uready = len(self._unpack_jobs), len(self._unpack_ready)
1443       bjobs, bready = len(self._build_jobs), len(self._build_ready)
1444       retries = len(self._retry_queue)
1445       pending = max(0, len(self._deps_map) - fjobs - bjobs)
1446       line = "Pending %s/%s, " % (pending, self._total_jobs)
1447       if fjobs or fready:
1448         line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
1449       if ujobs or uready:
1450         line += "Unpacking %s/%s, " % (ujobs, uready + ujobs)
1451       if bjobs or bready or retries:
1452         line += "Building %s/%s, " % (bjobs, bready + bjobs)
1453         if retries:
1454           line += "Retrying %s, " % (retries,)
1455       load =  " ".join(str(x) for x in os.getloadavg())
1456       line += ("[Time %dm%.1fs Load %s]" % (seconds/60, seconds %60, load))
1457       self._Print(line)
1458
1459   def _Finish(self, target):
1460     """Mark a target as completed and unblock dependencies."""
1461     this_pkg = self._deps_map[target]
1462     if this_pkg["needs"] and this_pkg["nodeps"]:
1463       # We got installed, but our deps have not been installed yet. Dependent
1464       # packages should only be installed when our needs have been fully met.
1465       this_pkg["action"] = "nomerge"
1466     else:
1467       for dep in this_pkg["provides"]:
1468         dep_pkg = self._deps_map[dep]
1469         state = self._state_map[dep]
1470         del dep_pkg["needs"][target]
1471         state.update_score()
1472         if not state.prefetched:
1473           if dep in self._fetch_ready:
1474             # If it's not currently being fetched, update the prioritization
1475             self._fetch_ready.sort()
1476         elif not dep_pkg["needs"]:
1477           if dep_pkg["nodeps"] and dep_pkg["action"] == "nomerge":
1478             self._Finish(dep)
1479           else:
1480             self._build_ready.put(self._state_map[dep])
1481       self._deps_map.pop(target)
1482
1483   def _Retry(self):
1484     while self._retry_queue:
1485       state = self._retry_queue.pop(0)
1486       if self._Schedule(state):
1487         self._Print("Retrying emerge of %s." % state.target)
1488         break
1489
1490   def _Shutdown(self):
1491     # Tell emerge workers to exit. They all exit when 'None' is pushed
1492     # to the queue.
1493
1494     # Shutdown the workers first; then jobs (which is how they feed things back)
1495     # then finally the print queue.
1496
1497     def _stop(queue, pool):
1498       if pool is None:
1499         return
1500       try:
1501         queue.put(None)
1502         pool.close()
1503         pool.join()
1504       finally:
1505         pool.terminate()
1506
1507     _stop(self._fetch_queue, self._fetch_pool)
1508     self._fetch_queue = self._fetch_pool = None
1509
1510     _stop(self._build_queue, self._build_pool)
1511     self._build_queue = self._build_pool = None
1512
1513     if self._unpack_only:
1514       _stop(self._unpack_queue, self._unpack_pool)
1515       self._unpack_queue = self._unpack_pool = None
1516
1517     if self._job_queue is not None:
1518       self._job_queue.close()
1519       self._job_queue = None
1520
1521     # Now that our workers are finished, we can kill the print queue.
1522     if self._print_worker is not None:
1523       try:
1524         self._print_queue.put(None)
1525         self._print_queue.close()
1526         self._print_worker.join()
1527       finally:
1528         self._print_worker.terminate()
1529     self._print_queue = self._print_worker = None
1530
1531   def Run(self):
1532     """Run through the scheduled ebuilds.
1533
1534     Keep running so long as we have uninstalled packages in the
1535     dependency graph to merge.
1536     """
1537     if not self._deps_map:
1538       return
1539
1540     # Start the fetchers.
1541     for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
1542       state = self._fetch_ready.get()
1543       self._fetch_jobs[state.target] = None
1544       self._fetch_queue.put(state)
1545
1546     # Print an update, then get going.
1547     self._Status()
1548
1549     retried = set()
1550     while self._deps_map:
1551       # Check here that we are actually waiting for something.
1552       if (self._build_queue.empty() and
1553           self._job_queue.empty() and
1554           not self._fetch_jobs and
1555           not self._fetch_ready and
1556           not self._unpack_jobs and
1557           not self._unpack_ready and
1558           not self._build_jobs and
1559           not self._build_ready and
1560           self._deps_map):
1561         # If we have failed on a package, retry it now.
1562         if self._retry_queue:
1563           self._Retry()
1564         else:
1565           # Tell the user why we're exiting.
1566           if self._failed:
1567             print 'Packages failed:\n\t%s' % '\n\t'.join(self._failed)
1568             status_file = os.environ.get("PARALLEL_EMERGE_STATUS_FILE")
1569             if status_file:
1570               failed_pkgs = set(portage.versions.cpv_getkey(x)
1571                                 for x in self._failed)
1572               with open(status_file, "a") as f:
1573                 f.write("%s\n" % " ".join(failed_pkgs))
1574           else:
1575             print "Deadlock! Circular dependencies!"
1576           sys.exit(1)
1577
1578       for _ in xrange(12):
1579         try:
1580           job = self._job_queue.get(timeout=5)
1581           break
1582         except Queue.Empty:
1583           # Check if any more jobs can be scheduled.
1584           self._ScheduleLoop()
1585       else:
1586         # Print an update every 60 seconds.
1587         self._Status()
1588         continue
1589
1590       target = job.target
1591
1592       if job.fetch_only:
1593         if not job.done:
1594           self._fetch_jobs[job.target] = job
1595         else:
1596           state = self._state_map[job.target]
1597           state.prefetched = True
1598           state.fetched_successfully = (job.retcode == 0)
1599           del self._fetch_jobs[job.target]
1600           self._Print("Fetched %s in %2.2fs"
1601                       % (target, time.time() - job.start_timestamp))
1602
1603           if self._show_output or job.retcode != 0:
1604             self._print_queue.put(JobPrinter(job, unlink=True))
1605           else:
1606             os.unlink(job.filename)
1607           # Failure or not, let build work with it next.
1608           if not self._deps_map[job.target]["needs"]:
1609             self._build_ready.put(state)
1610             self._ScheduleLoop()
1611
1612           if self._unpack_only and job.retcode == 0:
1613             self._unpack_ready.put(state)
1614             self._ScheduleLoop(unpack_only=True)
1615
1616           if self._fetch_ready:
1617             state = self._fetch_ready.get()
1618             self._fetch_queue.put(state)
1619             self._fetch_jobs[state.target] = None
1620           else:
1621             # Minor optimization; shut down fetchers early since we know
1622             # the queue is empty.
1623             self._fetch_queue.put(None)
1624         continue
1625
1626       if job.unpack_only:
1627         if not job.done:
1628           self._unpack_jobs[target] = job
1629         else:
1630           del self._unpack_jobs[target]
1631           self._Print("Unpacked %s in %2.2fs"
1632                       % (target, time.time() - job.start_timestamp))
1633           if self._show_output or job.retcode != 0:
1634             self._print_queue.put(JobPrinter(job, unlink=True))
1635           else:
1636             os.unlink(job.filename)
1637           if self._unpack_ready:
1638             state = self._unpack_ready.get()
1639             self._unpack_queue.put(state)
1640             self._unpack_jobs[state.target] = None
1641         continue
1642
1643       if not job.done:
1644         self._build_jobs[target] = job
1645         self._Print("Started %s (logged in %s)" % (target, job.filename))
1646         continue
1647
1648       # Print output of job
1649       if self._show_output or job.retcode != 0:
1650         self._print_queue.put(JobPrinter(job, unlink=True))
1651       else:
1652         os.unlink(job.filename)
1653       del self._build_jobs[target]
1654
1655       seconds = time.time() - job.start_timestamp
1656       details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)
1657       previously_failed = target in self._failed
1658
1659       # Complain if necessary.
1660       if job.retcode != 0:
1661         # Handle job failure.
1662         if previously_failed:
1663           # If this job has failed previously, give up.
1664           self._Print("Failed %s. Your build has failed." % details)
1665         else:
1666           # Queue up this build to try again after a long while.
1667           retried.add(target)
1668           self._retry_queue.append(self._state_map[target])
1669           self._failed.add(target)
1670           self._Print("Failed %s, retrying later." % details)
1671       else:
1672         if previously_failed:
1673           # Remove target from list of failed packages.
1674           self._failed.remove(target)
1675
1676         self._Print("Completed %s" % details)
1677
1678         # Mark as completed and unblock waiting ebuilds.
1679         self._Finish(target)
1680
1681         if previously_failed and self._retry_queue:
1682           # If we have successfully retried a failed package, and there
1683           # are more failed packages, try the next one. We will only have
1684           # one retrying package actively running at a time.
1685           self._Retry()
1686
1687
1688       # Schedule pending jobs and print an update.
1689       self._ScheduleLoop()
1690       self._Status()
1691
1692     # If packages were retried, output a warning.
1693     if retried:
1694       self._Print("")
1695       self._Print("WARNING: The following packages failed the first time,")
1696       self._Print("but succeeded upon retry. This might indicate incorrect")
1697       self._Print("dependencies.")
1698       for pkg in retried:
1699         self._Print("  %s" % pkg)
1700       self._Print("@@@STEP_WARNINGS@@@")
1701       self._Print("")
1702
1703     # Tell child threads to exit.
1704     self._Print("Merge complete")
1705
1706
1707 def main(argv):
1708   try:
1709     return real_main(argv)
1710   finally:
1711     # Work around multiprocessing sucking and not cleaning up after itself.
1712     # http://bugs.python.org/issue4106;
1713     # Step one; ensure GC is ran *prior* to the VM starting shutdown.
1714     gc.collect()
1715     # Step two; go looking for those threads and try to manually reap
1716     # them if we can.
1717     for x in threading.enumerate():
1718       # Filter on the name, and ident; if ident is None, the thread
1719       # wasn't started.
1720       if x.name == 'QueueFeederThread' and x.ident is not None:
1721         x.join(1)
1722
1723
1724 def real_main(argv):
1725   parallel_emerge_args = argv[:]
1726   deps = DepGraphGenerator()
1727   deps.Initialize(parallel_emerge_args)
1728   emerge = deps.emerge
1729
1730   if emerge.action is not None:
1731     argv = deps.ParseParallelEmergeArgs(argv)
1732     return emerge_main(argv)
1733   elif not emerge.cmdline_packages:
1734     Usage()
1735     return 1
1736
1737   # Unless we're in pretend mode, there's not much point running without
1738   # root access. We need to be able to install packages.
1739   #
1740   # NOTE: Even if you're running --pretend, it's a good idea to run
1741   #       parallel_emerge with root access so that portage can write to the
1742   #       dependency cache. This is important for performance.
1743   if "--pretend" not in emerge.opts and portage.data.secpass < 2:
1744     print "parallel_emerge: superuser access is required."
1745     return 1
1746
1747   if "--quiet" not in emerge.opts:
1748     cmdline_packages = " ".join(emerge.cmdline_packages)
1749     print "Starting fast-emerge."
1750     print " Building package %s on %s" % (cmdline_packages,
1751                                           deps.board or "root")
1752
1753   deps_tree, deps_info = deps.GenDependencyTree()
1754
1755   # You want me to be verbose? I'll give you two trees! Twice as much value.
1756   if "--tree" in emerge.opts and "--verbose" in emerge.opts:
1757     deps.PrintTree(deps_tree)
1758
1759   deps_graph = deps.GenDependencyGraph(deps_tree, deps_info)
1760
1761   # OK, time to print out our progress so far.
1762   deps.PrintInstallPlan(deps_graph)
1763   if "--tree" in emerge.opts:
1764     PrintDepsMap(deps_graph)
1765
1766   # Are we upgrading portage? If so, and there are more packages to merge,
1767   # schedule a restart of parallel_emerge to merge the rest. This ensures that
1768   # we pick up all updates to portage settings before merging any more
1769   # packages.
1770   portage_upgrade = False
1771   root = emerge.settings["ROOT"]
1772   # pylint: disable=W0212
1773   final_db = emerge.depgraph._dynamic_config.mydbapi[root]
1774   if root == "/":
1775     for db_pkg in final_db.match_pkgs("sys-apps/portage"):
1776       portage_pkg = deps_graph.get(db_pkg.cpv)
1777       if portage_pkg:
1778         portage_upgrade = True
1779         if "--quiet" not in emerge.opts:
1780           print "Upgrading portage first, then restarting..."
1781
1782   # Upgrade Portage first, then the rest of the packages.
1783   #
1784   # In order to grant the child permission to run setsid, we need to run sudo
1785   # again. We preserve SUDO_USER here in case an ebuild depends on it.
1786   if portage_upgrade:
1787     # Calculate what arguments to use when re-invoking.
1788     args = ["sudo", "-E", "SUDO_USER=%s" % os.environ.get("SUDO_USER", "")]
1789     args += [os.path.abspath(sys.argv[0])] + parallel_emerge_args
1790     args += ["--exclude=sys-apps/portage"]
1791
1792     # First upgrade Portage.
1793     passthrough_args = ("--quiet", "--pretend", "--verbose")
1794     emerge_args = [k for k in emerge.opts if k in passthrough_args]
1795     ret = emerge_main(emerge_args + ["portage"])
1796     if ret != 0:
1797       return ret
1798
1799     # Now upgrade the rest.
1800     os.execvp(args[0], args)
1801
1802   # Run the queued emerges.
1803   scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output,
1804                           deps.unpack_only)
1805   try:
1806     scheduler.Run()
1807   finally:
1808     # pylint: disable=W0212
1809     scheduler._Shutdown()
1810   scheduler = None
1811
1812   clean_logs(emerge.settings)
1813
1814   print "Done"
1815   return 0