from BuildManager import * from BuildManager.rpmver import rpmVersionCompare from collections import UserList import subprocess import os import re import codecs class BuildManagerFileError(Error): pass class BuildManagerPackageError(Error): pass try: import rpm except ImportError: rpm = None __all__ = ["Package", "PackageList"] def subst(s, vars): def _subst(match, vars=vars): name = match.group(1) return str(vars[name]) try: return re.sub(r'\$([a-zA-Z]+)', _subst, s) except KeyError as var: raise Error("variable $%s not declared" % var) class Package: def __init__(self, file, log=None): self._package = None ext = self._filename_extension(file) if not ext: raise Error("unknown file extension of "+file) if "_package_"+ext not in globals(): raise Error("unknown package extension of "+file) self._package = globals()["_package_"+ext](file, log) def __getattr__(self, name): return getattr(self._package, name) def _filename_extension(self, filename): try: dotpos = filename.rindex(".") except ValueError: pass else: return filename[dotpos+1:] class _package: def __init__(self, file, log): self.file = file self.absfile = os.path.abspath(file) self.type = None self.name = None self.version = None self.release = None self.epoch = None self.spec = None self.builddir = None self.log = log or "$builddir/SPECS/log.$name" self._init() def __cmp__(self, pkg): rc = cmp(self.name, pkg.name) if rc: return rc return rpmVersionCompare(self.epoch, self.version, self.release, pkg.epoch, pkg.version, pkg.release) def _expand_log(self): substdict = {"builddir":self.builddir, "name":self.name, "version":self.version, "release":self.release} self.log = subst(self.log, substdict) class _package_spec(_package): def _rpm_vars(self, s, vars): end = -1 ret = [] while 1: start = s.find("%{", end+1) if start == -1: ret.append(s[end+1:]) break ret.append(s[end+1:start]) end = s.find("}", start) if end == -1: ret.append(s[start:]) break varname = s[start+2:end] if varname in vars: ret.append(vars[varname]) else: ret.append(s[start:end+1]) return "".join(ret) def _init(self): self.spec = self.absfile self.builddir = os.path.dirname(os.path.dirname(self.absfile)) ret = os.system("mkdir -p %s/{SOURCES,SPECS,BUILD,SRPMS,RPMS,BUILDROOT}" % self.builddir) try: f = codecs.open(self.spec, "r", 'utf-8', errors = "replace") except IOError as e: raise BuildManagerFileError("couldn't open spec file %s" % self.absfile) defines = {} for line in f.readlines(): lowerline = line.lower() if not self.name and lowerline[:5] == "name:": self.name = self._rpm_vars((line[5:]).strip(), defines) elif not self.version and lowerline[:8] == "version:": self.version = self._rpm_vars((line[8:]).strip(), defines) elif not self.release and lowerline[:8] == "release:": self.release = self._rpm_vars((line[8:]).strip(), defines) elif lowerline[:7] == "%define": token = (line[7:]).split() if len(token) == 2: defines[token[0]] = self._rpm_vars(token[1], defines) if self.name and self.version and self.release: break else: raise Error("spec file %s doesn't define name, " \ "version or release" % self.file) self.type = "spec" self._expand_log() class _package_rpm(_package): def _init(self): if not rpm: cmd = "rpm -qp --queryformat='%%{NAME} %%{EPOCH} %%{VERSION} %%{RELEASE} %%{SOURCERPM}' %s"%self.file status, output = subprocess.getstatusoutput(cmd) if status != 0: raise BuildManagerPackageError("error querying rpm file %s" % self.file) else: tokens = output.split(" ") if len(tokens) != 5: raise Error("unexpected output querying rpm file %s: %s" % \ (self.file, output)) else: self.name, self.epoch, self.version, self.release, srpm = tokens if self.epoch == "(none)": self.epoch = None if srpm != "(none)": self.type = "rpm" else: self.type = "srpm" else: # Boost up query if rpm module is available file = open(self.file) if hasattr(rpm, "headerFromPackage"): h = rpm.headerFromPackage(file.fileno())[0] else: ts = rpm.TransactionSet() h = ts.hdrFromFdno(file.fileno()) file.close() self.name = h[rpm.RPMTAG_NAME] self.epoch = h[rpm.RPMTAG_EPOCH] self.version = h[rpm.RPMTAG_VERSION] self.release = h[rpm.RPMTAG_RELEASE] if h[rpm.RPMTAG_SOURCERPM]: self.type = "rpm" else: self.type = "srpm" def unpack(self, unpackdir): if self.type == "srpm": self.builddir = self._builddir_create(unpackdir) if self.builddir: self._expand_log() return self._install_srpm() def _builddir_create(self, unpackdir): unpackdir = os.path.abspath(unpackdir) builddir = "%s/%s-%s-%s-topdir" % (unpackdir, self.name, self.version, self.release) ret = os.system("mkdir -p %s/{SOURCES,SPECS,BUILD,SRPMS,RPMS,BUILDROOT}" % builddir) if ret != 0: raise BuildManagerPackageError("error creating builddir at %s" % builddir) else: return builddir def _install_srpm(self): cmd = "rpm -i --define '_topdir %s' %s &> %s"%(self.builddir,self.file,self.log) status, output = subprocess.getstatusoutput(cmd) if status != 0: raise Error("error installing package "+self.file) else: spec = self.builddir+"/SPECS/"+self.name+".spec" if not os.path.isfile(spec): listdir = os.listdir(self.builddir+"/SPECS") for file in listdir[:]: if file[-5:] != ".spec": listdir.remove(file) if len(listdir) != 1: raise Error("can't guess spec file for "+self.file) else: self.spec = self.builddir+"/SPECS/"+listdir[0] return 1 else: self.spec = spec return 1 class PackageList(UserList): def has_lt(self, pkg): for mypkg in self.data: if mypkg.name == pkg.name \ and mypkg.type == pkg.type \ and mypkg < pkg: return 1 return 0 def has_le(self, pkg): for mypkg in self.data: if mypkg.name == pkg.name \ and mypkg.type == pkg.type \ and mypkg <= pkg: return 1 return 0 def has_eq(self, pkg): for mypkg in self.data: if mypkg.name == pkg.name \ and mypkg.type == pkg.type \ and mypkg == pkg: return 1 return 0 def has_ge(self, pkg): for mypkg in self.data: if mypkg.name == pkg.name \ and mypkg.type == pkg.type \ and mypkg >= pkg: return 1 return 0 def has_gt(self, pkg): for mypkg in self.data: if mypkg.name == pkg.name \ and mypkg.type == pkg.type \ and mypkg > pkg: return 1 return 0 # vim:ts=4:sw=4:et