#!/usr/bin/python -tt
# vim: ai ts=4 sts=4 et sw=4
#
-# Copyright (c) 2011 Intel, Inc.
+# Copyright (c) 2011, 2012, 2013 Intel, Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# internal modules
import runner
import errors
-from utils import Workdir, retry
+from utils import retry
-class Git:
- def __init__(self, path):
- if not os.path.isdir(os.path.join(path, '.git')):
- raise errors.GitError(path)
-
- self.path = os.path.abspath(path)
- self._git_dir = os.path.join(path, '.git')
-
- # as cache
- self.cur_branch = None
- self.branches = None
-
- def _is_sha1(self, val):
- sha1_re = re.compile(r'[0-9a-f]{40}$')
- return True if sha1_re.match(val) else False
-
- def _exec_git(self, command, args=[]):
- """Exec a git command and return the output
- """
-
- cmd = ['git', command] + args
-
- cmdln = ' '.join(cmd)
- print 'run command: %s' % cmdln
-
- with Workdir(self.path):
- ret, outs = runner.show(cmdln)
-
- #if ret:
- # raise errors.GitError("command error for: %s" % cmdln)
-
- return ret, outs
-
- def status(self, *args):
- ret, outs = self._exec_git('status', ['-s'] + list(args))
-
- sts = {}
- for line in outs.splitlines():
- st = line[:2]
- if st not in sts:
- sts[st] = [line[2:].strip()]
- else:
- sts[st].append(line[2:].strip())
-
- return sts
-
- def ls_files(self):
- """Return the files list
- """
- return filter(None, self._exec_git('ls-files')[1].splitlines())
-
- def rev_parse(self, name, *args):
- """ Find the SHA1 of a given name commit id"""
- options = ["--quiet", "--verify", "%s^{commit}" % name]
- options += list(args)
- with Workdir(self.path):
- ret, outs = self._exec_git('rev-parse', options)
- if not ret:
- return outs.strip()
- else:
- return None
-
- def create_branch(self, branch, rev=None):
- if rev and not self._is_sha1(rev):
- rev = self.rev_parse(rev)
- if not branch:
- raise errors.GitError('Branch name should not be None')
-
- options = [branch, rev, '-f']
- self._exec_git('branch', options)
-
- def _get_branches(self):
- """Return the branches list, current working branch is the first
- element.
- """
- branches = []
- for line in self._exec_git('branch', ['--no-color'])[1].splitlines():
- br = line.strip().split()[-1]
-
- if line.startswith('*'):
- current_branch = br
-
- branches.append(br)
-
- return (current_branch, branches)
-
- def get_branches(self):
- if not self.cur_branch or not self.branches:
- self.cur_branch, self.branches = \
- self._get_branches()
-
- return (self.cur_branch, self.branches)
-
- def is_clean(self):
- """does the repository contain any uncommitted modifications"""
-
- gitsts = self.status()
- if 'M ' in gitsts or ' M' in gitsts or \
- 'A ' in gitsts or ' A' in gitsts or \
- 'D ' in gitsts or ' D' in gitsts:
- return False
- else:
- return True
-
- def has_branch(self, br, remote=False):
- """Check if the repository has branch 'br'
- @param remote: only liste remote branches
- """
-
- if remote:
- options = [ '--no-color', '-r' ]
-
- for line in self._exec_git('branch', options)[1].splitlines():
- rbr = line.strip().split()[-1]
- if br == rbr:
- return True
-
- return False
-
- else:
- return (br in self.get_branches()[1])
-
- def checkout(self, *args):
- """checkout repository branch 'br'
- """
- with Workdir(self.path):
- ret, outs = self._exec_git('checkout', list(args))
-
- def clean_branch(self, br):
- """Clean up repository branch 'br'
- """
-
- options = ['-dfx']
- with Workdir(self.path):
- self.checkout(br)
- runner.quiet('rm .git/index')
- self._exec_git('clean', options)
-
- def commit_dir(self, unpack_dir, msg, branch = 'master', other_parents=None,
- author={}, committer={}, create_missing_branch=False):
-
- for key, val in author.items():
- if val:
- os.environ['GIT_AUTHOR_%s' % key.upper()] = val
- for key, val in committer.items():
- if val:
- os.environ['GIT_COMMITTER_%s' % key.upper()] = val
-
- os.environ['GIT_WORK_TREE'] = unpack_dir
- options = ['.', '-f']
- self._exec_git("add", options)
+# gbp import
+from gbp.git.repository import GitRepository
+from gbp.git.commit import GitCommit
+import gbp.log as log
- changed = not self.is_clean()
- if changed:
- options = ['--quiet','-a', '-m %s' % msg,]
- self._exec_git("commit", options)
-
- commit_id = self._exec_git('log', ['--oneline', '-1'])[1].split()[0]
-
- del os.environ['GIT_WORK_TREE']
- for key, val in author.items():
- if val:
- del os.environ['GIT_AUTHOR_%s' % key.upper()]
- for key, val in committer.items():
- if val:
- del os.environ['GIT_COMMITTER_%s' % key.upper()]
-
- self._exec_git('reset', ['--hard', commit_id])
-
- return commit_id if changed else None
-
- def find_tag(self, tag):
- """find the specify version from the repository"""
- args = ['-l', tag]
- ret = self._exec_git('tag', args)[1]
- if ret:
- return True
- return False
-
- def commits(self, since=None, until=None, paths=None, options=None,
- first_parent=False):
- """
- get commits from since to until touching paths
-
- @param options: list of options past to git log
- @type options: list of strings
- """
-
- args = ['--pretty=format:%H']
-
- if options:
- args += options
-
- if first_parent:
- args += [ "--first-parent" ]
-
- if since and until:
- assert(self.rev_parse(since))
- assert(self.rev_parse(until))
- args += ['%s..%s' % (since, until)]
-
- if paths:
- args += [ "--", paths ]
-
- commits = self._exec_git('log', args)[1]
-
- return [ commit.strip() for commit in commits.split('\n') ]
-
- def get_config(self, name):
- """Gets the config value associated with name"""
- return self._exec_git('config', [ name ] )[1]
-
- def get_commit_info(self, commit):
- """Given a commit name, return a dictionary of its components,
- including id, author, email, subject, and body."""
- ret, outs = self._exec_git('log',
- ['--pretty=format:%h%n%an%n%ae%n%s%n%b%n',
- '-n1', commit])
- if ret:
- return None
- else:
- out = outs.splitlines()
- return {'id' : out[0].strip(),
- 'author' : out[1].strip(),
- 'email' : out[2].strip(),
- 'subject' : out[3].rstrip(),
- 'body' : [line.rstrip() for line in out[4:]]}
+class Git(GitRepository):
+ def __init__(self, path):
+ GitRepository.__init__(self, path)
+ log.setup('auto', log.DEBUG)
def get_tag(self, tag_name):
- ret, outs = self._exec_git('show',
- ['-s --pretty=format:###', tag_name])
+ outs, ret = self._git_getoutput('show',
+ ['-s', '--pretty=format:###', tag_name])
tag = {}
msg = ''
msgstub = False
- for line in outs.splitlines():
+ for line in outs:
if not line:
continue
if msgstub:
return tag
- def create_tag(self, name, msg, commit):
- """Creat a tag with name at commit"""
- if self.rev_parse(commit) is None:
- raise errors.GitError('%s is invalid commit ID' % commit)
- options = [name, '-m "%s"' % msg.replace('"', '\''), commit]
- self._exec_git('tag', options)
-
- def merge(self, commit):
- """ merge the git tree specified by commit to current branch"""
- if self.rev_parse(commit) is None and not self.find_tag(commit):
- raise errors.GitError('%s is invalid commit ID or tag' % commit)
-
- options = [commit]
- self._exec_git('merge', options)
-
- @staticmethod
- def _formatlize(version):
- return version.replace('~', '_').replace(':', '%')
-
- @staticmethod
- def version_to_tag(format, version):
- return format % dict(version=Git._formatlize(version))
-
- @classmethod
- def create(klass, path, description=None, bare=False):
- """
- Create a repository at path
- @path: where to create the repository
- """
- abspath = os.path.abspath(path)
- options = []
- if bare:
- options = [ '--bare' ]
- git_dir = ''
- else:
- options = []
- git_dir = '.git'
-
- try:
- if not os.path.exists(abspath):
- os.makedirs(abspath)
-
- with Workdir(abspath):
- cmd = ['git', 'init'] + options;
- runner.quiet(' '.join(cmd))
- if description:
- with file(os.path.join(abspath, git_dir, "description"), 'w') as f:
- description += '\n' if description[-1] != '\n' else ''
- f.write(description)
- return klass(abspath)
- except OSError, err:
- raise errors.GitError("Cannot create Git repository at '%s': %s"
- % (abspath, err[1]))
- return None
-
- def show(self, *args):
- """show commit details
- """
- with Workdir(self.path):
- ret, outs = self._exec_git('show', list(args))
- if not ret:
- return outs.splitlines()
- else:
- return None
-
def push(self, *args):
-
- with Workdir(self.path):
- ret, outs = self._exec_git('push', list(args))
- if not ret:
- return outs.splitlines()
- else:
- return None
+ outs, ret = self._git_getoutput('push', list(args))
+ return outs if not ret else None
def fetch(self, *args):
"""Download objects and refs from another repository
"""
- with Workdir(self.path):
- ret, outs = self._exec_git('fetch', list(args))
- if ret:
- return False
- else:
- return True
+ outs, ret = self._git_getoutput('fetch', list(args))
+ return not ret
def describe(self, *args):
"""Show the most recent tag that is reachable from a commit
"""
- with Workdir(self.path):
- ret, outs = self._exec_git('describe', list(args))
- if not ret:
- return outs.strip()
- else:
- return None
+ outs, ret = self._git_getoutput('describe', list(args))
+ return ''.join(outs).strip() if not ret else None
- def delete_tags(self, *args):
+ def delete_tag(self, *args):
"""Delete a tag (or tags)"""
- with Workdir(self.path):
- self._exec_git('tag -d', list(args))
+ self._git_getoutput('tag', ['-d'] + list(args))
def pull(self, *args):
"""Fetch from and merge with another repository or a local branch
"""
- with Workdir(self.path):
- ret, outs = self._exec_git('pull', list(args))
- if ret:
- return False
- else:
- return True
+ outs, ret = self._git_getoutput('pull', list(args))
+ return not ret
def clean(self, *args):
- with Workdir(self.path):
- ret, outs = self._exec_git('clean', list(args))
+ self._git_getoutput('clean', list(args))
def get_base_commit(self, commit):
- out = self._exec_git('log',
+ out = self._git_getoutput('log',
['--pretty=format:%H',
- '-n2', commit])[1].split('\n')
+ '-n2', commit])[0]
return out[1].strip()
def format_patch(self, *args):
- with Workdir(self.path):
- ret, outs = self._exec_git('format-patch', list(args))
- if not ret:
- return outs.splitlines()
- else:
- return None
+ outs, ret = self._git_getoutput('format-patch', list(args))
+ return outs[0] if not ret else None
def branch_contains(self, cmitid):
- with Workdir(self.path):
- ret, outs = self._exec_git('branch -a --contains', [cmitid])
+ outs, ret = self._git_getoutput('branch', ['-a', '--contains', cmitid])
if not ret:
branches = []
- for br in outs.splitlines():
+ for br in outs:
if br.find('remotes/origin/HEAD ->') != -1:
continue
br = br.strip().lstrip('* ')
else:
return None
+ def rev_parse(self, name, *args):
+ """ Find the SHA1 of a given name commit id"""
+ options = ["--quiet", "--verify", "%s^{commit}" % name]
+ options += list(args)
+ outs, ret = self._git_getoutput('rev-parse', options)
+ if not ret:
+ return outs[0][:-1] # remove '\n'
+ else:
+ return None
+
def _update_gitproject(localdir):
"""Fetch latest code to local dir"""