aboutsummaryrefslogtreecommitdiffstats
path: root/MgaRepo/svn.py
diff options
context:
space:
mode:
Diffstat (limited to 'MgaRepo/svn.py')
-rw-r--r--MgaRepo/svn.py467
1 files changed, 440 insertions, 27 deletions
diff --git a/MgaRepo/svn.py b/MgaRepo/svn.py
index cc1ed1b..74f47a3 100644
--- a/MgaRepo/svn.py
+++ b/MgaRepo/svn.py
@@ -1,5 +1,5 @@
-from MgaRepo.util import execcmd
-from MgaRepo.VCS import *
+from MgaRepo import Error, SilentError, config
+from MgaRepo.util import execcmd, get_auth
import sys
import os
import re
@@ -7,32 +7,445 @@ import time
__all__ = ["SVN", "SVNLook", "SVNLogEntry"]
-class SVNLogEntry(VCSLogEntry):
+class SVNLogEntry:
def __init__(self, revision, author, date):
- VCSLogEntry.__init__(self, revision, author, data)
-
-class SVN(VCS):
- vcs_dirname = ".svn"
- vcs_name = "svn"
- def __init__(self, path=None, url=None):
- VCS.__init__(self, path, url)
- self.vcs_command = config.get("global", "svn-command", ["svn"])
- self.env_defaults = {"SVN_SSH": self.vcs_wrapper}
-
- def drop_ssh_if_no_auth(self, url):
- if url and url.startswith("svn+ssh://"):
- cmd = ["info", "--non-interactive", "--no-newline", "--show-item", "url", url]
- status, output = self._execVcs(*cmd, local=True, noerror=True, show=False)
- if status == 1 and (("E170013" in output) or ("E210002" in output)):
- url = url.replace("svn+ssh://", "svn://")
- status, output = self._execVcs(*cmd, local=True, noerror=True, show=False)
- if status == 0 and output == url:
- pass
- return url
-
-class SVNLook(VCSLook):
+ self.revision = revision
+ self.author = author
+ self.date = date
+ self.changed = []
+ self.lines = []
+
+ def __lt__(self, other):
+ return (self.date < other.date)
+
+ def __eq__(self,other):
+ return (self.date == other.date)
+
+class SVN:
+ def _execsvn(self, *args, **kwargs):
+ localcmds = ("add", "revert", "cleanup", "mv")
+ if not kwargs.get("show") and args[0] not in localcmds:
+ args = list(args)
+ args.append("--non-interactive")
+ else:
+ if args[0] == "mv":
+ kwargs["geterr"] = False
+ else:
+ kwargs["geterr"] = True
+ kwargs["cleanerr"] = True
+ if kwargs.get("xml"):
+ args.append("--xml")
+ self._set_env()
+ svn_command = config.get("global", "svn-command", "svn")
+ cmdstr = svn_command + " " + " ".join(args)
+ try:
+ if args[0] in ('info', 'checkout','log'):
+ kwargs['info'] = True
+ else:
+ kwargs['info'] = False
+ return execcmd(cmdstr, **kwargs)
+ except Error as e:
+ msg = None
+ if e.args:
+ if "Permission denied" in e.args:
+ msg = ("It seems ssh-agent or ForwardAgent are not setup "
+ "or your username is wrong. See "
+ "https://wiki.mageia.org/en/Packagers_ssh"
+ " for more information.")
+ elif "authorization failed" in e.args:
+ msg = ("Note that mgarepo does not support any HTTP "
+ "authenticated access.")
+ if kwargs.get("show") and \
+ not config.getbool("global", "verbose", 0):
+ # svn has already dumped error messages, we don't need to
+ # do it too
+ if msg:
+ sys.stderr.write("\n")
+ sys.stderr.write(msg)
+ sys.stderr.write("\n")
+ raise SilentError
+ elif msg:
+ raise Error("%s\n%s" % (e, msg))
+ raise
+
+ def _set_env(self):
+ wrapper = "mgarepo-ssh"
+ repsys = config.get("global", "mgarepo-cmd")
+ if repsys:
+ dir = os.path.dirname(repsys)
+ path = os.path.join(dir, wrapper)
+ if os.path.exists(path):
+ wrapper = path
+ defaults = {"SVN_SSH": wrapper}
+ os.environ.update(defaults)
+ raw = config.get("global", "svn-env")
+ if raw:
+ for line in raw.split("\n"):
+ env = line.strip()
+ if not env:
+ continue
+ try:
+ name, value = env.split("=", 1)
+ except ValueError:
+ sys.stderr.write("invalid svn environment line: %r\n" % env)
+ continue
+ os.environ[name] = value
+
+ def _execsvn_success(self, *args, **kwargs):
+ status, output = self._execsvn(*args, **kwargs)
+ return status == 0
+
+ def _add_log(self, cmd_args, received_kwargs, optional=0):
+ if (not optional or
+ "log" in received_kwargs or
+ "logfile" in received_kwargs):
+ ret = received_kwargs.get("log")
+ if ret is not None:
+ cmd_args.append("-m '%s'" % ret)
+ ret = received_kwargs.get("logfile")
+ if ret is not None:
+ cmd_args.append("-F '%s'" % ret)
+
+ def _add_revision(self, cmd_args, received_kwargs, optional=0):
+ if not optional or "rev" in received_kwargs:
+ ret = received_kwargs.get("rev")
+ if isinstance(ret, str):
+ if not ret.startswith("{"): # if not a datespec
+ try:
+ ret = int(ret)
+ except ValueError:
+ raise Error("invalid revision provided")
+ if ret:
+ cmd_args.append("-r '%s'" % ret)
+
+ def add(self, path, **kwargs):
+ cmd = ["add", path + '@' if '@' in path else path]
+ return self._execsvn_success(noauth=1, *cmd, **kwargs)
+
+ def copy(self, pathfrom, pathto, **kwargs):
+ cmd = ["copy", pathfrom + '@' if '@' in pathfrom else pathfrom, pathto + '@' if '@' in pathto else pathto]
+ self._add_revision(cmd, kwargs, optional=1)
+ self._add_log(cmd, kwargs)
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def remove(self, path, force=0, **kwargs):
+ cmd = ["remove", path + '@' if '@' in path else path]
+ self._add_log(cmd, kwargs)
+ if force:
+ cmd.append("--force")
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def mkdir(self, path, **kwargs):
+ cmd = ["mkdir", path + '@' if '@' in path else path]
+ if kwargs.get("parents"):
+ cmd.append("--parents")
+ self._add_log(cmd, kwargs)
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def _execsvn_commit(self, *cmd, **kwargs):
+ status, output = self._execsvn(*cmd, **kwargs)
+ match = re.search("Committed revision (?P<rev>\\d+)\\.$", output)
+ if match:
+ rawrev = match.group("rev")
+ return int(rawrev)
+
+ def commit(self, path, **kwargs):
+ cmd = ["commit", path + '@' if '@' in path else path]
+ if kwargs.get("nonrecursive"):
+ cmd.append("-N")
+ self._add_log(cmd, kwargs)
+ return self._execsvn_commit(*cmd, **kwargs)
+
+ def import_(self, path, url, **kwargs):
+ cmd = ["import", "'%s'" % path, "'%s'" % url]
+ self._add_log(cmd, kwargs)
+ return self._execsvn_commit(*cmd, **kwargs)
+
+ def export(self, url, targetpath, **kwargs):
+ cmd = ["export", "'%s'" % url, targetpath]
+ self._add_revision(cmd, kwargs, optional=1)
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def checkout(self, url, targetpath, **kwargs):
+ cmd = ["checkout", "'%s'" % url, targetpath]
+ self._add_revision(cmd, kwargs, optional=1)
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def propget(self, propname, targets, **kwargs):
+ cmd = ["propget", propname, targets]
+ if kwargs.get("revprop"):
+ cmd.append("--revprop")
+ self._add_revision(cmd, kwargs)
+ status, output = self._execsvn(local=True, *cmd, **kwargs)
+ return output
+
+ def propset(self, propname, value, targets, **kwargs):
+ cmd = ["propset", propname, "'%s'" % value, targets]
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def propedit(self, propname, target, **kwargs):
+ cmd = ["propedit", propname, target]
+ if kwargs.get("rev"):
+ cmd.append("--revprop")
+ self._add_revision(cmd, kwargs)
+ return self._execsvn_success(local=True, show=True, *cmd, **kwargs)
+
+ def revision(self, path, **kwargs):
+ cmd = ["info", path + '@' if '@' in path else path]
+ status, output = self._execsvn(local=True, *cmd, **kwargs)
+ if status == 0:
+ for line in output.splitlines():
+ if line.startswith("Last Changed Rev: "):
+ return int(line.split()[3])
+ return None
+
+ def info(self, path, **kwargs):
+ cmd = ["info", path + '@' if '@' in path else path]
+ status, output = self._execsvn(local=True, noerror=True, *cmd, **kwargs)
+ if (("Not a versioned resource" not in output) and ("svn: warning: W155010" not in output)):
+ return output.splitlines()
+ return None
+
+ def info2(self, *args, **kwargs):
+ lines = self.info(*args, **kwargs)
+ if lines is None:
+ return None
+ pairs = [[w.strip() for w in line.split(":", 1)] for line in lines]
+ info = {}
+ for pair in pairs:
+ if pair != ['']:
+ info[pair[0]]=pair[1]
+ return info
+
+ def ls(self, path, **kwargs):
+ cmd = ["ls", path + '@' if '@' in path else path]
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return output.split()
+ return None
+
+ def status(self, path, **kwargs):
+ cmd = ["status", path + '@' if '@' in path else path]
+ if kwargs.get("verbose"):
+ cmd.append("-v")
+ if kwargs.get("noignore"):
+ cmd.append("--no-ignore")
+ if kwargs.get("quiet"):
+ cmd.append("--quiet")
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return [(x[0], x[8:]) for x in output.splitlines()]
+ return None
+
+ def cleanup(self, path, **kwargs):
+ cmd = ["cleanup", path + '@' if '@' in path else path]
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def revert(self, path, **kwargs):
+ cmd = ["revert", path + '@' if '@' in path else path]
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return [x.split() for x in output.split()]
+ return None
+
+ def switch(self, url, oldurl=None, path=None, relocate=False, **kwargs):
+ cmd = ["switch"]
+ if relocate:
+ if oldurl is None:
+ raise Error("You must supply the old URL when "\
+ "relocating working copies")
+ cmd.append("--relocate")
+ cmd.append(oldurl)
+ cmd.append(url)
+ if path is not None:
+ cmd.append(path)
+ return self._execsvn_success(*cmd, **kwargs)
+
+ def update(self, path, **kwargs):
+ cmd = ["update", path + '@' if '@' in path else path]
+ self._add_revision(cmd, kwargs, optional=1)
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return [x.split() for x in output.split()]
+ return None
+
+ def merge(self, url1, url2=None, rev1=None, rev2=None, path=None,
+ **kwargs):
+ cmd = ["merge"]
+ if rev1 and rev2 and not url2:
+ cmd.append("-r")
+ cmd.append("%s:%s" % (rev1, rev2))
+ cmd.append(url1)
+ else:
+ if not url2:
+ raise ValueError("url2 needed if two revisions are not provided")
+ if rev1:
+ cmd.append("%s@%s" % (url1, rev1))
+ else:
+ cmd.append(url1)
+ if rev2:
+ cmd.append("%s@%s" % (url2, rev2))
+ else:
+ cmd.append(url2)
+ if path:
+ cmd.append(path)
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return [x.split() for x in output.split()]
+ return None
+
+ def diff(self, pathurl1, pathurl2=None, **kwargs):
+ cmd = ["diff", pathurl1]
+ self._add_revision(cmd, kwargs, optional=1)
+ if pathurl2:
+ cmd.append(pathurl2)
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return output
+ return None
+
+ def cat(self, url, **kwargs):
+ cmd = ["cat", url]
+ self._add_revision(cmd, kwargs, optional=1)
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status == 0:
+ return output
+ return None
+
+ def log(self, url, start=None, end=0, limit=None, **kwargs):
+ cmd = ["log", "-v", url]
+ if start is not None or end != 0:
+ if start is not None and type(start) is not type(0):
+ try:
+ start = int(start)
+ except (ValueError, TypeError):
+ raise Error("invalid log start revision provided")
+ if type(end) is not type(0):
+ try:
+ end = int(end)
+ except (ValueError, TypeError):
+ raise Error("invalid log end revision provided")
+ start = start or "HEAD"
+ cmd.append("-r %s:%s" % (start, end))
+ if limit is not None:
+ try:
+ limit = int(limit)
+ except (ValueError, TypeError):
+ raise Error("invalid limit number provided")
+ cmd.append("--limit %d" % limit)
+ status, output = self._execsvn(*cmd, **kwargs)
+ if status != 0:
+ return None
+
+ revheader = re.compile("^r(?P<revision>[0-9]+) \| (?P<author>[^\|]+) \| (?P<date>[^\|]+) \| (?P<lines>[0-9]+) (?:line|lines)$")
+ changedpat = re.compile(r"^\s+(?P<action>[^\s]+) (?P<path>[^\s]+)(?: \([^\s]+ (?P<from_path>[^:]+)(?:\:(?P<from_rev>[0-9]+))?\))?$")
+ logseparator = "-"*72
+ linesleft = 0
+ entry = None
+ log = []
+ appendchanged = 0
+ changedheader = 0
+ for line in output.splitlines():
+ line = line.rstrip()
+ if changedheader:
+ appendchanged = 1
+ changedheader = 0
+ elif appendchanged:
+ if not line:
+ appendchanged = 0
+ continue
+ m = changedpat.match(line)
+ if m:
+ changed = m.groupdict().copy()
+ from_rev = changed.get("from_rev")
+ if from_rev is not None:
+ try:
+ changed["from_rev"] = int(from_rev)
+ except (ValueError, TypeError):
+ raise Error("invalid revision number in svn log")
+ entry.changed.append(changed)
+ elif linesleft == 0:
+ if line != logseparator:
+ m = revheader.match(line)
+ if m:
+ linesleft = int(m.group("lines"))
+ timestr = " ".join(m.group("date").split()[:2])
+ timetuple = time.strptime(timestr,
+ "%Y-%m-%d %H:%M:%S")
+ entry = SVNLogEntry(int(m.group("revision")),
+ m.group("author"), timetuple)
+ log.append(entry)
+ changedheader = 1
+ else:
+ entry.lines.append(line)
+ linesleft -= 1
+ log.sort()
+ log.reverse()
+ return log
+
+ def mv(self, path, dest, message=None, **kwargs):
+ cmd = ["mv", path, dest, ]
+ if message:
+ cmd.append("-m '%s'"%message)
+ else:
+ kwargs['show'] = True
+ self._add_log(cmd, kwargs)
+ return self._execsvn_success(*cmd, **kwargs)
+
+class SVNLook:
def __init__(self, repospath, txn=None, rev=None):
- VCSLook.__init__(self, repospath, txn, rev)
- self.execcmd = "svnlook"
+ self.repospath = repospath
+ self.txn = txn
+ self.rev = rev
+
+ def _execsvnlook(self, cmd, *args, **kwargs):
+ execcmd_args = ["svnlook", cmd, self.repospath]
+ self._add_txnrev(execcmd_args, kwargs)
+ execcmd_args += args
+ execcmd_kwargs = {}
+ keywords = ["show", "noerror"]
+ for key in keywords:
+ if key in kwargs:
+ execcmd_kwargs[key] = kwargs[key]
+ return execcmd(*execcmd_args, **execcmd_kwargs)
+
+ def _add_txnrev(self, cmd_args, received_kwargs):
+ if "txn" in received_kwargs:
+ txn = received_kwargs.get("txn")
+ if txn is not None:
+ cmd_args += ["-t", txn]
+ elif self.txn is not None:
+ cmd_args += ["-t", self.txn]
+ if "rev" in received_kwargs:
+ rev = received_kwargs.get("rev")
+ if rev is not None:
+ cmd_args += ["-r", rev]
+ elif self.rev is not None:
+ cmd_args += ["-r", self.rev]
+
+ def changed(self, **kwargs):
+ status, output = self._execsvnlook("changed", **kwargs)
+ if status != 0:
+ return None
+ changes = []
+ for line in output.splitlines():
+ line = line.rstrip()
+ if not line:
+ continue
+ entry = [None, None, None]
+ changedata, changeprop, path = None, None, None
+ if line[0] != "_":
+ changedata = line[0]
+ if line[1] != " ":
+ changeprop = line[1]
+ path = line[4:]
+ changes.append((changedata, changeprop, path))
+ return changes
+
+ def author(self, **kwargs):
+ status, output = self._execsvnlook("author", **kwargs)
+ if status != 0:
+ return None
+ return output.strip()
# vim:et:ts=4:sw=4