from MgaRepo import Error, SilentError, config from MgaRepo.util import execcmd, get_auth import sys import os import re import time class VCSLogEntry(object): def __init__(self, revision, author, date): self.revision = revision self.author = author self.date = date self.changed = [] self.lines = [] def __lt__(self, other): return (self.date < other.date) def __eq__(self,other): return (self.date == other.date) class VCS(object): def __init__(self): self.vcs_name = None self.vcs_command = None self.vcs_wrapper = "mga-ssh" self.vcs_supports = {'clone' : False} self.env_defaults = None def _execVcs(self, *args, **kwargs): localcmds = ("add", "revert", "cleanup", "mv") kwargs["collecterr"] = False if kwargs.get("show"): if not kwargs.get("local"): kwargs["collecterr"] = True else: if args[0] not in localcmds: args = list(args) args.append("--non-interactive") else: if args[0] == "mv": kwargs["collecterr"] = False else: kwargs["collecterr"] = True kwargs["cleanerr"] = True if kwargs.get("xml"): args.append("--xml") cmdargs = [self.vcs_command] cmdargs.extend(args) try: if args[0] in ('info', 'checkout','log'): kwargs['info'] = True else: kwargs['info'] = False return execcmd(cmdargs, **kwargs) except Error as e: msg = None if e.args: if "Permission denied" in e.args: msg = ("It seems ssh-agent or ForwardAgent are not setup " "or your username is wrong. See " "https://wiki.mageia.org/en/Packagers_ssh" " for more information.") elif "authorization failed" in e.args: msg = ("Note that mgarepo does not support any HTTP " "authenticated access.") if kwargs.get("show") and \ not config.getbool("global", "verbose", 0): # svn has already dumped error messages, we don't need to # do it too if msg: sys.stderr.write("\n") sys.stderr.write(msg) sys.stderr.write("\n") raise SilentError elif msg: raise Error("%s\n%s" % (e, msg)) raise def _set_env(self): wrapper = "mgarepo-ssh" repsys = config.get("global", "mgarepo-cmd") if repsys: dir = os.path.dirname(repsys) path = os.path.join(dir, wrapper) if os.path.exists(path): wrapper = path defaults = {"SVN_SSH": wrapper} os.environ.update(defaults) raw = config.get("global", "svn-env") if raw: for line in raw.split("\n"): env = line.strip() if not env: continue try: name, value = env.split("=", 1) except ValueError: sys.stderr.write("invalid svn environment line: %r\n" % env) continue os.environ[name] = value def _execVcs_success(self, *args, **kwargs): status, output = self._execVcs(*args, **kwargs) return status == 0 def _add_log(self, cmd_args, received_kwargs, optional=0): if (not optional or "log" in received_kwargs or "logfile" in received_kwargs): ret = received_kwargs.get("log") if ret is not None: cmd_args.extend(("-m", ret)) ret = received_kwargs.get("logfile") if ret is not None: cmd_args.extend(("-F", ret)) def _add_revision(self, cmd_args, received_kwargs, optional=0): if not optional or "rev" in received_kwargs: ret = received_kwargs.get("rev") if isinstance(ret, str): if not ret.startswith("{"): # if not a datespec try: ret = int(ret) except ValueError: raise Error("invalid revision provided") if ret: cmd_args.extend(("-r", str(ret))) def add(self, path, **kwargs): cmd = ["add", path + '@' if '@' in path else path] return self._execVcs_success(noauth=1, *cmd, **kwargs) def copy(self, pathfrom, pathto, **kwargs): cmd = ["copy", pathfrom + '@' if '@' in pathfrom else pathfrom, pathto + '@' if '@' in pathto else pathto] self._add_revision(cmd, kwargs, optional=1) self._add_log(cmd, kwargs) return self._execVcs_success(*cmd, **kwargs) def remove(self, path, force=0, **kwargs): cmd = ["remove", path + '@' if '@' in path else path] self._add_log(cmd, kwargs) if force: cmd.append("--force") return self._execVcs_success(*cmd, **kwargs) def mkdir(self, path, **kwargs): cmd = ["mkdir", path + '@' if '@' in path else path] if kwargs.get("parents"): cmd.append("--parents") self._add_log(cmd, kwargs) return self._execVcs_success(*cmd, **kwargs) def _execVcs_commit(self, *cmd, **kwargs): status, output = self._execVcs(*cmd, **kwargs) match = re.search("Committed revision (?P\\d+)\\.$", output) if match: rawrev = match.group("rev") return int(rawrev) def commit(self, path, **kwargs): cmd = ["commit", path + '@' if '@' in path else path] if kwargs.get("nonrecursive"): cmd.append("-N") self._add_log(cmd, kwargs) return self._execVcs_commit(*cmd, **kwargs) def import_(self, path, url, **kwargs): cmd = ["import", path, url] self._add_log(cmd, kwargs) return self._execVcs_commit(*cmd, **kwargs) def export(self, url, targetpath, **kwargs): cmd = ["export", url, targetpath] self._add_revision(cmd, kwargs, optional=1) return self._execVcs_success(*cmd, **kwargs) def checkout(self, url, targetpath, **kwargs): cmd = ["checkout", url, targetpath] self._add_revision(cmd, kwargs, optional=1) return self._execVcs_success(*cmd, **kwargs) def clone(self, url, targetpath, **kwargs): if self.vcs_supports['clone']: cmd = ["clone", url, targetpath] return self._execVcs_success(*cmd, **kwargs) else: raise Error("%s doesn't support 'clone'" % self.vcs_name) def propget(self, propname, targets, **kwargs): cmd = ["propget", propname, targets] if kwargs.get("revprop"): cmd.append("--revprop") self._add_revision(cmd, kwargs) status, output = self._execVcs(local=True, *cmd, **kwargs) return output def propset(self, propname, value, targets, **kwargs): cmd = ["propset", propname, value, targets] return self._execVcs_success(*cmd, **kwargs) def propedit(self, propname, target, **kwargs): cmd = ["propedit", propname, target] if kwargs.get("rev"): cmd.append("--revprop") self._add_revision(cmd, kwargs) return self._execVcs_success(local=True, show=True, *cmd, **kwargs) def revision(self, path, **kwargs): cmd = ["info", path + '@' if '@' in path else path] status, output = self._execVcs(local=True, *cmd, **kwargs) if status == 0: for line in output.splitlines(): if line.startswith("Last Changed Rev: "): return int(line.split()[3]) return None def info(self, path, **kwargs): cmd = ["info", path + '@' if '@' in path else path] status, output = self._execVcs(local=True, noerror=True, *cmd, **kwargs) if (("Not a versioned resource" not in output) and ("svn: warning: W155010" not in output)): return output.splitlines() return None def info2(self, *args, **kwargs): lines = self.info(*args, **kwargs) if lines is None: return None pairs = [[w.strip() for w in line.split(":", 1)] for line in lines] info = {} for pair in pairs: if pair != ['']: info[pair[0]]=pair[1] return info def ls(self, path, **kwargs): cmd = ["ls", path + '@' if '@' in path else path] status, output = self._execVcs(*cmd, **kwargs) if status == 0: return output.split() return None def status(self, path, **kwargs): cmd = ["status", path + '@' if '@' in path else path] if kwargs.get("verbose"): cmd.append("-v") if kwargs.get("noignore"): cmd.append("--no-ignore") if kwargs.get("quiet"): cmd.append("--quiet") status, output = self._execVcs(*cmd, **kwargs) if status == 0: return [(x[0], x[8:]) for x in output.splitlines()] return None def cleanup(self, path, **kwargs): cmd = ["cleanup", path + '@' if '@' in path else path] return self._execVcs_success(*cmd, **kwargs) def revert(self, path, **kwargs): cmd = ["revert", path + '@' if '@' in path else path] status, output = self._execVcs(*cmd, **kwargs) if status == 0: return [x.split() for x in output.split()] return None def switch(self, url, oldurl=None, path=None, relocate=False, **kwargs): cmd = ["switch"] if relocate: if oldurl is None: raise Error("You must supply the old URL when "\ "relocating working copies") cmd.append("--relocate") cmd.append(oldurl) cmd.append(url) if path is not None: cmd.append(path) return self._execVcs_success(*cmd, **kwargs) def update(self, path, **kwargs): cmd = ["update", path + '@' if '@' in path else path] self._add_revision(cmd, kwargs, optional=1) status, output = self._execVcs(*cmd, **kwargs) if status == 0: return [x.split() for x in output.split()] return None def merge(self, url1, url2=None, rev1=None, rev2=None, path=None, **kwargs): cmd = ["merge"] if rev1 and rev2 and not url2: cmd.append("-r") cmd.append("%s:%s" % (rev1, rev2)) cmd.append(url1) else: if not url2: raise ValueError("url2 needed if two revisions are not provided") if rev1: cmd.append("%s@%s" % (url1, rev1)) else: cmd.append(url1) if rev2: cmd.append("%s@%s" % (url2, rev2)) else: cmd.append(url2) if path: cmd.append(path) status, output = self._execVcs(*cmd, **kwargs) if status == 0: return [x.split() for x in output.split()] return None def diff(self, pathurl1, pathurl2=None, **kwargs): cmd = ["diff", pathurl1] self._add_revision(cmd, kwargs, optional=1) if pathurl2: cmd.append(pathurl2) status, output = self._execVcs(*cmd, **kwargs) if status == 0: return output return None def cat(self, url, **kwargs): cmd = ["cat", url] self._add_revision(cmd, kwargs, optional=1) status, output = self._execVcs(*cmd, **kwargs) if status == 0: return output return None def log(self, url, start=None, end=0, limit=None, **kwargs): cmd = ["log", "-v", url] if start is not None or end != 0: if start is not None and type(start) is not type(0): try: start = int(start) except (ValueError, TypeError): raise Error("invalid log start revision provided") if type(end) is not type(0): try: end = int(end) except (ValueError, TypeError): raise Error("invalid log end revision provided") start = start or "HEAD" cmd.extend(("-r", "%s:%s" % (start, end))) if limit is not None: try: limit = int(limit) except (ValueError, TypeError): raise Error("invalid limit number provided") cmd.extend(("--limit", str(limit))) status, output = self._execVcs(*cmd, **kwargs) if status != 0: return None revheader = re.compile("^r(?P[0-9]+) \| (?P[^\|]+) \| (?P[^\|]+) \| (?P[0-9]+) (?:line|lines)$") changedpat = re.compile(r"^\s+(?P[^\s]+) (?P[^\s]+)(?: \([^\s]+ (?P[^:]+)(?:\:(?P[0-9]+))?\))?$") logseparator = "-"*72 linesleft = 0 entry = None log = [] appendchanged = 0 changedheader = 0 for line in output.splitlines(): line = line.rstrip() if changedheader: appendchanged = 1 changedheader = 0 elif appendchanged: if not line: appendchanged = 0 continue m = changedpat.match(line) if m: changed = m.groupdict().copy() from_rev = changed.get("from_rev") if from_rev is not None: try: changed["from_rev"] = int(from_rev) except (ValueError, TypeError): raise Error("invalid revision number in % log" % self.vcs_name) entry.changed.append(changed) elif linesleft == 0: if line != logseparator: m = revheader.match(line) if m: linesleft = int(m.group("lines")) timestr = " ".join(m.group("date").split()[:2]) timetuple = time.strptime(timestr, "%Y-%m-%d %H:%M:%S") entry = VCSLogEntry(int(m.group("revision")), m.group("author"), timetuple) log.append(entry) changedheader = 1 else: entry.lines.append(line) linesleft -= 1 log.sort() log.reverse() return log def mv(self, path, dest, message=None, **kwargs): cmd = ["mv", path, dest, ] if message: cmd.extend(("-m", str(message))) else: kwargs['show'] = True self._add_log(cmd, kwargs) return self._execVcs_success(*cmd, **kwargs) class VCSLook(object): def __init__(self, repospath, txn=None, rev=None): self.repospath = repospath self.txn = txn self.rev = rev self.execcmd = None def _execVcslook(self, cmd, *args, **kwargs): execcmd_args = ["svnlook", cmd, self.repospath] self._add_txnrev(execcmd_args, kwargs) execcmd_args += args execcmd_kwargs = {} keywords = ["show", "noerror"] for key in keywords: if key in kwargs: execcmd_kwargs[key] = kwargs[key] return execcmd(*execcmd_args, **execcmd_kwargs) def _add_txnrev(self, cmd_args, received_kwargs): if "txn" in received_kwargs: txn = received_kwargs.get("txn") if txn is not None: cmd_args.extend(("-t", txn)) elif self.txn is not None: cmd_args.extend(("-t", self.txn)) if "rev" in received_kwargs: rev = received_kwargs.get("rev") if rev is not None: cmd_args.exten(("-r", str(rev))) elif self.rev is not None: cmd_args.extend(("-r", str(self.rev))) def changed(self, **kwargs): status, output = self._execVcslook("changed", **kwargs) if status != 0: return None changes = [] for line in output.splitlines(): line = line.rstrip() if not line: continue entry = [None, None, None] changedata, changeprop, path = None, None, None if line[0] != "_": changedata = line[0] if line[1] != " ": changeprop = line[1] path = line[4:] changes.append((changedata, changeprop, path)) return changes def author(self, **kwargs): status, output = self._execVcslook("author", **kwargs) if status != 0: return None return output.strip() # vim:et:ts=4:sw=4