gh-117348: Refactored RawConfigParser._read for simplicity and comprehensibility (#117372)

* Extract method for _read_inner, reducing complexity and indentation by 1.

* Extract method for _raise_all and yield ParsingErrors from _read_inner.

Reduces complexity by 1 and reduces touch points for handling errors in _read_inner.

* Prefer iterators to splat expansion and literal indexing.

* Extract method for _strip_comments. Reduces complexity by 7.

* Model the file lines in a class to encapsulate the comment status and cleaned value.

* Encapsulate the read state as a dataclass

* Extract _handle_continuation_line and _handle_rest methods. Reduces complexity by 8.

* Reindent

* At least for now, collect errors in the ReadState

* Check for missing section header separately.

* Extract methods for _handle_header and _handle_option. Reduces complexity by 6.

* Remove unreachable code. Reduces complexity by 4.

* Remove unreachable branch

* Handle error condition early. Reduces complexity by 1.

* Add blurb

* Move _raise_all to ParsingError, as its behavior is most closely related to the exception class and not the reader.

* Split _strip* into separate methods.

* Refactor _strip_full to compute the stripped value just once and use 'not any' to determine the factor.

* Replace use of 'sys.maxsize' with direct computation of the stripped value.

* Extract has_comments as a dynamic property.

* Implement clean as a cached property.

* Model comment prefixes in the RawConfigParser within a prefixes namespace.

* Use a regular expression to search for the first match.

Avoids mutating variables, tricky logic, and computing all of the match positions when only the first one is relevant.
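
For illustration, a minimal standalone sketch of the regex-based first-match approach (the function name and prefix values are illustrative, not part of the change):

    import re

    def strip_inline_comment(line, prefixes=(';', '#')):
        # One alternation of escaped prefixes, each required to appear at the
        # start of the line or after whitespace; a single search() yields the
        # first relevant occurrence instead of tracking per-prefix indices.
        matcher = re.compile(
            '|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in prefixes)
            or '(?!)'  # match nothing if there are no prefixes
        )
        match = matcher.search(line)
        return line[:match.start() if match else None].strip()

    # strip_inline_comment('key = value  ; trailing comment') -> 'key = value'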
Jason R. Coombs 2024-03-29 16:06:09 -04:00 committed by GitHub
parent 01bd74eadb
commit 019143fecb
2 changed files with 185 additions and 147 deletions


@@ -145,12 +145,15 @@ ConfigParser -- responsible for parsing a list of
from collections.abc import MutableMapping
from collections import ChainMap as _ChainMap
import contextlib
from dataclasses import dataclass, field
import functools
import io
import itertools
import os
import re
import sys
from typing import Iterable
__all__ = ("NoSectionError", "DuplicateOptionError", "DuplicateSectionError",
"NoOptionError", "InterpolationError", "InterpolationDepthError",
@@ -302,15 +305,33 @@ class InterpolationDepthError(InterpolationError):
class ParsingError(Error):
"""Raised when a configuration file does not follow legal syntax."""
def __init__(self, source):
def __init__(self, source, *args):
super().__init__(f'Source contains parsing errors: {source!r}')
self.source = source
self.errors = []
self.args = (source, )
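# any extra positional arguments are a (lineno, line) pair, recorded via append()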
if args:
self.append(*args)
def append(self, lineno, line):
self.errors.append((lineno, line))
self.message += '\n\t[line %2d]: %s' % (lineno, line)
self.message += '\n\t[line %2d]: %s' % (lineno, repr(line))
def combine(self, others):
for other in others:
for error in other.errors:
self.append(*error)
return self
@staticmethod
def _raise_all(exceptions: Iterable['ParsingError']):
"""
Combine any number of ParsingErrors into one and raise it.
"""
exceptions = iter(exceptions)
with contextlib.suppress(StopIteration):
raise next(exceptions).combine(exceptions)
class MissingSectionHeaderError(ParsingError):
@@ -517,6 +538,55 @@ class ExtendedInterpolation(Interpolation):
"found: %r" % (rest,))
@dataclass
class _ReadState:
elements_added : set[str] = field(default_factory=set)
cursect : dict[str, str] | None = None
sectname : str | None = None
optname : str | None = None
lineno : int = 0
indent_level : int = 0
errors : list[ParsingError] = field(default_factory=list)
@dataclass
class _Prefixes:
full : Iterable[str]
inline : Iterable[str]
class _Line(str):
def __new__(cls, val, *args, **kwargs):
return super().__new__(cls, val)
def __init__(self, val, prefixes: _Prefixes):
self.prefixes = prefixes
@functools.cached_property
def clean(self):
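# _strip_full() returns '' (falsy) when the whole line is a comment, short-circuiting
# the 'and'; otherwise it returns True and the inline-stripped value is used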
return self._strip_full() and self._strip_inline()
@property
def has_comments(self):
return self.strip() != self.clean
def _strip_inline(self):
"""
Search for the earliest prefix at the beginning of the line or following a space.
"""
matcher = re.compile(
'|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in self.prefixes.inline)
# match nothing if no prefixes
or '(?!)'
)
match = matcher.search(self)
return self[:match.start() if match else None].strip()
def _strip_full(self):
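# '' if the line is a full-line comment, else True (consumed by 'clean' above)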
return '' if any(map(self.strip().startswith, self.prefixes.full)) else True
class RawConfigParser(MutableMapping):
"""ConfigParser that does not do interpolation."""
@@ -583,8 +653,10 @@ class RawConfigParser(MutableMapping):
else:
self._optcre = re.compile(self._OPT_TMPL.format(delim=d),
re.VERBOSE)
self._comment_prefixes = tuple(comment_prefixes or ())
self._inline_comment_prefixes = tuple(inline_comment_prefixes or ())
self._prefixes = _Prefixes(
full=tuple(comment_prefixes or ()),
inline=tuple(inline_comment_prefixes or ()),
)
self._strict = strict
self._allow_no_value = allow_no_value
self._empty_lines_in_values = empty_lines_in_values
@@ -975,147 +1047,117 @@ class RawConfigParser(MutableMapping):
in an otherwise empty line or may be entered in lines holding values or
section names. Please note that comments get stripped off when reading configuration files.
"""
elements_added = set()
cursect = None # None, or a dictionary
sectname = None
optname = None
lineno = 0
indent_level = 0
e = None # None, or an exception
try:
for lineno, line in enumerate(fp, start=1):
comment_start = sys.maxsize
# strip inline comments
inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
while comment_start == sys.maxsize and inline_prefixes:
next_prefixes = {}
for prefix, index in inline_prefixes.items():
index = line.find(prefix, index+1)
if index == -1:
continue
next_prefixes[prefix] = index
if index == 0 or (index > 0 and line[index-1].isspace()):
comment_start = min(comment_start, index)
inline_prefixes = next_prefixes
# strip full line comments
for prefix in self._comment_prefixes:
if line.strip().startswith(prefix):
comment_start = 0
break
if comment_start == sys.maxsize:
comment_start = None
value = line[:comment_start].strip()
if not value:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (comment_start is None and
cursect is not None and
optname and
cursect[optname] is not None):
cursect[optname].append('') # newlines added at join
else:
# empty line marks end of value
indent_level = sys.maxsize
continue
# continuation line?
first_nonspace = self.NONSPACECRE.search(line)
cur_indent_level = first_nonspace.start() if first_nonspace else 0
if (cursect is not None and optname and
cur_indent_level > indent_level):
if cursect[optname] is None:
raise MultilineContinuationError(fpname, lineno, line)
cursect[optname].append(value)
# a section header or option header?
else:
if self._allow_unnamed_section and cursect is None:
sectname = UNNAMED_SECTION
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self._optcre.match(value)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if not optname:
e = self._handle_error(e, fpname, lineno, line)
optname = self.optionxform(optname.rstrip())
if (self._strict and
(sectname, optname) in elements_added):
raise DuplicateOptionError(sectname, optname,
fpname, lineno)
elements_added.add((sectname, optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
cursect[optname] = [optval]
else:
# valueless option handling
cursect[optname] = None
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
e = self._handle_error(e, fpname, lineno, line)
ParsingError._raise_all(self._read_inner(fp, fpname))
finally:
self._join_multiline_values()
# if any parsing errors occurred, raise an exception
if e:
raise e
def _read_inner(self, fp, fpname):
st = _ReadState()
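# bind the parser's comment prefixes so each line can compute its own cleaned value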
Line = functools.partial(_Line, prefixes=self._prefixes)
for st.lineno, line in enumerate(map(Line, fp), start=1):
if not line.clean:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (not line.has_comments and
st.cursect is not None and
st.optname and
st.cursect[st.optname] is not None):
st.cursect[st.optname].append('') # newlines added at join
else:
# empty line marks end of value
st.indent_level = sys.maxsize
continue
first_nonspace = self.NONSPACECRE.search(line)
st.cur_indent_level = first_nonspace.start() if first_nonspace else 0
if self._handle_continuation_line(st, line, fpname):
continue
self._handle_rest(st, line, fpname)
return st.errors
def _handle_continuation_line(self, st, line, fpname):
# continuation line?
is_continue = (st.cursect is not None and st.optname and
st.cur_indent_level > st.indent_level)
if is_continue:
if st.cursect[st.optname] is None:
raise MultilineContinuationError(fpname, st.lineno, line)
st.cursect[st.optname].append(line.clean)
return is_continue
def _handle_rest(self, st, line, fpname):
# a section header or option header?
if self._allow_unnamed_section and st.cursect is None:
st.sectname = UNNAMED_SECTION
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)
st.indent_level = st.cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(line.clean)
if not mo and st.cursect is None:
raise MissingSectionHeaderError(fpname, st.lineno, line)
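# dispatch: a section header goes to _handle_header; anything else is an option line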
self._handle_header(st, mo, fpname) if mo else self._handle_option(st, line, fpname)
def _handle_header(self, st, mo, fpname):
st.sectname = mo.group('header')
if st.sectname in self._sections:
if self._strict and st.sectname in st.elements_added:
raise DuplicateSectionError(st.sectname, fpname,
st.lineno)
st.cursect = self._sections[st.sectname]
st.elements_added.add(st.sectname)
elif st.sectname == self.default_section:
st.cursect = self._defaults
else:
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)
# So sections can't start with a continuation line
st.optname = None
def _handle_option(self, st, line, fpname):
# an option line?
st.indent_level = st.cur_indent_level
mo = self._optcre.match(line.clean)
if not mo:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
st.errors.append(ParsingError(fpname, st.lineno, line))
return
st.optname, vi, optval = mo.group('option', 'vi', 'value')
if not st.optname:
st.errors.append(ParsingError(fpname, st.lineno, line))
st.optname = self.optionxform(st.optname.rstrip())
if (self._strict and
(st.sectname, st.optname) in st.elements_added):
raise DuplicateOptionError(st.sectname, st.optname,
fpname, st.lineno)
st.elements_added.add((st.sectname, st.optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
st.cursect[st.optname] = [optval]
else:
# valueless option handling
st.cursect[st.optname] = None
def _join_multiline_values(self):
defaults = self.default_section, self._defaults
@@ -1135,12 +1177,6 @@ class RawConfigParser(MutableMapping):
for key, value in defaults.items():
self._defaults[self.optionxform(key)] = value
def _handle_error(self, exc, fpname, lineno, line):
if not exc:
exc = ParsingError(fpname)
exc.append(lineno, repr(line))
return exc
def _unify_values(self, section, vars):
"""Create a sequence of lookups with 'vars' taking priority over
the 'section' which takes priority over the DEFAULTSECT.


@@ -0,0 +1,2 @@
Refactored :meth:`configparser.RawConfigParser._read` to reduce cyclomatic
complexity and improve comprehensibility.