diff options
Diffstat (limited to 'vcstool/clients/git.py')
-rw-r--r-- | vcstool/clients/git.py | 711 |
1 files changed, 711 insertions, 0 deletions
diff --git a/vcstool/clients/git.py b/vcstool/clients/git.py new file mode 100644 index 0000000..1be9afc --- /dev/null +++ b/vcstool/clients/git.py @@ -0,0 +1,711 @@ +import os + +from vcstool.executor import USE_COLOR + +from .vcs_base import VcsClientBase +from .vcs_base import which +from ..util import rmtree + + +class GitClient(VcsClientBase): + + type = 'git' + _executable = None + _config_color_is_auto = None + + @staticmethod + def is_repository(path): + return os.path.isdir(os.path.join(path, '.git')) + + def __init__(self, path): + super(GitClient, self).__init__(path) + + def branch(self, command): + self._check_executable() + cmd = [GitClient._executable, 'branch'] + result = self._run_command(cmd) + + if not command.all and not result['returncode']: + # only show current branch + lines = result['output'].splitlines() + lines = [line[2:] for line in lines if line.startswith('* ')] + result['output'] = '\n'.join(lines) + + return result + + def custom(self, command): + self._check_executable() + cmd = [GitClient._executable] + command.args + return self._run_command(cmd) + + def diff(self, command): + self._check_executable() + cmd = [GitClient._executable, 'diff'] + self._check_color(cmd) + if command.context: + cmd += ['--unified=%d' % command.context] + return self._run_command(cmd) + + def export(self, command): + self._check_executable() + exact = command.exact + if not exact: + # determine if a specific branch is checked out or ec is detached + cmd_branch = [ + GitClient._executable, 'rev-parse', '--abbrev-ref', 'HEAD'] + result_branch = self._run_command(cmd_branch) + if result_branch['returncode']: + result_branch['output'] = 'Could not determine ref: ' + \ + result_branch['output'] + return result_branch + branch_name = result_branch['output'] + exact = branch_name == 'HEAD' # is detached + + if not exact: + # determine the remote of the current branch + cmd_remote = [ + GitClient._executable, 'rev-parse', '--abbrev-ref', + '@{upstream}'] + result_remote = self._run_command(cmd_remote) + if result_remote['returncode']: + result_remote['output'] = 'Could not determine ref: ' + \ + result_remote['output'] + return result_remote + branch_with_remote = result_remote['output'] + + # determine remote + suffix = '/' + branch_name + assert branch_with_remote.endswith(branch_name), \ + "'%s' does not end with '%s'" % \ + (branch_with_remote, branch_name) + remote = branch_with_remote[:-len(suffix)] + + # if a local ref exists with the same name as the remote branch + # the result will be prefixed to make it unambiguous + prefix = 'remotes/' + if remote.startswith(prefix): + remote = remote[len(prefix):] + + # determine url of remote + result_url = self._get_remote_url(remote) + if result_url['returncode']: + return result_url + url = result_url['output'] + + # the result is the remote url and the branch name + return { + 'cmd': ' && '.join([ + result_branch['cmd'], result_remote['cmd'], + result_url['cmd']]), + 'cwd': self.path, + 'output': '\n'.join([url, branch_name]), + 'returncode': 0, + 'export_data': {'url': url, 'version': branch_name} + } + + else: + # determine the hash + cmd_ref = [GitClient._executable, 'rev-parse', 'HEAD'] + result_ref = self._run_command(cmd_ref) + if result_ref['returncode']: + result_ref['output'] = 'Could not determine ref: ' + \ + result_ref['output'] + return result_ref + ref = result_ref['output'] + + # get all remote names + cmd_remotes = [GitClient._executable, 'remote'] + result_remotes = self._run_command(cmd_remotes) + if result_remotes['returncode']: + result_remotes['output'] = 'Could not determine remotes: ' + \ + result_remotes['output'] + return result_remotes + remotes = result_remotes['output'].splitlines() + + # prefer origin and upstream remotes + if 'upstream' in remotes: + remotes.remove('upstream') + remotes.insert(0, 'upstream') + if 'origin' in remotes: + remotes.remove('origin') + remotes.insert(0, 'origin') + + # for each remote name check if the hash is part of the remote + for remote in remotes: + # get all remote names + cmd_refs = [ + GitClient._executable, 'rev-list', '--remotes=' + remote, + '--tags'] + result_refs = self._run_command(cmd_refs) + if result_refs['returncode']: + result_refs['output'] = \ + "Could not determine refs of remote '%s': " % \ + remote + result_refs['output'] + return result_refs + refs = result_refs['output'].splitlines() + if ref not in refs: + continue + + cmds = [result_ref['cmd']] + if command.with_tags: + # check if there is exactly one tag pointing to that ref + cmd_tags = [ + GitClient._executable, 'tag', '--points-at', ref] + result_tags = self._run_command(cmd_tags) + if result_tags['returncode']: + result_tags['output'] = \ + "Could not determine tags for ref '%s': " % \ + ref + result_tags['output'] + return result_tags + cmds.append(result_tags['cmd']) + tags = result_tags['output'].splitlines() + if len(tags) == 1: + tag = tags[0] + # double check that the tag is part of the remote + # and references the same hash + cmd_ls_remote = [ + GitClient._executable, 'ls-remote', remote, + 'refs/tags/' + tag] + result_ls_remote = self._run_command(cmd_ls_remote) + if result_ls_remote['returncode']: + result_ls_remote['output'] = \ + "Could not check remote tags for '%s': " % \ + remote + result_ls_remote['output'] + return result_ls_remote + matches = self._get_hash_ref_tuples( + result_ls_remote['output']) + if len(matches) == 1 and matches[0][0] == ref: + ref = tag + + # determine url of remote + result_url = self._get_remote_url(remote) + if result_url['returncode']: + return result_url + url = result_url['output'] + cmds.append(result_url['cmd']) + + # the result is the remote url and the hash/tag + return { + 'cmd': ' && '.join(cmds), + 'cwd': self.path, + 'output': '\n'.join([url, ref]), + 'returncode': 0, + 'export_data': {'url': url, 'version': ref} + } + + return { + 'cmd': ' && '.join([result_ref['cmd'], result_remotes['cmd']]), + 'cwd': self.path, + 'output': "Could not determine remote containing '%s'" % ref, + 'returncode': 1, + } + + def _get_remote_url(self, remote): + cmd_url = [ + GitClient._executable, 'config', '--get', 'remote.%s.url' % remote] + result_url = self._run_command(cmd_url) + if result_url['returncode']: + result_url['output'] = 'Could not determine remote url: ' + \ + result_url['output'] + return result_url + + def import_(self, command): + if not command.url: + return { + 'cmd': '', + 'cwd': self.path, + 'output': "Repository data lacks the 'url' value", + 'returncode': 1 + } + + self._check_executable() + if GitClient.is_repository(self.path): + # verify that existing repository is the same + result_urls = self._get_remote_urls() + if result_urls['returncode']: + return result_urls + for url, remote in result_urls['output']: + if url == command.url: + break + else: + if command.skip_existing: + return { + 'cmd': '', + 'cwd': self.path, + 'output': + 'Skipped existing repository with different URL', + 'returncode': 0 + } + if not command.force: + return { + 'cmd': '', + 'cwd': self.path, + 'output': + 'Path already exists and contains a different ' + 'repository', + 'returncode': 1 + } + try: + rmtree(self.path) + except OSError: + os.remove(self.path) + + elif command.skip_existing and os.path.exists(self.path): + return { + 'cmd': '', + 'cwd': self.path, + 'output': 'Skipped existing directory', + 'returncode': 0 + } + + elif command.force and os.path.exists(self.path): + # Not empty, not a git repository + try: + rmtree(self.path) + except OSError: + os.remove(self.path) + + not_exist = self._create_path() + if not_exist: + return not_exist + + if GitClient.is_repository(self.path): + if command.skip_existing: + checkout_version = None + elif command.version: + checkout_version = command.version + else: + # determine remote HEAD branch + cmd_remote = [GitClient._executable, 'remote', 'show', remote] + # override locale in order to parse output + env = os.environ.copy() + env['LC_ALL'] = 'C' + result_remote = self._run_command(cmd_remote, env=env) + if result_remote['returncode']: + result_remote['output'] = \ + 'Could not get remote information of repository ' \ + "'%s': %s" % (url, result_remote['output']) + return result_remote + prefix = ' HEAD branch: ' + for line in result_remote['output'].splitlines(): + if line.startswith(prefix): + checkout_version = line[len(prefix):] + break + else: + result_remote['returncode'] = 1 + result_remote['output'] = \ + 'Could not determine remote HEAD branch of ' \ + "repository '%s': %s" % (url, result_remote['output']) + return result_remote + + # fetch updates for existing repo + cmd_fetch = [GitClient._executable, 'fetch', remote] + if command.shallow: + result_version_type = self._check_version_type( + command.url, checkout_version) + if result_version_type['returncode']: + return result_version_type + version_type = result_version_type['version_type'] + if version_type == 'branch': + cmd_fetch.append( + 'refs/heads/%s:refs/remotes/%s/%s' % + (checkout_version, remote, checkout_version)) + elif version_type == 'hash': + cmd_fetch.append(checkout_version) + elif version_type == 'tag': + cmd_fetch.append( + '+refs/tags/%s:refs/tags/%s' % + (checkout_version, checkout_version)) + else: + assert False + cmd_fetch += ['--depth', '1'] + result_fetch = self._run_command(cmd_fetch, retry=command.retry) + if result_fetch['returncode']: + return result_fetch + cmd = result_fetch['cmd'] + output = result_fetch['output'] + + # ensure that a tracking branch exists which can be checked out + if command.shallow and version_type == 'branch': + cmd_show_ref = [ + GitClient._executable, 'show-ref', + 'refs/heads/%s' % checkout_version] + result_show_ref = self._run_command(cmd_show_ref) + if result_show_ref['returncode']: + # creating tracking branch + cmd_branch = [ + GitClient._executable, 'branch', checkout_version, + '%s/%s' % (remote, checkout_version)] + result_branch = self._run_command(cmd_branch) + if result_branch['returncode']: + result_branch['output'] = \ + "Could not create branch '%s': %s" % \ + (checkout_version, result_branch['output']) + return result_branch + cmd += ' && ' + ' '.join(cmd_branch) + output = '\n'.join([output, result_branch['output']]) + + else: + version_type = None + if command.version: + result_version_type = self._check_version_type( + command.url, command.version) + if result_version_type['returncode']: + return result_version_type + version_type = result_version_type['version_type'] + + if not command.shallow or version_type in (None, 'branch'): + cmd_clone = [GitClient._executable, 'clone', command.url, '.'] + if version_type == 'branch': + cmd_clone += ['-b', command.version] + checkout_version = None + else: + checkout_version = command.version + if command.shallow: + cmd_clone += ['--depth', '1'] + result_clone = self._run_command( + cmd_clone, retry=command.retry) + if result_clone['returncode']: + result_clone['output'] = \ + "Could not clone repository '%s': %s" % \ + (command.url, result_clone['output']) + return result_clone + cmd = result_clone['cmd'] + output = result_clone['output'] + else: + # getting a hash or tag with a depth of 1 can't use 'clone' + cmd_init = [GitClient._executable, 'init'] + result_init = self._run_command(cmd_init) + if result_init['returncode']: + return result_init + cmd = result_init['cmd'] + output = result_init['output'] + + cmd_remote_add = [ + GitClient._executable, 'remote', 'add', 'origin', + command.url] + result_remote_add = self._run_command(cmd_remote_add) + if result_remote_add['returncode']: + return result_remote_add + cmd += ' && ' + ' '.join(cmd_remote_add) + output = '\n'.join([output, result_remote_add['output']]) + + cmd_fetch = [GitClient._executable, 'fetch', 'origin'] + if version_type == 'hash': + cmd_fetch.append(command.version) + elif version_type == 'tag': + cmd_fetch.append( + 'refs/tags/%s:refs/tags/%s' % + (command.version, command.version)) + else: + assert False + cmd_fetch += ['--depth', '1'] + result_fetch = self._run_command( + cmd_fetch, retry=command.retry) + if result_fetch['returncode']: + return result_fetch + cmd += ' && ' + ' '.join(cmd_fetch) + output = '\n'.join([output, result_fetch['output']]) + + checkout_version = command.version + + if checkout_version: + cmd_checkout = [ + GitClient._executable, 'checkout', checkout_version, '--'] + result_checkout = self._run_command(cmd_checkout) + if result_checkout['returncode']: + result_checkout['output'] = \ + "Could not checkout ref '%s': %s" % \ + (checkout_version, result_checkout['output']) + return result_checkout + cmd += ' && ' + ' '.join(cmd_checkout) + output = '\n'.join([output, result_checkout['output']]) + + if command.recursive: + cmd_submodule = [ + GitClient._executable, 'submodule', 'update', '--init'] + result_submodule = self._run_command(cmd_submodule) + if result_submodule['returncode']: + result_submodule['output'] = \ + 'Could not init/update submodules: %s' % \ + result_submodule['output'] + return result_submodule + cmd += ' && ' + ' '.join(cmd_submodule) + output = '\n'.join([output, result_submodule['output']]) + + return { + 'cmd': cmd, + 'cwd': self.path, + 'output': output, + 'returncode': 0 + } + + def _get_remote_urls(self): + cmd_remote = [GitClient._executable, 'remote', 'show'] + result_remote = self._run_command(cmd_remote) + if result_remote['returncode']: + result_remote['output'] = 'Could not determine remotes: ' + \ + result_remote['output'] + return result_remote + remote_urls = [] + cmd = result_remote['cmd'] + for remote in result_remote['output'].splitlines(): + result_url = self._get_remote_url(remote) + cmd += ' && ' + result_url['cmd'] + if not result_url['returncode']: + remote_urls.append((result_url['output'], remote)) + return { + 'cmd': cmd, + 'cwd': self.path, + 'output': (remote_urls if remote_urls else + 'Could not determine any of the remote urls'), + 'returncode': 0 if remote_urls else 1 + } + + def _check_version_type(self, url, version): + cmd = [GitClient._executable, 'ls-remote', url, version] + result = self._run_command(cmd) + if result['returncode']: + result['output'] = 'Could not determine ref type of version: ' + \ + result['output'] + return result + if not result['output']: + result['version_type'] = 'hash' + return result + + refs = {} + for hash_, ref in self._get_hash_ref_tuples(result['output']): + refs[ref] = hash_ + + tag_ref = 'refs/tags/' + version + branch_ref = 'refs/heads/' + version + if tag_ref in refs and branch_ref in refs: + if refs[tag_ref] != refs[branch_ref]: + result['returncode'] = 1 + result['output'] = 'The version ref is a branch as well as ' \ + 'tag but with different hashes' + return result + if tag_ref in refs: + result['version_type'] = 'tag' + elif branch_ref in refs: + result['version_type'] = 'branch' + else: + result['version_type'] = 'hash' + return result + + def log(self, command): + self._check_executable() + if command.limit_tag: + # check if specific tag exists + cmd_tag = [GitClient._executable, 'tag', '-l', command.limit_tag] + result_tag = self._run_command(cmd_tag) + if result_tag['returncode']: + return result_tag + if not result_tag['output']: + return { + 'cmd': '', + 'cwd': self.path, + 'output': + "Repository lacks the tag '%s'" % command.limit_tag, + 'returncode': 1 + } + # output log since specific tag + cmd = [GitClient._executable, 'log', '%s..' % command.limit_tag] + elif command.limit_untagged: + # determine nearest tag + cmd_tag = [ + GitClient._executable, 'describe', '--abbrev=0', '--tags'] + result_tag = self._run_command(cmd_tag) + if result_tag['returncode']: + return result_tag + # output log since nearest tag + cmd = [GitClient._executable, 'log', '%s..' % result_tag['output']] + else: + cmd = [GitClient._executable, 'log'] + cmd += ['--decorate'] + if command.limit != 0: + cmd += ['-%d' % command.limit] + if not command.verbose: + cmd += ['--pretty=short'] + self._check_color(cmd) + return self._run_command(cmd) + + def pull(self, _command): + self._check_executable() + cmd = [GitClient._executable, 'pull'] + self._check_color(cmd) + result = self._run_command(cmd) + + if result['returncode']: + # check for detached HEAD + cmd_rev_parse = [ + GitClient._executable, 'rev-parse', '--abbrev-ref', 'HEAD'] + result_rev_parse = self._run_command(cmd_rev_parse) + if result_rev_parse['returncode']: + result_rev_parse['output'] = 'Could not determine ref: ' + \ + result_rev_parse['output'] + return result_rev_parse + detached = result_rev_parse['output'] == 'HEAD' + + if detached: + # warn but not fail about the inability to pull a detached head + return { + 'cmd': '', + 'cwd': self.path, + 'output': result['output'], + 'returncode': 0, + } + + return result + + def push(self, _command): + self._check_executable() + cmd = [GitClient._executable, 'push'] + return self._run_command(cmd) + + def remotes(self, _command): + self._check_executable() + cmd = [GitClient._executable, 'remote', '-v'] + return self._run_command(cmd) + + def status(self, command): + self._check_executable() + while command.hide_empty: + # check if ahead + cmd = [GitClient._executable, 'log', '@{push}..'] + result = self._run_command(cmd) + if not result['returncode'] and result['output']: + # ahead, do not hide + break + # check if behind + cmd = [GitClient._executable, 'log', '..@{upstream}'] + result = self._run_command(cmd) + if not result['returncode'] and result['output']: + # behind, do not hide + break + cmd = [GitClient._executable, 'status', '-s'] + if command.quiet: + cmd += ['--untracked-files=no'] + result = self._run_command(cmd) + if result['returncode'] or not result['output']: + return result + break + cmd = [GitClient._executable, 'status'] + self._check_color(cmd) + if command.quiet: + cmd += ['--untracked-files=no'] + return self._run_command(cmd) + + def validate(self, command): + if not command.url: + return { + 'cmd': '', + 'cwd': self.path, + 'output': "Repository data lacks the 'url' value", + 'returncode': 1 + } + + self._check_executable() + + cmd_ls_remote = [GitClient._executable, 'ls-remote'] + cmd_ls_remote += ['-q', '--exit-code'] + cmd_ls_remote += [command.url] + env = os.environ.copy() + env['GIT_TERMINAL_PROMPT'] = '0' + result_ls_remote = self._run_command( + cmd_ls_remote, + retry=command.retry, + env=env) + if result_ls_remote['returncode']: + result_ls_remote['output'] = \ + "Failed to contact remote repository '%s': %s" % \ + (command.url, result_ls_remote['output']) + return result_ls_remote + + if command.version: + hashes = [] + refs = [] + + for hash_and_ref in self._get_hash_ref_tuples( + result_ls_remote['output'] + ): + hashes.append(hash_and_ref[0]) + + # ignore pull request refs + if not hash_and_ref[1].startswith('refs/pull/'): + if hash_and_ref[1].startswith('refs/tags/'): + refs.append(hash_and_ref[1][10:]) + elif hash_and_ref[1].startswith('refs/heads/'): + refs.append(hash_and_ref[1][11:]) + else: + refs.append(hash_and_ref[1]) + + # test for refs first + ref_found = command.version in refs + + if not ref_found: + for _hash in hashes: + if _hash.startswith(command.version): + ref_found = True + break + + if not ref_found: + cmd = result_ls_remote['cmd'] + output = "Found git repository '%s' but " % command.url + \ + 'unable to verify non-branch / non-tag ref ' + \ + "'%s' without cloning the repo" % command.version + + return { + 'cmd': cmd, + 'cwd': self.path, + 'output': output, + 'returncode': 0 + } + else: + cmd = result_ls_remote['cmd'] + output = "Found git repository '%s' with ref '%s'" % \ + (command.url, command.version) + else: + cmd = result_ls_remote['cmd'] + output = "Found git repository '%s' with default branch" % \ + command.url + + return { + 'cmd': cmd, + 'cwd': self.path, + 'output': output, + 'returncode': None + } + + def _check_color(self, cmd): + if not USE_COLOR: + return + # check if user uses colorization + if GitClient._config_color_is_auto is None: + _cmd = [GitClient._executable, 'config', '--get', 'color.ui'] + result = self._run_command(_cmd) + GitClient._config_color_is_auto = result['output'] in ['', 'auto'] + + # inject arguments to force colorization + if GitClient._config_color_is_auto: + cmd[1:1] = '-c', 'color.ui=always' + + def _check_executable(self): + assert GitClient._executable is not None, \ + "Could not find 'git' executable" + + def _get_hash_ref_tuples(self, ls_remote_output): + tuples = [] + for line in ls_remote_output.splitlines(): + if line.startswith('#'): + continue + try: + hash_, ref = line.split(None, 1) + except ValueError: + continue + tuples.append((hash_, ref)) + return tuples + + +if not GitClient._executable: + GitClient._executable = which('git') |