diff options
author | Markus Lehtonen <markus.lehtonen@linux.intel.com> | 2012-01-12 15:29:03 +0200 |
---|---|---|
committer | Guido Günther <agx@sigxcpu.org> | 2014-07-24 23:33:36 +0200 |
commit | e7251f84bc17d585e260091b9efe2dade073982c (patch) | |
tree | c83b7b8787549df8a0b4ac5ff783b8cf15331687 /gbp/rpm | |
parent | 1a8e6d12cc94d2b40324fe8f99de012cf2cf5913 (diff) | |
download | git-buildpackage-e7251f84bc17d585e260091b9efe2dade073982c.tar.gz git-buildpackage-e7251f84bc17d585e260091b9efe2dade073982c.tar.bz2 git-buildpackage-e7251f84bc17d585e260091b9efe2dade073982c.zip |
Introduce rpm helpers
Implements a new gbp.rpm module that contains functionality for e.g.
parsing and editing spec files, reading src.rpm files rpm-specific
packaging policy etc.
Signed-off-by: Markus Lehtonen <markus.lehtonen@linux.intel.com>
Signed-off-by: Ed Bartosh <eduard.bartosh@intel.com>
Signed-off-by: Zhang Qiang <qiang.z.zhang@intel.com>
Signed-off-by: Huang Hao <hao.h.huang@intel.com>
Diffstat (limited to 'gbp/rpm')
-rw-r--r-- | gbp/rpm/__init__.py | 962 | ||||
-rw-r--r-- | gbp/rpm/git.py | 105 | ||||
-rw-r--r-- | gbp/rpm/lib_rpm.py | 47 | ||||
-rw-r--r-- | gbp/rpm/linkedlist.py | 214 | ||||
-rw-r--r-- | gbp/rpm/policy.py | 72 |
5 files changed, 1400 insertions, 0 deletions
diff --git a/gbp/rpm/__init__.py b/gbp/rpm/__init__.py new file mode 100644 index 00000000..87f82ffa --- /dev/null +++ b/gbp/rpm/__init__.py @@ -0,0 +1,962 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2006,2007 Guido Guenther <agx@sigxcpu.org> +# (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""provides some rpm source package related helpers""" + +import commands +import sys +import os +import re +import tempfile +import glob +import shutil as shutil +from optparse import OptionParser +from collections import defaultdict + +import gbp.command_wrappers as gbpc +from gbp.errors import GbpError +from gbp.git import GitRepositoryError +from gbp.patch_series import (PatchSeries, Patch) +import gbp.log +from gbp.pkg import (UpstreamSource, compressor_opts, parse_archive_filename) +from gbp.rpm.policy import RpmPkgPolicy +from gbp.rpm.linkedlist import LinkedList +from gbp.rpm.lib_rpm import librpm, get_librpm_log + + +class NoSpecError(Exception): + """Spec file parsing error""" + pass + +class MacroExpandError(Exception): + """Macro expansion in spec file failed""" + pass + + +class RpmUpstreamSource(UpstreamSource): + """Upstream source class for RPM packages""" + def __init__(self, name, unpacked=None, **kwargs): + super(RpmUpstreamSource, self).__init__(name, + unpacked, + RpmPkgPolicy, + **kwargs) + + +class SrcRpmFile(object): + """Keeps all needed data read from a source rpm""" + def __init__(self, srpmfile): + # Do not required signed packages to be able to import + ts_vsflags = (librpm.RPMVSF_NOMD5HEADER | librpm.RPMVSF_NORSAHEADER | + librpm.RPMVSF_NOSHA1HEADER | librpm.RPMVSF_NODSAHEADER | + librpm.RPMVSF_NOMD5 | librpm.RPMVSF_NORSA | + librpm.RPMVSF_NOSHA1 | librpm.RPMVSF_NODSA) + srpmfp = open(srpmfile) + self.rpmhdr = librpm.ts(vsflags=ts_vsflags).hdrFromFdno(srpmfp.fileno()) + srpmfp.close() + self.srpmfile = os.path.abspath(srpmfile) + + @property + def version(self): + """Get the (downstream) version of the RPM package""" + version = dict(upstreamversion = self.rpmhdr[librpm.RPMTAG_VERSION], + release = self.rpmhdr[librpm.RPMTAG_RELEASE]) + if self.rpmhdr[librpm.RPMTAG_EPOCH] is not None: + version['epoch'] = str(self.rpmhdr[librpm.RPMTAG_EPOCH]) + return version + + @property + def name(self): + """Get the name of the RPM package""" + return self.rpmhdr[librpm.RPMTAG_NAME] + + @property + def upstreamversion(self): + """Get the upstream version of the RPM package""" + return self.rpmhdr[librpm.RPMTAG_VERSION] + + @property + def packager(self): + """Get the packager of the RPM package""" + return self.rpmhdr[librpm.RPMTAG_PACKAGER] + + def unpack(self, dest_dir): + """ + Unpack the source rpm to tmpdir. + Leave the cleanup to the caller in case of an error. + """ + gbpc.RunAtCommand('rpm2cpio', + [self.srpmfile, '|', 'cpio', '-id'], + shell=True)(dir=dest_dir) + + +class SpecFile(object): + """Class for parsing/modifying spec files""" + tag_re = re.compile(r'^(?P<name>[a-z]+)(?P<num>[0-9]+)?\s*:\s*' + '(?P<value>\S(.*\S)?)\s*$', flags=re.I) + directive_re = re.compile(r'^%(?P<name>[a-z]+)(?P<num>[0-9]+)?' + '(\s+(?P<args>.*))?$', flags=re.I) + gbptag_re = re.compile(r'^\s*#\s*gbp-(?P<name>[a-z-]+)' + '(\s*:\s*(?P<args>\S.*))?$', flags=re.I) + # Here "sections" stand for all scripts, scriptlets and other directives, + # but not macros + section_identifiers = ('package', 'description', 'prep', 'build', 'install', + 'clean', 'check', 'pre', 'preun', 'post', 'postun', 'verifyscript', + 'files', 'changelog', 'triggerin', 'triggerpostin', 'triggerun', + 'triggerpostun') + + def __init__(self, filename=None, filedata=None): + + self._content = LinkedList() + + # Check args: only filename or filedata can be given, not both + if filename is None and filedata is None: + raise NoSpecError("No filename or raw data given for parsing!") + elif filename and filedata: + raise NoSpecError("Both filename and raw data given, don't know " + "which one to parse!") + elif filename: + # Load spec file into our special data structure + self.specfile = os.path.basename(filename) + self.specdir = os.path.dirname(os.path.abspath(filename)) + try: + with open(filename) as spec_file: + for line in spec_file.readlines(): + self._content.append(line) + except IOError as err: + raise NoSpecError("Unable to read spec file: %s" % err) + else: + self.specfile = None + self.specdir = None + for line in filedata.splitlines(): + self._content.append(line + '\n') + + # Use rpm-python to parse the spec file content + self._filtertags = ("excludearch", "excludeos", "exclusivearch", + "exclusiveos","buildarch") + self._listtags = self._filtertags + ('source', 'patch', + 'requires', 'conflicts', 'recommends', + 'suggests', 'supplements', 'enhances', + 'provides', 'obsoletes', 'buildrequires', + 'buildconflicts', 'buildrecommends', + 'buildsuggests', 'buildsupplements', + 'buildenhances', 'collections', + 'nosource', 'nopatch') + self._specinfo = self._parse_filtered_spec(self._filtertags) + + # Other initializations + source_header = self._specinfo.packages[0].header + self.name = source_header[librpm.RPMTAG_NAME] + self.upstreamversion = source_header[librpm.RPMTAG_VERSION] + self.release = source_header[librpm.RPMTAG_RELEASE] + # rpm-python returns epoch as 'long', convert that to string + self.epoch = str(source_header[librpm.RPMTAG_EPOCH]) \ + if source_header[librpm.RPMTAG_EPOCH] != None else None + self.packager = source_header[librpm.RPMTAG_PACKAGER] + self._tags = {} + self._special_directives = defaultdict(list) + self._gbp_tags = defaultdict(list) + + # Parse extra info from spec file + self._parse_content() + + # Find 'Packager' tag. Needed to circumvent a bug in python-rpm where + # spec.sourceHeader[librpm.RPMTAG_PACKAGER] is not reset when a new spec + # file is parsed + if 'packager' not in self._tags: + self.packager = None + + self.orig_src = self._guess_orig_file() + + def _parse_filtered_spec(self, skip_tags): + """Parse a filtered spec file in rpm-python""" + skip_tags = [tag.lower() for tag in skip_tags] + with tempfile.NamedTemporaryFile(prefix='gbp') as filtered: + filtered.writelines(str(line) for line in self._content + if str(line).split(":")[0].strip().lower() not in skip_tags) + filtered.flush() + try: + # Parse two times to circumvent a rpm-python problem where + # macros are not expanded if used before their definition + librpm.spec(filtered.name) + return librpm.spec(filtered.name) + except ValueError as err: + rpmlog = get_librpm_log() + gbp.log.debug("librpm log:\n %s" % + "\n ".join(rpmlog)) + raise GbpError("RPM error while parsing %s: %s (%s)" % + (self.specfile, err, rpmlog[-1])) + + @property + def version(self): + """Get the (downstream) version""" + version = dict(upstreamversion = self.upstreamversion, + release = self.release) + if self.epoch != None: + version['epoch'] = self.epoch + return version + + @property + def specpath(self): + """Get the dir/filename""" + return os.path.join(self.specdir, self.specfile) + + @property + def ignorepatches(self): + """Get numbers of ignored patches as a sorted list""" + if 'ignore-patches' in self._gbp_tags: + data = self._gbp_tags['ignore-patches'][-1]['args'].split() + return sorted([int(num) for num in data]) + return [] + + def _patches(self): + """Get all patch tags as a dict""" + if 'patch' not in self._tags: + return {} + return {patch['num']: patch for patch in self._tags['patch']['lines']} + + def _sources(self): + """Get all source tags as a dict""" + if 'source' not in self._tags: + return {} + return {src['num']: src for src in self._tags['source']['lines']} + + def sources(self): + """Get all source tags as a dict""" + return {src['num']: src['linevalue'] + for src in self._sources().values()} + + def _macro_replace(self, matchobj): + macro_dict = {'name': self.name, + 'version': self.upstreamversion, + 'release': self.release} + + if matchobj.group(2) in macro_dict: + return macro_dict[matchobj.group(2)] + raise MacroExpandError("Unknown macro '%s'" % matchobj.group(0)) + + def macro_expand(self, text): + """ + Expand the rpm macros (that gbp knows of) in the given text. + + @param text: text to check for macros + @type text: C{str} + @return: text with macros expanded + @rtype: C{str} + """ + # regexp to match '%{macro}' and '%macro' + macro_re = re.compile(r'%({)?(?P<macro_name>[a-z_][a-z0-9_]*)(?(1)})', flags=re.I) + return macro_re.sub(self._macro_replace, text) + + def write_spec_file(self): + """ + Write, possibly updated, spec to disk + """ + with open(os.path.join(self.specdir, self.specfile), 'w') as spec_file: + for line in self._content: + spec_file.write(str(line)) + + def _parse_tag(self, lineobj): + """Parse tag line""" + + line = str(lineobj) + + matchobj = self.tag_re.match(line) + if not matchobj: + return False + + tagname = matchobj.group('name').lower() + tagnum = int(matchobj.group('num')) if matchobj.group('num') else None + # 'Source:' tags + if tagname == 'source': + tagnum = 0 if tagnum is None else tagnum + # 'Patch:' tags + elif tagname == 'patch': + tagnum = -1 if tagnum is None else tagnum + + # Record all tag locations + try: + header = self._specinfo.packages[0].header + tagvalue = header[getattr(librpm, 'RPMTAG_%s' % tagname.upper())] + except AttributeError: + tagvalue = None + # We don't support "multivalue" tags like "Provides:" or "SourceX:" + # Rpm python doesn't support many of these, thus the explicit list + if type(tagvalue) is int or type(tagvalue) is long: + tagvalue = str(tagvalue) + elif type(tagvalue) is list or tagname in self._listtags: + tagvalue = None + elif not tagvalue: + # Rpm python doesn't give the following, for reason or another + if tagname not in ('buildroot', 'autoprov', 'autoreq', + 'autoreqprov') + self._filtertags: + gbp.log.warn("BUG: '%s:' tag not found by rpm" % tagname) + tagvalue = matchobj.group('value') + linerecord = {'line': lineobj, + 'num': tagnum, + 'linevalue': matchobj.group('value')} + if tagname in self._tags: + self._tags[tagname]['value'] = tagvalue + self._tags[tagname]['lines'].append(linerecord) + else: + self._tags[tagname] = {'value': tagvalue, 'lines': [linerecord]} + + return tagname + + @staticmethod + def _patch_macro_opts(args): + """Parse arguments of the '%patch' macro""" + + patchparser = OptionParser() + patchparser.add_option("-p", dest="strip") + patchparser.add_option("-s", dest="silence") + patchparser.add_option("-P", dest="patchnum") + patchparser.add_option("-b", dest="backup") + patchparser.add_option("-E", dest="removeempty") + arglist = args.split() + return patchparser.parse_args(arglist)[0] + + @staticmethod + def _setup_macro_opts(args): + """Parse arguments of the '%setup' macro""" + + setupparser = OptionParser() + setupparser.add_option("-n", dest="name") + setupparser.add_option("-c", dest="create_dir", action="store_true") + setupparser.add_option("-D", dest="no_delete_dir", action="store_true") + setupparser.add_option("-T", dest="no_unpack_default", + action="store_true") + setupparser.add_option("-b", dest="unpack_before") + setupparser.add_option("-a", dest="unpack_after") + setupparser.add_option("-q", dest="quiet", action="store_true") + arglist = args.split() + return setupparser.parse_args(arglist)[0] + + def _parse_directive(self, lineobj): + """Parse special directive/scriptlet/macro lines""" + + line = str(lineobj) + matchobj = self.directive_re.match(line) + if not matchobj: + return None + + directivename = matchobj.group('name') + # '%patch' macros + directiveid = None + if directivename == 'patch': + opts = self._patch_macro_opts(matchobj.group('args')) + if matchobj.group('num'): + directiveid = int(matchobj.group('num')) + elif opts.patchnum: + directiveid = int(opts.patchnum) + else: + directiveid = -1 + + # Record special directive/scriptlet/macro locations + if directivename in self.section_identifiers + ('setup', 'patch'): + linerecord = {'line': lineobj, + 'id': directiveid, + 'args': matchobj.group('args')} + self._special_directives[directivename].append(linerecord) + return directivename + + def _parse_gbp_tag(self, linenum, lineobj): + """Parse special git-buildpackage tags""" + + line = str(lineobj) + matchobj = self.gbptag_re.match(line) + if matchobj: + gbptagname = matchobj.group('name').lower() + if gbptagname not in ('ignore-patches', 'patch-macros'): + gbp.log.info("Found unrecognized Gbp tag on line %s: '%s'" % + (linenum, line)) + if matchobj.group('args'): + args = matchobj.group('args').strip() + else: + args = None + record = {'line': lineobj, 'args': args} + self._gbp_tags[gbptagname].append(record) + return gbptagname + + return None + + def _parse_content(self): + """ + Go through spec file content line-by-line and (re-)parse info from it + """ + in_preamble = True + for linenum, lineobj in enumerate(self._content): + matched = False + if in_preamble: + if self._parse_tag(lineobj): + continue + matched = self._parse_directive(lineobj) + if matched: + if matched in self.section_identifiers: + in_preamble = False + continue + self._parse_gbp_tag(linenum, lineobj) + + # Update sources info (basically possible macros expanded by rpm) + # And, double-check that we parsed spec content correctly + patches = self._patches() + sources = self._sources() + for name, num, typ in self._specinfo.sources: + # workaround rpm parsing bug + if typ == 1 or typ == 9: + if num in sources: + sources[num]['linevalue'] = name + else: + gbp.log.err("BUG: failed to parse all 'Source' tags!") + elif typ == 2 or typ == 10: + # Patch tag without any number defined is treated by RPM as + # having number (2^31-1), we use number -1 + if num >= pow(2,30): + num = -1 + if num in patches: + patches[num]['linevalue'] = name + else: + gbp.log.err("BUG: failed to parse all 'Patch' tags!") + + def _delete_tag(self, tag, num): + """Delete a tag""" + key = tag.lower() + tagname = '%s%s' % (tag, num) if num is not None else tag + if key not in self._tags: + gbp.log.warn("Trying to delete non-existent tag '%s:'" % tag) + return None + + sparedlines = [] + prev = None + for line in self._tags[key]['lines']: + if line['num'] == num: + gbp.log.debug("Removing '%s:' tag from spec" % tagname) + prev = self._content.delete(line['line']) + else: + sparedlines.append(line) + self._tags[key]['lines'] = sparedlines + if not self._tags[key]['lines']: + self._tags.pop(key) + return prev + + def _set_tag(self, tag, num, value, insertafter): + """Set a tag value""" + key = tag.lower() + tagname = '%s%s' % (tag, num) if num is not None else tag + value = value.strip() + if not value: + raise GbpError("Cannot set empty value to '%s:' tag" % tag) + + # Check type of tag, we don't support values for 'multivalue' tags + try: + header = self._specinfo.packages[0].header + tagvalue = header[getattr(librpm, 'RPMTAG_%s' % tagname.upper())] + except AttributeError: + tagvalue = None + tagvalue = None if type(tagvalue) is list else value + + # Try to guess the correct indentation from the previous or next tag + indent_re = re.compile(r'^([a-z]+([0-9]+)?\s*:\s*)', flags=re.I) + match = indent_re.match(str(insertafter)) + if not match: + match = indent_re.match(str(insertafter.next)) + indent = 12 if not match else len(match.group(1)) + text = '%-*s%s\n' % (indent, '%s:' % tagname, value) + if key in self._tags: + self._tags[key]['value'] = tagvalue + for line in reversed(self._tags[key]['lines']): + if line['num'] == num: + gbp.log.debug("Updating '%s:' tag in spec" % tagname) + line['line'].set_data(text) + line['linevalue'] = value + return line['line'] + + gbp.log.debug("Adding '%s:' tag after '%s...' line in spec" % + (tagname, str(insertafter)[0:20])) + line = self._content.insert_after(insertafter, text) + linerec = {'line': line, 'num': num, 'linevalue': value} + if key in self._tags: + self._tags[key]['lines'].append(linerec) + else: + self._tags[key] = {'value': tagvalue, 'lines': [linerec]} + return line + + def set_tag(self, tag, num, value, insertafter=None): + """Update a tag in spec file content""" + key = tag.lower() + tagname = '%s%s' % (tag, num) if num is not None else tag + if key in ('patch', 'vcs'): + if key in self._tags: + insertafter = key + elif not insertafter in self._tags: + insertafter = 'name' + after_line = self._tags[insertafter]['lines'][-1]['line'] + if value: + self._set_tag(tag, num, value, after_line) + elif key in self._tags: + self._delete_tag(tag, num) + else: + raise GbpError("Setting '%s:' tag not supported" % tagname) + + def _delete_special_macro(self, name, identifier): + """Delete a special macro line in spec file content""" + if name != 'patch': + raise GbpError("Deleting '%s:' macro not supported" % name) + + key = name.lower() + fullname = '%%%s%s' % (name, identifier) + sparedlines = [] + prev = None + for line in self._special_directives[key]: + if line['id'] == identifier: + gbp.log.debug("Removing '%s' macro from spec" % fullname) + prev = self._content.delete(line['line']) + else: + sparedlines.append(line) + self._special_directives[key] = sparedlines + if not prev: + gbp.log.warn("Tried to delete non-existent macro '%s'" % fullname) + return prev + + def _set_special_macro(self, name, identifier, args, insertafter): + """Update a special macro line in spec file content""" + key = name.lower() + fullname = '%%%s%s' % (name, identifier) + if key != 'patch': + raise GbpError("Setting '%s' macro not supported" % name) + + updated = 0 + text = "%%%s%d %s\n" % (name, identifier, args) + for line in self._special_directives[key]: + if line['id'] == identifier: + gbp.log.debug("Updating '%s' macro in spec" % fullname) + line['args'] = args + line['line'].set_data(text) + ret = line['line'] + updated += 1 + if not updated: + gbp.log.debug("Adding '%s' macro after '%s...' line in spec" % + (fullname, str(insertafter)[0:20])) + ret = self._content.insert_after(insertafter, text) + linerec = {'line': ret, 'id': identifier, 'args': args} + self._special_directives[key].append(linerec) + return ret + + def _set_section(self, name, text): + """Update/create a complete section in spec file.""" + if name not in self.section_identifiers: + raise GbpError("Not a valid section directive: '%s'" % name) + # Delete section, if it exists + if name in self._special_directives: + if len(self._special_directives[name]) > 1: + raise GbpError("Multiple %%%s sections found, don't know " + "which to update" % name) + line = self._special_directives[name][0]['line'] + gbp.log.debug("Removing content of %s section" % name) + while line.next: + match = self.directive_re.match(str(line.next)) + if match and match.group('name') in self.section_identifiers: + break + self._content.delete(line.next) + else: + gbp.log.debug("Adding %s section to the end of spec file" % name) + line = self._content.append('%%%s\n' % name) + linerec = {'line': line, 'id': None, 'args': None} + self._special_directives[name] = [linerec] + # Add new lines + gbp.log.debug("Updating content of %s section" % name) + for linetext in text.splitlines(): + line = self._content.insert_after(line, linetext + '\n') + + def set_changelog(self, text): + """Update or create the %changelog section""" + self._set_section('changelog', text) + + def get_changelog(self): + """Get the %changelog section""" + text = '' + if 'changelog' in self._special_directives: + line = self._special_directives['changelog'][0]['line'] + while line.next: + line = line.next + match = self.directive_re.match(str(line)) + if match and match.group('name') in self.section_identifiers: + break + text += str(line) + return text + + def update_patches(self, patches, commands): + """Update spec with new patch tags and patch macros""" + # Remove non-ignored patches + tag_prev = None + macro_prev = None + ignored = self.ignorepatches + # Remove 'Patch:̈́' tags + for tag in self._patches().values(): + if not tag['num'] in ignored: + tag_prev = self._delete_tag('patch', tag['num']) + # Remove a preceding comment if it seems to originate from GBP + if re.match("^\s*#.*patch.*auto-generated", + str(tag_prev), flags=re.I): + tag_prev = self._content.delete(tag_prev) + + # Remove '%patch:' macros + for macro in self._special_directives['patch']: + if not macro['id'] in ignored: + macro_prev = self._delete_special_macro('patch', macro['id']) + # Remove surrounding if-else + macro_next = macro_prev.next + if (str(macro_prev).startswith('%if') and + str(macro_next).startswith('%endif')): + self._content.delete(macro_next) + macro_prev = self._content.delete(macro_prev) + + # Remove a preceding comment line if it ends with '.patch' or + # '.diff' plus an optional compression suffix + if re.match("^\s*#.+(patch|diff)(\.(gz|bz2|xz|lzma))?\s*$", + str(macro_prev), flags=re.I): + macro_prev = self._content.delete(macro_prev) + + if len(patches) == 0: + return + + # Determine where to add Patch tag lines + if tag_prev: + gbp.log.debug("Adding 'Patch' tags in place of the removed tags") + tag_line = tag_prev + elif 'patch' in self._tags: + gbp.log.debug("Adding new 'Patch' tags after the last 'Patch' tag") + tag_line = self._tags['patch']['lines'][-1]['line'] + elif 'source' in self._tags: + gbp.log.debug("Didn't find any old 'Patch' tags, adding new " + "patches after the last 'Source' tag.") + tag_line = self._tags['source']['lines'][-1]['line'] + else: + gbp.log.debug("Didn't find any old 'Patch' or 'Source' tags, " + "adding new patches after the last 'Name' tag.") + tag_line = self._tags['name']['lines'][-1]['line'] + + # Determine where to add %patch macro lines + if 'patch-macros' in self._gbp_tags: + gbp.log.debug("Adding '%patch' macros after the start marker") + macro_line = self._gbp_tags['patch-macros'][-1]['line'] + elif macro_prev: + gbp.log.debug("Adding '%patch' macros in place of the removed " + "macros") + macro_line = macro_prev + elif self._special_directives['patch']: + gbp.log.debug("Adding new '%patch' macros after the last existing" + "'%patch' macro") + macro_line = self._special_directives['patch'][-1]['line'] + elif self._special_directives['setup']: + gbp.log.debug("Didn't find any old '%patch' macros, adding new " + "patches after the last '%setup' macro") + macro_line = self._special_directives['setup'][-1]['line'] + elif self._special_directives['prep']: + gbp.log.warn("Didn't find any old '%patch' or '%setup' macros, " + "adding new patches directly after '%prep' directive") + macro_line = self._special_directives['prep'][-1]['line'] + else: + raise GbpError("Couldn't determine where to add '%patch' macros") + + startnum = sorted(ignored)[-1] + 1 if ignored else 0 + gbp.log.debug("Starting autoupdate patch numbering from %s" % startnum) + # Add a comment indicating gbp generated patch tags + comment_text = "# Patches auto-generated by git-buildpackage:\n" + tag_line = self._content.insert_after(tag_line, comment_text) + for ind, patch in enumerate(patches): + cmds = commands[patch] if patch in commands else {} + patchnum = startnum + ind + tag_line = self._set_tag("Patch", patchnum, patch, tag_line) + # Add '%patch' macro and a preceding comment line + comment_text = "# %s\n" % patch + macro_line = self._content.insert_after(macro_line, comment_text) + macro_line = self._set_special_macro('patch', patchnum, '-p1', + macro_line) + for cmd, args in cmds.iteritems(): + if cmd in ('if', 'ifarch'): + self._content.insert_before(macro_line, '%%%s %s\n' % + (cmd, args)) + macro_line = self._content.insert_after(macro_line, + '%endif\n') + # We only support one command per patch, for now + break + + def patchseries(self, unapplied=False, ignored=False): + """Return non-ignored patches of the RPM as a gbp patchseries""" + series = PatchSeries() + if 'patch' in self._tags: + tags = self._patches() + applied = [] + for macro in self._special_directives['patch']: + if macro['id'] in tags: + applied.append((macro['id'], macro['args'])) + ignored = set() if ignored else set(self.ignorepatches) + + # Put all patches that are applied first in the series + for num, args in applied: + if num not in ignored: + opts = self._patch_macro_opts(args) + strip = int(opts.strip) if opts.strip else 0 + filename = os.path.basename(tags[num]['linevalue']) + series.append(Patch(os.path.join(self.specdir, filename), + strip=strip)) + # Finally, append all unapplied patches to the series, if requested + if unapplied: + applied_nums = set([num for num, _args in applied]) + unapplied = set(tags.keys()).difference(applied_nums) + for num in sorted(unapplied): + if num not in ignored: + filename = os.path.basename(tags[num]['linevalue']) + series.append(Patch(os.path.join(self.specdir, + filename), strip=0)) + return series + + def _guess_orig_prefix(self, orig): + """Guess prefix for the orig file""" + # Make initial guess about the prefix in the archive + filename = orig['filename'] + name, version = RpmPkgPolicy.guess_upstream_src_version(filename) + if name and version: + prefix = "%s-%s/" % (name, version) + else: + prefix = orig['filename_base'] + "/" + + # Refine our guess about the prefix + for macro in self._special_directives['setup']: + args = macro['args'] + opts = self._setup_macro_opts(args) + srcnum = None + if opts.no_unpack_default: + if opts.unpack_before: + srcnum = int(opts.unpack_before) + elif opts.unpack_after: + srcnum = int(opts.unpack_after) + else: + srcnum = 0 + if srcnum == orig['num']: + if opts.create_dir: + prefix = '' + elif opts.name: + try: + prefix = self.macro_expand(opts.name) + '/' + except MacroExpandError as err: + gbp.log.warn("Couldn't determine prefix from %%setup "\ + "macro (%s). Using filename base as a " \ + "fallback" % err) + prefix = orig['filename_base'] + '/' + else: + # RPM default + prefix = "%s-%s/" % (self.name, self.upstreamversion) + break + return prefix + + def _guess_orig_file(self): + """ + Try to guess the name of the primary upstream/source archive. + Returns a dict with all the relevant information. + """ + orig = None + sources = self.sources() + for num, filename in sorted(sources.iteritems()): + src = {'num': num, 'filename': os.path.basename(filename), + 'uri': filename} + src['filename_base'], src['archive_fmt'], src['compression'] = \ + parse_archive_filename(os.path.basename(filename)) + if (src['filename_base'].startswith(self.name) and + src['archive_fmt']): + # Take the first archive that starts with pkg name + orig = src + break + # otherwise we take the first archive + elif not orig and src['archive_fmt']: + orig = src + # else don't accept + if orig: + orig['prefix'] = self._guess_orig_prefix(orig) + + return orig + + +def parse_srpm(srpmfile): + """parse srpm by creating a SrcRpmFile object""" + try: + srcrpm = SrcRpmFile(srpmfile) + except IOError, err: + raise GbpError, "Error reading src.rpm file: %s" % err + except librpm.error, err: + raise GbpError, "RPM error while reading src.rpm: %s" % err + + return srcrpm + + +def guess_spec_fn(file_list, preferred_name=None): + """Guess spec file from a list of filenames""" + specs = [] + for filepath in file_list: + filename = os.path.basename(filepath) + # Stop at the first file matching the preferred name + if filename == preferred_name: + gbp.log.debug("Found a preferred spec file %s" % filepath) + specs = [filepath] + break + if filename.endswith(".spec"): + gbp.log.debug("Found spec file %s" % filepath) + specs.append(filepath) + if len(specs) == 0: + raise NoSpecError("No spec file found.") + elif len(specs) > 1: + raise NoSpecError("Multiple spec files found (%s), don't know which " + "to use." % ', '.join(specs)) + return specs[0] + + +def guess_spec(topdir, recursive=True, preferred_name=None): + """Guess a spec file""" + file_list = [] + if not topdir: + topdir = '.' + for root, dirs, files in os.walk(topdir): + file_list.extend([os.path.join(root, fname) for fname in files]) + if not recursive: + del dirs[:] + # Skip .git dir in any case + if '.git' in dirs: + dirs.remove('.git') + return SpecFile(os.path.abspath(guess_spec_fn(file_list, preferred_name))) + + +def guess_spec_repo(repo, treeish, topdir='', recursive=True, preferred_name=None): + """ + Try to find/parse the spec file from a given git treeish. + """ + topdir = topdir.rstrip('/') + ('/') if topdir else '' + try: + file_list = [nam for (mod, typ, sha, nam) in + repo.list_tree(treeish, recursive, topdir) if typ == 'blob'] + except GitRepositoryError as err: + raise NoSpecError("Cannot find spec file from treeish %s, Git error: %s" + % (treeish, err)) + spec_path = guess_spec_fn(file_list, preferred_name) + return spec_from_repo(repo, treeish, spec_path) + + +def spec_from_repo(repo, treeish, spec_path): + """Get and parse a spec file from a give Git treeish""" + try: + spec = SpecFile(filedata=repo.show('%s:%s' % (treeish, spec_path))) + spec.specdir = os.path.dirname(spec_path) + spec.specfile = os.path.basename(spec_path) + return spec + except GitRepositoryError as err: + raise NoSpecError("Git error: %s" % err) + + +def string_to_int(val_str): + """ + Convert string of possible unit identifier to int. + + @param val_str: value to be converted + @type val_str: C{str} + @return: value as integer + @rtype: C{int} + + >>> string_to_int("1234") + 1234 + >>> string_to_int("123k") + 125952 + >>> string_to_int("1234K") + 1263616 + >>> string_to_int("1M") + 1048576 + """ + units = {'k': 1024, + 'm': 1024**2, + 'g': 1024**3, + 't': 1024**4} + + if val_str[-1].lower() in units: + return int(val_str[:-1]) * units[val_str[-1].lower()] + else: + return int(val_str) + + +def split_version_str(version): + """ + Parse full version string and split it into individual "version + components", i.e. upstreamversion, epoch and release + + @param version: full version of a package + @type version: C{str} + @return: individual version components + @rtype: C{dict} + + >>> split_version_str("1") + {'release': None, 'epoch': None, 'upstreamversion': '1'} + >>> split_version_str("1.2.3-5.3") + {'release': '5.3', 'epoch': None, 'upstreamversion': '1.2.3'} + >>> split_version_str("3:1.2.3") + {'release': None, 'epoch': '3', 'upstreamversion': '1.2.3'} + >>> split_version_str("3:1-0") + {'release': '0', 'epoch': '3', 'upstreamversion': '1'} + """ + ret = {'epoch': None, 'upstreamversion': None, 'release': None} + + e_vr = version.split(":", 1) + if len(e_vr) == 1: + v_r = e_vr[0].split("-", 1) + else: + ret['epoch'] = e_vr[0] + v_r = e_vr[1].split("-", 1) + ret['upstreamversion'] = v_r[0] + if len(v_r) > 1: + ret['release'] = v_r[1] + + return ret + +def compose_version_str(evr): + """ + Compose a full version string from individual "version components", + i.e. epoch, version and release + + @param evr: dict of version components + @type evr: C{dict} of C{str} + @return: full version + @rtype: C{str} + + >>> compose_version_str({'epoch': '', 'upstreamversion': '1.0'}) + '1.0' + >>> compose_version_str({'epoch': '2', 'upstreamversion': '1.0', 'release': None}) + '2:1.0' + >>> compose_version_str({'epoch': None, 'upstreamversion': '1', 'release': '0'}) + '1-0' + >>> compose_version_str({'epoch': '2', 'upstreamversion': '1.0', 'release': '2.3'}) + '2:1.0-2.3' + >>> compose_version_str({'epoch': '2', 'upstreamversion': '', 'release': '2.3'}) + """ + if 'upstreamversion' in evr and evr['upstreamversion']: + version = "" + if 'epoch' in evr and evr['epoch']: + version += "%s:" % evr['epoch'] + version += evr['upstreamversion'] + if 'release' in evr and evr['release']: + version += "-%s" % evr['release'] + if version: + return version + return None + + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: diff --git a/gbp/rpm/git.py b/gbp/rpm/git.py new file mode 100644 index 00000000..c7cc023b --- /dev/null +++ b/gbp/rpm/git.py @@ -0,0 +1,105 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2011 Guido Günther <agx@sigxcpu.org> +# (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +import re + +from gbp.git import GitRepository, GitRepositoryError +from gbp.pkg.pristinetar import PristineTar +from gbp.rpm import compose_version_str + +class RpmGitRepository(GitRepository): + """A git repository that holds the source of an RPM package""" + + def __init__(self, path): + super(RpmGitRepository, self).__init__(path) + self.pristine_tar = PristineTar(self) + + def find_version(self, format, str_fields): + """ + Check if a certain version is stored in this repo and return the SHA1 + of the related commit. That is, an annotated tag is dereferenced to the + commit object it points to. + + @param format: tag pattern + @type format: C{str} + @param str_fields: arguments for format string ('upstreamversion', 'release', 'vendor'...) + @type str_fields: C{dict} of C{str} + @return: sha1 of the commit the tag references to + """ + try: + tag = self.version_to_tag(format, str_fields) + except KeyError: + return None + if self.has_tag(tag): # new tags are injective + # dereference to a commit object + return self.rev_parse("%s^0" % tag) + return None + + @staticmethod + def version_to_tag(format, str_fields): + """ + Generate a tag from a given format and a version + + @param format: tag pattern + @type format: C{str} + @param str_fields: arguments for format string ('upstreamversion', 'release', 'vendor'...) + @type str_fields: C{dict} of C{str} + @return: version tag + + >>> RpmGitRepository.version_to_tag("packaging/%(version)s", dict(epoch='0', upstreamversion='0~0')) + 'packaging/0%0_0' + >>> RpmGitRepository.version_to_tag("%(vendor)s/v%(version)s", dict(upstreamversion='1.0', release='2', vendor="myvendor")) + 'myvendor/v1.0-2' + """ + version_tag = format % dict(str_fields, + version=compose_version_str(str_fields)) + return RpmGitRepository._sanitize_tag(version_tag) + + @staticmethod + def _sanitize_tag(tag): + """sanitize a version so git accepts it as a tag + + >>> RpmGitRepository._sanitize_tag("0.0.0") + '0.0.0' + >>> RpmGitRepository._sanitize_tag("0.0~0") + '0.0_0' + >>> RpmGitRepository._sanitize_tag("0:0.0") + '0%0.0' + >>> RpmGitRepository._sanitize_tag("0%0~0") + '0%0_0' + """ + return tag.replace('~', '_').replace(':', '%') + + @property + def pristine_tar_branch(self): + """ + The name of the pristine-tar branch, whether it already exists or + not. + """ + return PristineTar.branch + + def has_pristine_tar_branch(self): + """ + Wheter the repo has a I{pristine-tar} branch. + + @return: C{True} if the repo has pristine-tar commits already, C{False} + otherwise + @rtype: C{Bool} + """ + return True if self.has_branch(self.pristine_tar_branch) else False + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: diff --git a/gbp/rpm/lib_rpm.py b/gbp/rpm/lib_rpm.py new file mode 100644 index 00000000..4bad44e7 --- /dev/null +++ b/gbp/rpm/lib_rpm.py @@ -0,0 +1,47 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""Wrapper module for librpm""" + +import tempfile + +import gbp.log +from gbp.rpm.policy import RpmPkgPolicy + +try: + # Try to load special RPM lib to be used for GBP (only) + librpm = __import__(RpmPkgPolicy.python_rpmlib_module_name) +except ImportError: + gbp.log.warn("Failed to import '%s' as rpm python module, using host's " + "default rpm library instead" % + RpmPkgPolicy.python_rpmlib_module_name) + import rpm as librpm + +# Module initialization +_rpmlog = tempfile.NamedTemporaryFile(prefix='gbp_rpmlog') +_rpmlogfd = _rpmlog.file +librpm.setVerbosity(librpm.RPMLOG_INFO) +librpm.setLogFile(_rpmlogfd) + + +def get_librpm_log(truncate=True): + """Get rpmlib log output""" + _rpmlogfd.seek(0) + log = [line.strip() for line in _rpmlogfd.readlines()] + if truncate: + _rpmlogfd.truncate(0) + return log + diff --git a/gbp/rpm/linkedlist.py b/gbp/rpm/linkedlist.py new file mode 100644 index 00000000..ca000453 --- /dev/null +++ b/gbp/rpm/linkedlist.py @@ -0,0 +1,214 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""Simple implementation of a doubly linked list""" + +import collections + +import gbp.log + + +class LinkedListNode(object): + """Node of the linked list""" + + def __init__(self, data="", prev_node=None, next_node=None): + self.prev = prev_node + self.next = next_node + self._data = data + + def __str__(self): + return str(self.data) + + @property + def data(self): + """Get data stored into node""" + if self._data is None: + gbp.log.err("BUG: referencing a deleted node!") + return("") + return self._data + + def set_data(self, data): + """ + Set data stored into node + + >>> node = LinkedListNode('foo') + >>> node.data + 'foo' + >>> node.set_data('bar') + >>> node.data + 'bar' + >>> node.set_data(None) + >>> node.data + '' + """ + if data is None: + gbp.log.err("BUG: trying to store 'None', not allowed") + data = "" + self._data = data + + + def delete(self): + """Delete node""" + if self.prev: + self.prev.next = self.next + if self.next: + self.next.prev = self.prev + self._data = None + + +class LinkedListIterator(collections.Iterator): + """Iterator for the linked list""" + + def __init__(self, obj): + self._next = obj.first + + def next(self): + ret = self._next + if ret: + self._next = ret.next + else: + raise StopIteration + return ret + + +class LinkedList(collections.Iterable): + """Doubly linked list""" + + def __init__(self): + self._first = None + self._last = None + + def __iter__(self): + return LinkedListIterator(self) + + def __len__(self): + for num, data in enumerate(self): + pass + return num + 1 + + @property + def first(self): + """Get the first node of the list""" + return self._first + + def prepend(self, data): + """ + Insert to the beginning of list + + >>> list = LinkedList() + >>> [str(data) for data in list] + [] + >>> node = list.prepend("foo") + >>> len(list) + 1 + >>> node = list.prepend("bar") + >>> [str(data) for data in list] + ['bar', 'foo'] + """ + if self._first is None: + new = self._first = self._last = LinkedListNode(data) + else: + new = self.insert_before(self._first, data) + return new + + def append(self, data): + """ + Insert to the end of list + + >>> list = LinkedList() + >>> node = list.append('foo') + >>> len(list) + 1 + >>> node = list.append('bar') + >>> [str(data) for data in list] + ['foo', 'bar'] + """ + if self._last is None: + return self.prepend(data) + else: + return self.insert_after(self._last, data) + + def insert_before(self, node, data=""): + """ + Insert before a node + + >>> list = LinkedList() + >>> node1 = list.append('foo') + >>> node2 = list.insert_before(node1, 'bar') + >>> node3 = list.insert_before(node1, 'baz') + >>> [str(data) for data in list] + ['bar', 'baz', 'foo'] + """ + new = LinkedListNode(data, prev_node=node.prev, next_node=node) + if node.prev: + node.prev.next = new + else: + self._first = new + node.prev = new + return new + + def insert_after(self, node, data=""): + """ + Insert after a node + + >>> list = LinkedList() + >>> node1 = list.prepend('foo') + >>> node2 = list.insert_after(node1, 'bar') + >>> node3 = list.insert_after(node1, 'baz') + >>> [str(data) for data in list] + ['foo', 'baz', 'bar'] + """ + new = LinkedListNode(data, prev_node=node, next_node=node.next) + if node.next: + node.next.prev = new + else: + self._last = new + node.next = new + return new + + def delete(self, node): + """ + Delete node + + >>> list = LinkedList() + >>> node1 = list.prepend('foo') + >>> node2 = list.insert_after(node1, 'bar') + >>> node3 = list.insert_before(node2, 'baz') + >>> [str(data) for data in list] + ['foo', 'baz', 'bar'] + >>> str(list.delete(node3)) + 'foo' + >>> [str(data) for data in list] + ['foo', 'bar'] + >>> print "%s" % node3 + <BLANKLINE> + >>> str(list.delete(node1)) + 'bar' + >>> [str(data) for data in list] + ['bar'] + >>> list.delete(node2) + >>> [str(data) for data in list] + [] + """ + ret = node.prev + if node is self._first: + ret = self._first = self._first.next + if node is self._last: + self._last = self._last.prev + node.delete() + return ret + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: diff --git a/gbp/rpm/policy.py b/gbp/rpm/policy.py new file mode 100644 index 00000000..f8cb8630 --- /dev/null +++ b/gbp/rpm/policy.py @@ -0,0 +1,72 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""Default packaging policy for RPM""" + +import re +from gbp.pkg import PkgPolicy, parse_archive_filename + +class RpmPkgPolicy(PkgPolicy): + """Packaging policy for RPM""" + + # Special rpmlib python module for GBP (only) + python_rpmlib_module_name = "rpm" + + alnum = 'a-zA-Z0-9' + # Valid characters for RPM pkg name + name_whitelist_chars = '._+%{}\-' + # Valid characters for RPM pkg version + version_whitelist_chars = '._+%{}~' + + # Regexp for checking the validity of package name + packagename_re = re.compile("^[%s][%s%s]+$" % + (alnum, alnum, name_whitelist_chars)) + packagename_msg = ("Package names must be at least two characters long, " + "start with an alphanumeric and can only contain " + "alphanumerics or characters in %s" % + list(name_whitelist_chars)) + + # Regexp for checking the validity of package (upstream) version + upstreamversion_re = re.compile("^[0-9][%s%s]*$" % + (alnum, version_whitelist_chars)) + upstreamversion_msg = ("Upstream version numbers must start with a digit " + "and can only containg alphanumerics or characters " + "in %s" % list(version_whitelist_chars)) + + @classmethod + def is_valid_orig_archive(cls, filename): + """ + Is this a valid orig source archive + + @param filename: upstream source archive filename + @type filename: C{str} + @return: true if valid upstream source archive filename + @rtype: C{bool} + + >>> RpmPkgPolicy.is_valid_orig_archive("foo/bar_baz.tar.gz") + True + >>> RpmPkgPolicy.is_valid_orig_archive("foo.bar.tar") + True + >>> RpmPkgPolicy.is_valid_orig_archive("foo.bar") + False + >>> RpmPkgPolicy.is_valid_orig_archive("foo.gz") + False + """ + _base, arch_fmt, _compression = parse_archive_filename(filename) + if arch_fmt: + return True + return False + |