aboutsummaryrefslogtreecommitdiffstats
path: root/MgaRepo/VCS.py
diff options
context:
space:
mode:
Diffstat (limited to 'MgaRepo/VCS.py')
-rw-r--r--MgaRepo/VCS.py475
1 files changed, 0 insertions, 475 deletions
diff --git a/MgaRepo/VCS.py b/MgaRepo/VCS.py
deleted file mode 100644
index 4edbd7f..0000000
--- a/MgaRepo/VCS.py
+++ /dev/null
@@ -1,475 +0,0 @@
-from MgaRepo import Error, SilentError, config
-from MgaRepo.util import execcmd, get_auth
-from MgaRepo import layout
-from xml.etree import ElementTree
-import sys
-import os
-import re
-import time
-
-class VCSLogEntry(object):
- def __init__(self, revision, author, date, lines=[], changed=[]):
- self.revision = revision
- self.author = author
- self.date = date
- self.changed = changed
- self.lines = lines
-
- def __lt__(self, other):
- return (self.date < other.date)
-
- def __eq__(self,other):
- return (self.date == other.date)
-
-class VCS(object):
- vcs_dirname = None
- vcs_name = None
- def __init__(self, path, url):
- self.vcs_command = None
- self.vcs_wrapper = "mga-ssh"
- self.vcs_supports = {'clone' : False}
- self.vcs_type = None
- self.env_defaults = None
- if not path and not url:
- self._path = os.path.curdir
- elif not path and url:
- self._path = layout.package_name(layout.remove_current(url))
- else:
- self._path = path
- # FIXME
- self._url = None
- self.__url = url
-
- def _execVcs(self, *args, **kwargs):
- localcmds = ("add", "revert", "cleanup", "mv")
- cmd = self.vcs_command + list(args)
- kwargs["collecterr"] = kwargs.get("collecterr", False)
- if kwargs.get("show"):
- if not kwargs.get("local"):
- kwargs["collecterr"] = True
- else:
- if self.vcs_command is "svn" and args[0] not in localcmds:
- cmd.append("--non-interactive")
- else:
- if args[0] == "mv":
- kwargs["collecterr"] = False
- kwargs["cleanerr"] = kwargs.get("cleanerr", True)
- if kwargs.get("xml"):
- cmd.append("--xml")
- try:
- if args[0] in ('info', 'checkout','log'):
- kwargs['info'] = True
- else:
- kwargs['info'] = False
- return execcmd(*cmd, **kwargs)
- except Error as e:
- msg = None
- if e.args:
- if "Permission denied" in e.args:
- msg = ("It seems ssh-agent or ForwardAgent are not setup "
- "or your username is wrong. See "
- "https://wiki.mageia.org/en/Packagers_ssh"
- " for more information.")
- elif "authorization failed" in e.args:
- msg = ("Note that mgarepo does not support any HTTP "
- "authenticated access.")
- if kwargs.get("show") and \
- not config.getbool("global", "verbose", 0):
- # svn has already dumped error messages, we don't need to
- # do it too
- if msg:
- sys.stderr.write("\n")
- sys.stderr.write(msg)
- sys.stderr.write("\n")
- raise SilentError
- elif msg:
- raise Error("%s\n%s" % (e, msg))
- raise
-
- def _set_env(self):
- wrapper = "mgarepo-ssh"
- repsys = config.get("global", "mgarepo-cmd")
- if repsys:
- dir = os.path.dirname(repsys)
- path = os.path.join(dir, wrapper)
- if os.path.exists(path):
- wrapper = path
- defaults = {"SVN_SSH": wrapper}
- os.environ.update(defaults)
- raw = config.get("global", "svn-env")
- if raw:
- for line in raw.split("\n"):
- env = line.strip()
- if not env:
- continue
- try:
- name, value = env.split("=", 1)
- except ValueError:
- sys.stderr.write("invalid svn environment line: %r\n" % env)
- continue
- os.environ[name] = value
-
- def _execVcs_success(self, *args, **kwargs):
- status, output = self._execVcs(*args, **kwargs)
- return status == 0
-
- def _add_log(self, cmd_args, received_kwargs, optional=0):
- if (not optional or
- "log" in received_kwargs or
- "logfile" in received_kwargs):
- ret = received_kwargs.get("log")
- if ret is not None:
- cmd_args.extend(("-m", ret))
- ret = received_kwargs.get("logfile")
- if ret is not None:
- cmd_args.extend(("-F", ret))
-
- def _add_revision(self, cmd_args, received_kwargs, optional=0):
- if not optional or "rev" in received_kwargs:
- ret = received_kwargs.get("rev")
- if isinstance(ret, str):
- if not ret.startswith("{"): # if not a datespec
- try:
- ret = int(ret)
- except ValueError:
- raise Error("invalid revision provided")
- if ret:
- cmd_args.extend(("-r", str(ret)))
-
- def add(self, path, **kwargs):
- cmd = ["add", path + '@' if '@' in path else path]
- return self._execVcs_success(noauth=1, *cmd, **kwargs)
-
- def copy(self, pathfrom, pathto, **kwargs):
- cmd = ["copy", pathfrom + '@' if '@' in pathfrom else pathfrom, pathto + '@' if '@' in pathto else pathto]
- self._add_revision(cmd, kwargs, optional=1)
- self._add_log(cmd, kwargs)
- return self._execVcs_success(*cmd, **kwargs)
-
- def remove(self, path, force=0, **kwargs):
- cmd = ["remove", path + '@' if '@' in path else path]
- self._add_log(cmd, kwargs)
- if force:
- cmd.append("--force")
- return self._execVcs_success(*cmd, **kwargs)
-
- def mkdir(self, path, **kwargs):
- cmd = ["mkdir", path + '@' if '@' in path else path]
- if kwargs.get("parents"):
- cmd.append("--parents")
- self._add_log(cmd, kwargs)
- return self._execVcs_success(*cmd, **kwargs)
-
- def _execVcs_commit(self, *cmd, **kwargs):
- status, output = self._execVcs(*cmd, **kwargs)
- match = re.search("Committed revision (?P<rev>\\d+)\\.$", output)
- if match:
- rawrev = match.group("rev")
- return int(rawrev)
-
- def commit(self, path, **kwargs):
- cmd = ["commit", path + '@' if '@' in path else path]
- if kwargs.get("nonrecursive"):
- cmd.append("-N")
- self._add_log(cmd, kwargs)
- return self._execVcs_commit(*cmd, **kwargs)
-
- def import_(self, path, url, **kwargs):
- cmd = ["import", path, url]
- self._add_log(cmd, kwargs)
- return self._execVcs_commit(*cmd, **kwargs)
-
- def export(self, url, targetpath, **kwargs):
- cmd = ["export", url, targetpath]
- self._add_revision(cmd, kwargs, optional=1)
- return self._execVcs_success(*cmd, **kwargs)
-
- def checkout(self, url, targetpath, **kwargs):
- cmd = ["checkout", url, targetpath]
- self._add_revision(cmd, kwargs, optional=1)
- return self._execVcs_success(*cmd, **kwargs)
-
- def clone(self, url, targetpath, **kwargs):
- if self.vcs_supports['clone']:
- cmd = ["clone", url, targetpath]
- return self._execVcs_success(*cmd, **kwargs)
- else:
- raise Error("%s doesn't support 'clone'" % self.vcs_name)
-
- def propget(self, propname, targets, **kwargs):
- cmd = ["propget", propname, targets]
- if kwargs.get("revprop"):
- cmd.append("--revprop")
- self._add_revision(cmd, kwargs)
- status, output = self._execVcs(local=True, *cmd, **kwargs)
- return output
-
- def propset(self, propname, value, targets, **kwargs):
- cmd = ["propset", propname, value, targets]
- return self._execVcs_success(*cmd, **kwargs)
-
- def propedit(self, propname, target, **kwargs):
- cmd = ["propedit", propname, target]
- if kwargs.get("rev"):
- cmd.append("--revprop")
- self._add_revision(cmd, kwargs)
- return self._execVcs_success(local=True, show=True, *cmd, **kwargs)
-
- def revision(self, path, **kwargs):
- cmd = ["info", path + '@' if '@' in path else path]
- status, output = self._execVcs(local=True, *cmd, **kwargs)
- if status == 0:
- for line in output.splitlines():
- if line.startswith("Last Changed Rev: "):
- return int(line.split()[3])
- return None
-
- def info(self, path, **kwargs):
- cmd = ["info", path + '@' if '@' in path else path]
- status, output = self._execVcs(local=True, noerror=True, *cmd, **kwargs)
- if (("Not a versioned resource" not in output) and ("svn: warning: W155010" not in output)):
- return output.splitlines()
- return None
-
- def info2(self, *args, **kwargs):
- lines = self.info(*args, **kwargs)
- if lines is None:
- return None
- pairs = [[w.strip() for w in line.split(":", 1)] for line in lines]
- info = {}
- for pair in pairs:
- if pair != ['']:
- info[pair[0]]=pair[1]
- return info
-
- def ls(self, path, **kwargs):
- cmd = ["ls", path + '@' if '@' in path else path]
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return output.split()
- return None
-
- def status(self, path, **kwargs):
- cmd = ["status", path + '@' if '@' in path else path]
- if kwargs.get("verbose"):
- cmd.append("-v")
- if kwargs.get("noignore"):
- cmd.append("--no-ignore")
- if kwargs.get("quiet"):
- cmd.append("--quiet")
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return [(x[0], x[8:]) for x in output.splitlines()]
- return None
-
- def cleanup(self, path, **kwargs):
- cmd = ["cleanup", path + '@' if '@' in path else path]
- return self._execVcs_success(*cmd, **kwargs)
-
- def revert(self, path, **kwargs):
- cmd = ["revert", path + '@' if '@' in path else path]
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return [x.split() for x in output.split()]
- return None
-
- def switch(self, url, oldurl=None, path=None, relocate=False, **kwargs):
- cmd = ["switch"]
- if relocate:
- if oldurl is None:
- raise Error("You must supply the old URL when "\
- "relocating working copies")
- cmd.append("--relocate")
- cmd.append(oldurl)
- cmd.append(url)
- if path is not None:
- cmd.append(path)
- return self._execVcs_success(*cmd, **kwargs)
-
- def update(self, path, **kwargs):
- cmd = ["update", path + '@' if '@' in path else path]
- self._add_revision(cmd, kwargs, optional=1)
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return [x.split() for x in output.split()]
- return None
-
- def merge(self, url1, url2=None, rev1=None, rev2=None, path=None,
- **kwargs):
- cmd = ["merge"]
- if rev1 and rev2 and not url2:
- cmd.append("-r")
- cmd.append("%s:%s" % (rev1, rev2))
- cmd.append(url1)
- else:
- if not url2:
- raise ValueError("url2 needed if two revisions are not provided")
- if rev1:
- cmd.append("%s@%s" % (url1, rev1))
- else:
- cmd.append(url1)
- if rev2:
- cmd.append("%s@%s" % (url2, rev2))
- else:
- cmd.append(url2)
- if path:
- cmd.append(path)
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return [x.split() for x in output.split()]
- return None
-
- def diff(self, pathurl1, pathurl2=None, **kwargs):
- cmd = ["diff", pathurl1]
- self._add_revision(cmd, kwargs, optional=1)
- if pathurl2:
- cmd.append(pathurl2)
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return output
- return None
-
- def cat(self, url, **kwargs):
- cmd = ["cat", url]
- self._add_revision(cmd, kwargs, optional=1)
- status, output = self._execVcs(*cmd, **kwargs)
- if status == 0:
- return output
- return None
-
- def log(self, url, start=None, end=0, limit=None, **kwargs):
- cmd = ["log", "-v", url]
- if start is not None or end != 0:
- if start is not None and type(start) is not type(0):
- try:
- start = int(start)
- except (ValueError, TypeError):
- raise Error("invalid log start revision provided")
- if type(end) is not type(0):
- try:
- end = int(end)
- except (ValueError, TypeError):
- raise Error("invalid log end revision provided")
- start = start or "HEAD"
- cmd.extend(("-r", "%s:%s" % (start, end)))
- if limit is not None:
- try:
- limit = int(limit)
- except (ValueError, TypeError):
- raise Error("invalid limit number provided")
- cmd.extend(("--limit", str(limit)))
-
- status, output = self._execVcs(*cmd, xml=True, **kwargs)
- if status != 0:
- return None
-
- xmllog = ElementTree.fromstring(output)
- log = []
- logentries = xmllog.getiterator("logentry")
- for entry in logentries:
- changed = []
- lines = []
- for pathelem in entry.getiterator("paths"):
- path = pathelem.find("path")
- from_rev = path.get("copyfrom-rev")
- if from_rev:
- from_rev = int(from_rev)
- changed.append({"from_rev" : from_rev, "from_path" : path.get("copyfrom-path"), "action" : path.get("action"), "path" : path.text})
- date = entry.findtext("date").split("T")
- timestr = "%s %s" % (date[0], date[1].split(".")[0])
- timetuple = time.strptime(timestr, "%Y-%m-%d %H:%M:%S")
- lines.extend(entry.findtext("msg").rstrip().split("\n"))
- logentry = VCSLogEntry(int(entry.attrib["revision"]),
- entry.findtext("author"), timetuple, [line.rstrip() for line in lines], changed)
- log.append(logentry)
- log.sort()
- log.reverse()
- return log
-
- def mv(self, path, dest, message=None, **kwargs):
- cmd = ["mv", path, dest, ]
- if message:
- cmd.extend(("-m", str(message)))
- else:
- kwargs['show'] = True
- self._add_log(cmd, kwargs)
- return self._execVcs_success(*cmd, **kwargs)
-
- def get_topdir(self):
- vcsdir = os.path.join(self._path, self.vcs_dirname)
- if os.path.exists(vcsdir) and os.path.isdir(vcsdir):
- return self._path
- else:
- return None
-
- def drop_ssh_if_no_auth(self, url):
- return url
-
- @property
- def path(self):
- return self._path
-
- @property
- def url(self):
- if not self._url:
- self._url = self.drop_ssh_if_no_auth(self.__url or self.info2(self._path)["URL"])
- return self._url
-
-class VCSLook(object):
- def __init__(self, repospath, txn=None, rev=None):
- self.repospath = repospath
- self.txn = txn
- self.rev = rev
- self.execcmd = None
-
- def _execVcslook(self, cmd, *args, **kwargs):
- execcmd_args = ["svnlook", cmd, self.repospath]
- self._add_txnrev(execcmd_args, kwargs)
- execcmd_args += args
- execcmd_kwargs = {}
- keywords = ["show", "noerror"]
- for key in keywords:
- if key in kwargs:
- execcmd_kwargs[key] = kwargs[key]
- return execcmd(*execcmd_args, **execcmd_kwargs)
-
- def _add_txnrev(self, cmd_args, received_kwargs):
- if "txn" in received_kwargs:
- txn = received_kwargs.get("txn")
- if txn is not None:
- cmd_args.extend(("-t", txn))
- elif self.txn is not None:
- cmd_args.extend(("-t", self.txn))
- if "rev" in received_kwargs:
- rev = received_kwargs.get("rev")
- if rev is not None:
- cmd_args.exten(("-r", str(rev)))
- elif self.rev is not None:
- cmd_args.extend(("-r", str(self.rev)))
-
- def changed(self, **kwargs):
- status, output = self._execVcslook("changed", **kwargs)
- if status != 0:
- return None
- changes = []
- for line in output.splitlines():
- line = line.rstrip()
- if not line:
- continue
- entry = [None, None, None]
- changedata, changeprop, path = None, None, None
- if line[0] != "_":
- changedata = line[0]
- if line[1] != " ":
- changeprop = line[1]
- path = line[4:]
- changes.append((changedata, changeprop, path))
- return changes
-
- def author(self, **kwargs):
- status, output = self._execVcslook("author", **kwargs)
- if status != 0:
- return None
- return output.strip()
-
-# vim:et:ts=4:sw=4