1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
""" Handles repository layout scheme and package URLs."""
import os
import urllib.parse
from MgaRepo import Error, config
from MgaRepo.svn import SVN
from MgaRepo.rpmutil import detectVCS
__all__ = ["package_url", "checkout_url", "repository_url", "get_url_revision"]
def layout_dirs():
devel_branch = config.get("global", "trunk-dir", "cauldron/")
devel_branch = os.path.normpath(devel_branch)
branches_dir = config.get("global", "branches-dir", "updates/")
branches_dir = os.path.normpath(branches_dir)
backports_dir = config.get("global", "backports-dir", "backports/")
backports_dir = os.path.normpath(backports_dir)
obsolete_dir = config.get("global", "obsolete-dir", "obsolete/")
obsolete_dir = os.path.normpath(obsolete_dir)
return devel_branch, branches_dir, backports_dir, obsolete_dir
def get_url_revision(url, retrieve=True):
"""Get the revision from a given URL
If the URL contains an explicit revision number (URL@REV), just use it
without even checking if the revision really exists.
The parameter retrieve defines whether it must ask the SVN server for
the revision number or not when it is not found in the URL.
"""
url, rev = split_url_revision(url)
if rev is None and retrieve:
# if no revspec was found, ask the server
svn = SVN()
rev = svn.revision(url)
return rev
def unsplit_url_revision(url, rev):
if rev is None:
newurl = url
else:
parsed = list(urllib.parse.urlparse(url))
path = os.path.normpath(parsed[2])
parsed[2] = path + "@" + str(rev)
newurl = urllib.parse.urlunparse(parsed)
return newurl
def split_url_revision(url):
"""Returns a tuple (url, rev) from an subversion URL with @REV
If the revision is not present in the URL, rev is None.
"""
parsed = list(urllib.parse.urlparse(url))
path = os.path.normpath(parsed[2])
dirs = path.rsplit("/", 1)
lastname = dirs[-1]
newname = lastname
index = lastname.rfind("@")
rev = None
if index != -1:
newname = lastname[:index]
rawrev = lastname[index+1:]
if rawrev:
try:
rev = int(rawrev)
if rev < 0:
raise ValueError
except ValueError:
raise Error("invalid revision specification on URL: %s" % url)
dirs[-1] = newname
newpath = "/".join(dirs)
parsed[2] = newpath
newurl = urllib.parse.urlunparse(parsed)
return newurl, rev
def checkout_url(pkgdirurl, branch=None, version=None, release=None, backports=None,
releases=False, pristine=False, append_path=None):
"""Get the URL of a branch of the package, defaults to current/
It tries to preserve revisions in the format @REV.
"""
parsed = list(urllib.parse.urlparse(pkgdirurl))
path, rev = split_url_revision(parsed[2])
if releases:
path = os.path.normpath(path + "/releases")
elif version:
assert release is not None
path = os.path.normpath(path + "/releases/" + version + "/" + release)
elif pristine:
path = os.path.join(path, "pristine")
elif branch:
path = os.path.join(path, "branches", branch)
else:
path = os.path.join(path, "current")
if append_path:
path = os.path.join(path, append_path)
path = unsplit_url_revision(path, rev)
parsed[2] = path
newurl = urllib.parse.urlunparse(parsed)
return newurl
def convert_default_parent(url):
"""Removes the cauldron/ component from the URL"""
parsed = list(urllib.parse.urlparse(url))
path = os.path.normpath(parsed[2])
rest, last = os.path.split(path)
parsed[2] = rest
newurl = urllib.parse.urlunparse(parsed)
return newurl
def remove_current(pkgdirurl):
parsed = list(urllib.parse.urlparse(pkgdirurl))
path = os.path.normpath(parsed[2])
rest, last = os.path.split(path)
if last == "current":
# FIXME this way we will not allow packages to be named "current"
path = rest
parsed[2] = path
newurl = urllib.parse.urlunparse(parsed)
return newurl
def repository_url(mirrored=False):
url = None
if mirrored and config.getbool("global", "use-mirror", "yes"):
url = config.get("global", "mirror")
if url is None:
url = config.get("global", "repository")
if not url:
# compatibility with the default_parent configuration option
default_parent = config.get("global", "default_parent")
if default_parent is None:
raise Error("you need to set the 'repository' " \
"configuration option on mgarepo.conf")
url = convert_default_parent(default_parent)
vcs = detectVCS(url)
return vcs.url
def package_url(name_or_url, version=None, release=None, distro=None, backports=None,
mirrored=True, obsolete=None):
"""Returns a tuple with the absolute package URL and its name
@name_or_url: name, relative path, or URL of the package. In case it is
a URL, the URL will just be 'normalized'.
@version: the version to be fetched from releases/ (requires release)
@release: the release number to be fetched from releases/$version/
@distro: the name of the repository branch inside updates/
@backports: the name of the repository branch inside backports/
@mirrored: return an URL based on the mirror repository, if enabled
"""
from MgaRepo import mirror
if "://" in name_or_url:
pkgdirurl = mirror.normalize_path(name_or_url)
pkgdirurl = remove_current(pkgdirurl)
if mirror.using_on(pkgdirurl) and not mirrored:
pkgdirurl = mirror.relocate_path(mirror.mirror_url(),
repository_url(), pkgdirurl)
else:
name = name_or_url
devel_branch, branches_dir, backports_dir, obsolete_dir= layout_dirs()
if distro or "/" in name:
default_branch = branches_dir
if distro:
default_branch = os.path.join(default_branch, distro)
elif backports:
default_branch = os.path.join(backports_dir, backports)
elif obsolete:
default_branch = os.path.join(obsolete_dir)
else:
default_branch = devel_branch # cauldron
path = os.path.join(default_branch, name)
parsed = list(urllib.parse.urlparse(repository_url(mirrored=mirrored)))
parsed[2] = os.path.join(parsed[2], path)
pkgdirurl = urllib.parse.urlunparse(parsed)
return pkgdirurl
def package_name(pkgdirurl):
"""Returns the package name from a package URL
It takes care of revision numbers"""
parsed = urllib.parse.urlparse(pkgdirurl)
path, rev = split_url_revision(parsed[2])
rest, name = os.path.split(path)
return name
def package_spec_url(pkgdirurl, *args, **kwargs):
"""Returns the URL of the specfile of a given package URL
The parameters are the same used by checkout_url, except append_path.
"""
kwargs["append_path"] = "SPECS/" + package_name(pkgdirurl) + ".spec"
specurl = checkout_url(pkgdirurl, *args, **kwargs)
return specurl
def distro_branch(pkgdirurl):
"""Tries to guess the distro branch name from a package URL"""
from MgaRepo.mirror import same_base
found = None
repo = repository_url()
if same_base(repo, pkgdirurl):
devel_branch, branches_dir, backports_dir, obsolete_dir = layout_dirs()
repo_path = urllib.parse.urlparse(repo)[2]
devel_path = os.path.join(repo_path, devel_branch)
branches_path = os.path.join(repo_path, branches_dir)
parsed = urllib.parse.urlparse(pkgdirurl)
path = os.path.normpath(parsed[2])
if path.startswith(devel_path):
# devel_branch must be before branches_dir in order to allow
# devel_branch to be inside branches_dir, as in /branches/cauldron
_, found = os.path.split(devel_branch)
elif path.startswith(branches_path):
comps = path.split("/")
if branches_path == "/":
found = comps[1]
elif len(comps) >= 2: # must be at least branch/pkgname
found = comps[branches_path.count("/")+1]
return found
|