diff options
Diffstat (limited to 'MgaRepo/VCS.py')
-rw-r--r-- | MgaRepo/VCS.py | 470 |
1 files changed, 470 insertions, 0 deletions
diff --git a/MgaRepo/VCS.py b/MgaRepo/VCS.py new file mode 100644 index 0000000..ee419f4 --- /dev/null +++ b/MgaRepo/VCS.py @@ -0,0 +1,470 @@ +from MgaRepo import Error, SilentError, config +from MgaRepo.util import execcmd, get_auth +from MgaRepo import layout +from xml.etree import ElementTree +import sys +import os +import re +import time + +class VCSLogEntry(object): + def __init__(self, revision, author, date, lines=[], changed=[]): + self.revision = revision + self.author = author + self.date = date + self.changed = changed + self.lines = lines + + def __lt__(self, other): + return (self.date < other.date) + + def __eq__(self,other): + return (self.date == other.date) + +class VCS(object): + vcs_dirname = None + vcs_name = None + def __init__(self, path, url): + self.vcs_command = None + self.vcs_wrapper = "mga-ssh" + self.vcs_supports = {'clone' : False} + self.vcs_type = None + self.env_defaults = None + if not path and not url: + self._path = os.path.curdir + elif not path and url: + self._path = layout.package_name(layout.remove_current(url)) + else: + self._path = path + self._url = url + + def _execVcs(self, *args, **kwargs): + localcmds = ("add", "revert", "cleanup", "mv") + cmd = self.vcs_command + list(args) + kwargs["collecterr"] = kwargs.get("collecterr", False) + if kwargs.get("show"): + if not kwargs.get("local"): + kwargs["collecterr"] = True + else: + if self.vcs_command is "svn" and args[0] not in localcmds: + cmd.append("--non-interactive") + else: + if args[0] == "mv": + kwargs["collecterr"] = False + kwargs["cleanerr"] = kwargs.get("cleanerr", True) + if kwargs.get("xml"): + cmd.append("--xml") + try: + if args[0] in ('info', 'checkout','log'): + kwargs['info'] = True + else: + kwargs['info'] = False + return execcmd(*cmd, **kwargs) + except Error as e: + msg = None + if e.args: + if "Permission denied" in e.args: + msg = ("It seems ssh-agent or ForwardAgent are not setup " + "or your username is wrong. See " + "https://wiki.mageia.org/en/Packagers_ssh" + " for more information.") + elif "authorization failed" in e.args: + msg = ("Note that mgarepo does not support any HTTP " + "authenticated access.") + if kwargs.get("show") and \ + not config.getbool("global", "verbose", 0): + # svn has already dumped error messages, we don't need to + # do it too + if msg: + sys.stderr.write("\n") + sys.stderr.write(msg) + sys.stderr.write("\n") + raise SilentError + elif msg: + raise Error("%s\n%s" % (e, msg)) + raise + + def _set_env(self): + wrapper = "mgarepo-ssh" + repsys = config.get("global", "mgarepo-cmd") + if repsys: + dir = os.path.dirname(repsys) + path = os.path.join(dir, wrapper) + if os.path.exists(path): + wrapper = path + defaults = {"SVN_SSH": wrapper} + os.environ.update(defaults) + raw = config.get("global", "svn-env") + if raw: + for line in raw.split("\n"): + env = line.strip() + if not env: + continue + try: + name, value = env.split("=", 1) + except ValueError: + sys.stderr.write("invalid svn environment line: %r\n" % env) + continue + os.environ[name] = value + + def _execVcs_success(self, *args, **kwargs): + status, output = self._execVcs(*args, **kwargs) + return status == 0 + + def _add_log(self, cmd_args, received_kwargs, optional=0): + if (not optional or + "log" in received_kwargs or + "logfile" in received_kwargs): + ret = received_kwargs.get("log") + if ret is not None: + cmd_args.extend(("-m", ret)) + ret = received_kwargs.get("logfile") + if ret is not None: + cmd_args.extend(("-F", ret)) + + def _add_revision(self, cmd_args, received_kwargs, optional=0): + if not optional or "rev" in received_kwargs: + ret = received_kwargs.get("rev") + if isinstance(ret, str): + if not ret.startswith("{"): # if not a datespec + try: + ret = int(ret) + except ValueError: + raise Error("invalid revision provided") + if ret: + cmd_args.extend(("-r", str(ret))) + + def add(self, path, **kwargs): + cmd = ["add", path + '@' if '@' in path else path] + return self._execVcs_success(noauth=1, *cmd, **kwargs) + + def copy(self, pathfrom, pathto, **kwargs): + cmd = ["copy", pathfrom + '@' if '@' in pathfrom else pathfrom, pathto + '@' if '@' in pathto else pathto] + self._add_revision(cmd, kwargs, optional=1) + self._add_log(cmd, kwargs) + return self._execVcs_success(*cmd, **kwargs) + + def remove(self, path, force=0, **kwargs): + cmd = ["remove", path + '@' if '@' in path else path] + self._add_log(cmd, kwargs) + if force: + cmd.append("--force") + return self._execVcs_success(*cmd, **kwargs) + + def mkdir(self, path, **kwargs): + cmd = ["mkdir", path + '@' if '@' in path else path] + if kwargs.get("parents"): + cmd.append("--parents") + self._add_log(cmd, kwargs) + return self._execVcs_success(*cmd, **kwargs) + + def _execVcs_commit(self, *cmd, **kwargs): + status, output = self._execVcs(*cmd, **kwargs) + match = re.search("Committed revision (?P<rev>\\d+)\\.$", output) + if match: + rawrev = match.group("rev") + return int(rawrev) + + def commit(self, path, **kwargs): + cmd = ["commit", path + '@' if '@' in path else path] + if kwargs.get("nonrecursive"): + cmd.append("-N") + self._add_log(cmd, kwargs) + return self._execVcs_commit(*cmd, **kwargs) + + def import_(self, path, url, **kwargs): + cmd = ["import", path, url] + self._add_log(cmd, kwargs) + return self._execVcs_commit(*cmd, **kwargs) + + def export(self, url, targetpath, **kwargs): + cmd = ["export", url, targetpath] + self._add_revision(cmd, kwargs, optional=1) + return self._execVcs_success(*cmd, **kwargs) + + def checkout(self, url, targetpath, **kwargs): + cmd = ["checkout", url, targetpath] + self._add_revision(cmd, kwargs, optional=1) + return self._execVcs_success(*cmd, **kwargs) + + def clone(self, url, targetpath, **kwargs): + if self.vcs_supports['clone']: + cmd = ["clone", url, targetpath] + return self._execVcs_success(*cmd, **kwargs) + else: + raise Error("%s doesn't support 'clone'" % self.vcs_name) + + def propget(self, propname, targets, **kwargs): + cmd = ["propget", propname, targets] + if kwargs.get("revprop"): + cmd.append("--revprop") + self._add_revision(cmd, kwargs) + status, output = self._execVcs(local=True, *cmd, **kwargs) + return output + + def propset(self, propname, value, targets, **kwargs): + cmd = ["propset", propname, value, targets] + return self._execVcs_success(*cmd, **kwargs) + + def propedit(self, propname, target, **kwargs): + cmd = ["propedit", propname, target] + if kwargs.get("rev"): + cmd.append("--revprop") + self._add_revision(cmd, kwargs) + return self._execVcs_success(local=True, show=True, *cmd, **kwargs) + + def revision(self, path, **kwargs): + cmd = ["info", path + '@' if '@' in path else path] + status, output = self._execVcs(local=True, *cmd, **kwargs) + if status == 0: + for line in output.splitlines(): + if line.startswith("Last Changed Rev: "): + return int(line.split()[3]) + return None + + def info(self, path, **kwargs): + cmd = ["info", path + '@' if '@' in path else path] + status, output = self._execVcs(local=True, noerror=True, *cmd, **kwargs) + if (("Not a versioned resource" not in output) and ("svn: warning: W155010" not in output)): + return output.splitlines() + return None + + def info2(self, *args, **kwargs): + lines = self.info(*args, **kwargs) + if lines is None: + return None + pairs = [[w.strip() for w in line.split(":", 1)] for line in lines] + info = {} + for pair in pairs: + if pair != ['']: + info[pair[0]]=pair[1] + return info + + def ls(self, path, **kwargs): + cmd = ["ls", path + '@' if '@' in path else path] + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return output.split() + return None + + def status(self, path, **kwargs): + cmd = ["status", path + '@' if '@' in path else path] + if kwargs.get("verbose"): + cmd.append("-v") + if kwargs.get("noignore"): + cmd.append("--no-ignore") + if kwargs.get("quiet"): + cmd.append("--quiet") + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return [(x[0], x[8:]) for x in output.splitlines()] + return None + + def cleanup(self, path, **kwargs): + cmd = ["cleanup", path + '@' if '@' in path else path] + return self._execVcs_success(*cmd, **kwargs) + + def revert(self, path, **kwargs): + cmd = ["revert", path + '@' if '@' in path else path] + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return [x.split() for x in output.split()] + return None + + def switch(self, url, oldurl=None, path=None, relocate=False, **kwargs): + cmd = ["switch"] + if relocate: + if oldurl is None: + raise Error("You must supply the old URL when "\ + "relocating working copies") + cmd.append("--relocate") + cmd.append(oldurl) + cmd.append(url) + if path is not None: + cmd.append(path) + return self._execVcs_success(*cmd, **kwargs) + + def update(self, path, **kwargs): + cmd = ["update", path + '@' if '@' in path else path] + self._add_revision(cmd, kwargs, optional=1) + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return [x.split() for x in output.split()] + return None + + def merge(self, url1, url2=None, rev1=None, rev2=None, path=None, + **kwargs): + cmd = ["merge"] + if rev1 and rev2 and not url2: + cmd.append("-r") + cmd.append("%s:%s" % (rev1, rev2)) + cmd.append(url1) + else: + if not url2: + raise ValueError("url2 needed if two revisions are not provided") + if rev1: + cmd.append("%s@%s" % (url1, rev1)) + else: + cmd.append(url1) + if rev2: + cmd.append("%s@%s" % (url2, rev2)) + else: + cmd.append(url2) + if path: + cmd.append(path) + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return [x.split() for x in output.split()] + return None + + def diff(self, pathurl1, pathurl2=None, **kwargs): + cmd = ["diff", pathurl1] + self._add_revision(cmd, kwargs, optional=1) + if pathurl2: + cmd.append(pathurl2) + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return output + return None + + def cat(self, url, **kwargs): + cmd = ["cat", url] + self._add_revision(cmd, kwargs, optional=1) + status, output = self._execVcs(*cmd, **kwargs) + if status == 0: + return output + return None + + def log(self, url, start=None, end=0, limit=None, **kwargs): + cmd = ["log", "-v", url] + if start is not None or end != 0: + if start is not None and type(start) is not type(0): + try: + start = int(start) + except (ValueError, TypeError): + raise Error("invalid log start revision provided") + if type(end) is not type(0): + try: + end = int(end) + except (ValueError, TypeError): + raise Error("invalid log end revision provided") + start = start or "HEAD" + cmd.extend(("-r", "%s:%s" % (start, end))) + if limit is not None: + try: + limit = int(limit) + except (ValueError, TypeError): + raise Error("invalid limit number provided") + cmd.extend(("--limit", str(limit))) + + status, output = self._execVcs(*cmd, xml=True, **kwargs) + if status != 0: + return None + + xmllog = ElementTree.fromstring(output) + log = [] + logentries = xmllog.getiterator("logentry") + for entry in logentries: + changed = [] + lines = [] + for pathelem in entry.getiterator("paths"): + path = pathelem.find("path") + from_rev = path.get("copyfrom-rev") + if from_rev: + from_rev = int(from_rev) + changed.append({"from_rev" : from_rev, "from_path" : path.get("copyfrom-path"), "action" : path.get("action"), "path" : path.text}) + date = entry.findtext("date").split("T") + timestr = "%s %s" % (date[0], date[1].split(".")[0]) + timetuple = time.strptime(timestr, "%Y-%m-%d %H:%M:%S") + lines.extend(entry.findtext("msg").rstrip().split("\n")) + logentry = VCSLogEntry(int(entry.attrib["revision"]), + entry.findtext("author"), timetuple, [line.rstrip() for line in lines], changed) + log.append(logentry) + log.sort() + log.reverse() + return log + + def mv(self, path, dest, message=None, **kwargs): + cmd = ["mv", path, dest, ] + if message: + cmd.extend(("-m", str(message))) + else: + kwargs['show'] = True + self._add_log(cmd, kwargs) + return self._execVcs_success(*cmd, **kwargs) + + def get_topdir(self): + vcsdir = os.path.join(self._path, self.vcs_dirname) + if os.path.exists(vcsdir) and os.path.isdir(vcsdir): + return self._path + else: + return None + + @property + def path(self): + return self._path + + @property + def url(self): + if not self._url: + self._url = self.info2(self._path)["URL"] + return self._url + +class VCSLook(object): + def __init__(self, repospath, txn=None, rev=None): + self.repospath = repospath + self.txn = txn + self.rev = rev + self.execcmd = None + + def _execVcslook(self, cmd, *args, **kwargs): + execcmd_args = ["svnlook", cmd, self.repospath] + self._add_txnrev(execcmd_args, kwargs) + execcmd_args += args + execcmd_kwargs = {} + keywords = ["show", "noerror"] + for key in keywords: + if key in kwargs: + execcmd_kwargs[key] = kwargs[key] + return execcmd(*execcmd_args, **execcmd_kwargs) + + def _add_txnrev(self, cmd_args, received_kwargs): + if "txn" in received_kwargs: + txn = received_kwargs.get("txn") + if txn is not None: + cmd_args.extend(("-t", txn)) + elif self.txn is not None: + cmd_args.extend(("-t", self.txn)) + if "rev" in received_kwargs: + rev = received_kwargs.get("rev") + if rev is not None: + cmd_args.exten(("-r", str(rev))) + elif self.rev is not None: + cmd_args.extend(("-r", str(self.rev))) + + def changed(self, **kwargs): + status, output = self._execVcslook("changed", **kwargs) + if status != 0: + return None + changes = [] + for line in output.splitlines(): + line = line.rstrip() + if not line: + continue + entry = [None, None, None] + changedata, changeprop, path = None, None, None + if line[0] != "_": + changedata = line[0] + if line[1] != " ": + changeprop = line[1] + path = line[4:] + changes.append((changedata, changeprop, path)) + return changes + + def author(self, **kwargs): + status, output = self._execVcslook("author", **kwargs) + if status != 0: + return None + return output.strip() + +# vim:et:ts=4:sw=4 |