aboutsummaryrefslogtreecommitdiffstats
path: root/MgaRepo/ConfigParser.py
diff options
context:
space:
mode:
Diffstat (limited to 'MgaRepo/ConfigParser.py')
-rw-r--r--MgaRepo/ConfigParser.py434
1 files changed, 434 insertions, 0 deletions
diff --git a/MgaRepo/ConfigParser.py b/MgaRepo/ConfigParser.py
new file mode 100644
index 0000000..bac6d36
--- /dev/null
+++ b/MgaRepo/ConfigParser.py
@@ -0,0 +1,434 @@
+"""
+This is a heavily hacked version of ConfigParser to keep the order in
+which options and sections are read, and allow multiple options with
+the same key.
+"""
+from __future__ import generators
+import string, types
+import re
+
+__all__ = ["NoSectionError","DuplicateSectionError","NoOptionError",
+ "InterpolationError","InterpolationDepthError","ParsingError",
+ "MissingSectionHeaderError","ConfigParser",
+ "MAX_INTERPOLATION_DEPTH"]
+
+DEFAULTSECT = "DEFAULT"
+
+MAX_INTERPOLATION_DEPTH = 10
+
+# exception classes
+class Error(Exception):
+ def __init__(self, msg=''):
+ self._msg = msg
+ Exception.__init__(self, msg)
+ def __repr__(self):
+ return self._msg
+ __str__ = __repr__
+
+class NoSectionError(Error):
+ def __init__(self, section):
+ Error.__init__(self, 'No section: %s' % section)
+ self.section = section
+
+class DuplicateSectionError(Error):
+ def __init__(self, section):
+ Error.__init__(self, "Section %s already exists" % section)
+ self.section = section
+
+class NoOptionError(Error):
+ def __init__(self, option, section):
+ Error.__init__(self, "No option `%s' in section: %s" %
+ (option, section))
+ self.option = option
+ self.section = section
+
+class InterpolationError(Error):
+ def __init__(self, reference, option, section, rawval):
+ Error.__init__(self,
+ "Bad value substitution:\n"
+ "\tsection: [%s]\n"
+ "\toption : %s\n"
+ "\tkey : %s\n"
+ "\trawval : %s\n"
+ % (section, option, reference, rawval))
+ self.reference = reference
+ self.option = option
+ self.section = section
+
+class InterpolationDepthError(Error):
+ def __init__(self, option, section, rawval):
+ Error.__init__(self,
+ "Value interpolation too deeply recursive:\n"
+ "\tsection: [%s]\n"
+ "\toption : %s\n"
+ "\trawval : %s\n"
+ % (section, option, rawval))
+ self.option = option
+ self.section = section
+
+class ParsingError(Error):
+ def __init__(self, filename):
+ Error.__init__(self, 'File contains parsing errors: %s' % filename)
+ self.filename = filename
+ self.errors = []
+
+ def append(self, lineno, line):
+ self.errors.append((lineno, line))
+ self._msg = self._msg + '\n\t[line %2d]: %s' % (lineno, line)
+
+class MissingSectionHeaderError(ParsingError):
+ def __init__(self, filename, lineno, line):
+ Error.__init__(
+ self,
+ 'File contains no section headers.\nfile: %s, line: %d\n%s' %
+ (filename, lineno, line))
+ self.filename = filename
+ self.lineno = lineno
+ self.line = line
+
+class ConfigParser:
+ def __init__(self, defaults=None):
+ # Options are stored in __sections_list like this:
+ # [(sectname, [(optname, optval), ...]), ...]
+ self.__sections_list = []
+ self.__sections_dict = {}
+ if defaults is None:
+ self.__defaults = {}
+ else:
+ self.__defaults = defaults
+
+ def defaults(self):
+ return self.__defaults
+
+ def sections(self):
+ return self.__sections_dict.keys()
+
+ def has_section(self, section):
+ return self.__sections_dict.has_key(section)
+
+ def options(self, section):
+ self.__sections_dict[section]
+ try:
+ opts = self.__sections_dict[section].keys()
+ except KeyError:
+ raise NoSectionError(section)
+ return self.__defaults.keys()+opts
+
+ def read(self, filenames):
+ if type(filenames) in types.StringTypes:
+ filenames = [filenames]
+ for filename in filenames:
+ try:
+ fp = open(filename)
+ except IOError:
+ continue
+ self.__read(fp, filename)
+ fp.close()
+
+ def readfp(self, fp, filename=None):
+ if filename is None:
+ try:
+ filename = fp.name
+ except AttributeError:
+ filename = '<???>'
+ self.__read(fp, filename)
+
+ def set(self, section, option, value):
+ if self.__sections_dict.has_key(section):
+ sectdict = self.__sections_dict[section]
+ sectlist = []
+ self.__sections_list.append((section, sectlist))
+ elif section == DEFAULTSECT:
+ sectdict = self.__defaults
+ sectlist = None
+ else:
+ sectdict = {}
+ self.__sections_dict[section] = sectdict
+ sectlist = []
+ self.__sections_list.append((section, sectlist))
+ xform = self.optionxform(option)
+ sectdict[xform] = value
+ if sectlist is not None:
+ sectlist.append([xform, value])
+
+ def get(self, section, option, raw=0, vars=None):
+ d = self.__defaults.copy()
+ try:
+ d.update(self.__sections_dict[section])
+ except KeyError:
+ if section != DEFAULTSECT:
+ raise NoSectionError(section)
+ if vars:
+ d.update(vars)
+ option = self.optionxform(option)
+ try:
+ rawval = d[option]
+ except KeyError:
+ raise NoOptionError(option, section)
+ if raw:
+ return rawval
+ return self.__interpolate(rawval, d)
+
+ def getall(self, section, option, raw=0, vars=None):
+ option = self.optionxform(option)
+ values = []
+ d = self.__defaults.copy()
+ if section != DEFAULTSECT:
+ for sectname, options in self.__sections_list:
+ if sectname == section:
+ for optname, value in options:
+ if optname == option:
+ values.append(value)
+ d[optname] = value
+ if raw:
+ return values
+ if vars:
+ d.update(vars)
+ for i in len(values):
+ values[i] = self.__interpolate(values[i], d)
+ return values
+
+ def walk(self, section, option=None, raw=0, vars=None):
+ # Build dictionary for interpolation
+ try:
+ d = self.__sections_dict[section].copy()
+ except KeyError:
+ if section == DEFAULTSECT:
+ d = {}
+ else:
+ raise NoSectionError(section)
+ d.update(self.__defaults)
+ if vars:
+ d.update(vars)
+
+ # Start walking
+ if option:
+ option = self.optionxform(option)
+ if section != DEFAULTSECT:
+ for sectname, options in self.__sections_list:
+ if sectname == section:
+ for optname, value in options:
+ if not option or optname == option:
+ if not raw:
+ value = self.__interpolate(value, d)
+ yield (optname, value)
+
+ def __interpolate(self, value, vars):
+ rawval = value
+ depth = 0
+ while depth < 10:
+ depth = depth + 1
+ if value.find("%(") >= 0:
+ try:
+ value = value % vars
+ except KeyError, key:
+ raise InterpolationError(key, option, section, rawval)
+ else:
+ break
+ if value.find("%(") >= 0:
+ raise InterpolationDepthError(option, section, rawval)
+ return value
+
+ def __get(self, section, conv, option):
+ return conv(self.get(section, option))
+
+ def getint(self, section, option):
+ return self.__get(section, string.atoi, option)
+
+ def getfloat(self, section, option):
+ return self.__get(section, string.atof, option)
+
+ def getboolean(self, section, option):
+ states = {'1': 1, 'yes': 1, 'true': 1, 'on': 1,
+ '0': 0, 'no': 0, 'false': 0, 'off': 0}
+ v = self.get(section, option)
+ if not states.has_key(v.lower()):
+ raise ValueError, 'Not a boolean: %s' % v
+ return states[v.lower()]
+
+ def optionxform(self, optionstr):
+ #return optionstr.lower()
+ return optionstr
+
+ def has_option(self, section, option):
+ """Check for the existence of a given option in a given section."""
+ if not section or section == "DEFAULT":
+ return self.__defaults.has_key(option)
+ elif not self.has_section(section):
+ return 0
+ else:
+ option = self.optionxform(option)
+ return self.__sections_dict[section].has_key(option)
+
+ SECTCRE = re.compile(r'\[(?P<header>[^]]+)\]')
+ OPTCRE = re.compile(r'(?P<option>\S+)\s*(?P<vi>[:=])\s*(?P<value>.*)$')
+
+ def __read(self, fp, fpname):
+ cursectdict = None # None, or a dictionary
+ optname = None
+ lineno = 0
+ e = None # None, or an exception
+ while 1:
+ line = fp.readline()
+ if not line:
+ break
+ lineno = lineno + 1
+ # comment or blank line?
+ if line.strip() == '' or line[0] in '#;':
+ continue
+ if line.split()[0].lower() == 'rem' \
+ and line[0] in "rR": # no leading whitespace
+ continue
+ # continuation line?
+ if line[0] in ' \t' and cursectdict is not None and optname:
+ value = line.strip()
+ if value:
+ k = self.optionxform(optname)
+ cursectdict[k] = "%s\n%s" % (cursectdict[k], value)
+ cursectlist[-1][1] = "%s\n%s" % (cursectlist[-1][1], value)
+ # a section header or option header?
+ else:
+ # is it a section header?
+ mo = self.SECTCRE.match(line)
+ if mo:
+ sectname = mo.group('header')
+ if self.__sections_dict.has_key(sectname):
+ cursectdict = self.__sections_dict[sectname]
+ cursectlist = []
+ self.__sections_list.append((sectname, cursectlist))
+ elif sectname == DEFAULTSECT:
+ cursectdict = self.__defaults
+ cursectlist = None
+ else:
+ cursectdict = {}
+ self.__sections_dict[sectname] = cursectdict
+ cursectlist = []
+ self.__sections_list.append((sectname, cursectlist))
+ # So sections can't start with a continuation line
+ optname = None
+ # no section header in the file?
+ elif cursectdict is None:
+ raise MissingSectionHeaderError(fpname, lineno, `line`)
+ # an option line?
+ else:
+ mo = self.OPTCRE.match(line)
+ if mo:
+ optname, vi, optval = mo.group('option', 'vi', 'value')
+ if vi in ('=', ':') and ';' in optval:
+ # ';' is a comment delimiter only if it follows
+ # a spacing character
+ pos = optval.find(';')
+ if pos and optval[pos-1] in string.whitespace:
+ optval = optval[:pos]
+ optval = optval.strip()
+ # allow empty values
+ if optval == '""':
+ optval = ''
+ xform = self.optionxform(optname)
+ cursectdict[xform] = optval
+ if cursectlist is not None:
+ cursectlist.append([xform, optval])
+ else:
+ # a non-fatal parsing error occurred. set up the
+ # exception but keep going. the exception will be
+ # raised at the end of the file and will contain a
+ # list of all bogus lines
+ if not e:
+ e = ParsingError(fpname)
+ e.append(lineno, `line`)
+ # if any parsing errors occurred, raise an exception
+ if e:
+ raise e
+
+# Here we wrap this hacked ConfigParser into something more useful
+# for us.
+
+import os
+
+class Config:
+ def __init__(self):
+ self._config = ConfigParser()
+ self._wrapped = {}
+ conffiles = []
+ mgarepo_conf = os.environ.get("MGAREPO_CONF")
+ if mgarepo_conf:
+ conffiles.append(mgarepo_conf)
+ else:
+ conffiles.append("/etc/mgarepo.conf")
+ conffiles.append(os.path.expanduser("~/.mgarepo/config"))
+ for file in conffiles:
+ if os.path.isfile(file):
+ self._config.read(file)
+
+ def wrap(self, section, handler, option=None):
+ """Set one wrapper for a given section
+
+ The wrapper must be a function
+ f(section, option=None, default=None, walk=False).
+ """
+ self._wrapped[section] = handler
+
+ def sections(self):
+ try:
+ return self._config.sections()
+ except Error:
+ return []
+
+ def options(self, section):
+ try:
+ return self._config.options(section)
+ except Error:
+ return []
+
+ def set(self, section, option, value):
+ return self._config.set(section, option, value)
+
+ def walk(self, section, option=None, raw=0, vars=None):
+ handler = self._wrapped.get(section)
+ if handler:
+ return handler(section, option, walk=True)
+ return self._config.walk(section, option, raw, vars)
+
+ def get(self, section, option, default=None, raw=False, wrap=True):
+ if wrap:
+ handler = self._wrapped.get(section)
+ if handler:
+ handler = self._wrapped.get(section)
+ return handler(section, option, default)
+ try:
+ return self._config.get(section, option, raw=raw)
+ except Error:
+ return default
+
+ def getint(self, section, option, default=None):
+ ret = self.get(section, option, default)
+ if type(ret) == type(""):
+ return int(ret)
+
+ def getbool(self, section, option, default=None):
+ ret = self.get(section, option, default)
+ states = {'1': 1, 'yes': 1, 'true': 1, 'on': 1,
+ '0': 0, 'no': 0, 'false': 0, 'off': 0}
+ if type(ret) == type("") and states.has_key(ret.lower()):
+ return states[ret.lower()]
+ return default
+
+def test():
+ config = Config()
+ def handler(section, option=None, default=None, walk=False):
+ d = {"fulano": "ciclano",
+ "foolano": "ceeclano"}
+ if walk:
+ return d.items()
+ elif option in d:
+ return d[option]
+ else:
+ return config.get(section, option, default, wrap=False)
+ config.wrap("users", handler=handler)
+ print config.get("users", "fulano") # found in wrapper
+ print config.get("users", "andreas") # found in repsys.conf
+ print config.walk("users")
+
+if __name__ == "__main__":
+ test()
+# vim:ts=4:sw=4:et