New I/O code from Tony Lownds implement newline feature correctly,

and implements .newlines attribute in a 2.x-compatible fashion.
This commit is contained in:
Guido van Rossum 2007-08-18 21:39:55 +00:00
parent b99f762f10
commit 8358db22fa
3 changed files with 246 additions and 71 deletions

206
Lib/io.py
View File

@ -61,10 +61,26 @@ def open(file, mode="r", buffering=None, encoding=None, newline=None):
can be: 0 = unbuffered, 1 = line buffered,
larger = fully buffered.
encoding: optional string giving the text encoding.
newline: optional newlines specifier; must be None, '\n' or '\r\n';
specifies the line ending expected on input and written on
output. If None, use universal newlines on input and
use os.linesep on output.
newline: optional newlines specifier; must be None, '', '\n', '\r'
or '\r\n'; all other values are illegal. It controls the
handling of line endings. It works as follows:
* On input, if `newline` is `None`, universal newlines
mode is enabled. Lines in the input can end in `'\n'`,
`'\r'`, or `'\r\n'`, and these are translated into
`'\n'` before being returned to the caller. If it is
`''`, universal newline mode is enabled, but line endings
are returned to the caller untranslated. If it has any of
the other legal values, input lines are only terminated by
the given string, and the line ending is returned to the
caller untranslated.
* On output, if `newline` is `None`, any `'\n'`
characters written are translated to the system default
line separator, `os.linesep`. If `newline` is `''`,
no translation takes place. If `newline` is any of the
other legal values, any `'\n'` characters written are
translated to the given string.
(*) If a file descriptor is given, it is closed when the returned
I/O object is closed. If you don't want this to happen, use
@ -958,6 +974,17 @@ class TextIOBase(IOBase):
"""Subclasses should override."""
return None
@property
def newlines(self):
"""newlines -> None | str | tuple of str. Line endings translated
so far.
Only line endings translated during reading are considered.
Subclasses should override.
"""
return None
class TextIOWrapper(TextIOBase):
@ -969,7 +996,7 @@ class TextIOWrapper(TextIOBase):
_CHUNK_SIZE = 128
def __init__(self, buffer, encoding=None, newline=None):
if newline not in (None, "\n", "\r\n"):
if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
if encoding is None:
try:
@ -987,8 +1014,12 @@ class TextIOWrapper(TextIOBase):
self.buffer = buffer
self._encoding = encoding
self._newline = newline or os.linesep
self._fix_newlines = newline is None
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
self._seennl = 0
self._decoder = None
self._pending = ""
self._snapshot = None
@ -1032,13 +1063,15 @@ class TextIOWrapper(TextIOBase):
def write(self, s: str):
if self.closed:
raise ValueError("write to closed file")
haslf = "\n" in s
if haslf and self._writetranslate and self._writenl != "\n":
s = s.replace("\n", self._writenl)
# XXX What if we were just reading?
b = s.encode(self._encoding)
if isinstance(b, str):
b = bytes(b)
n = self.buffer.write(b)
if "\n" in s:
# XXX only if isatty
self.buffer.write(b)
if haslf and self.isatty():
self.flush()
self._snapshot = self._decoder = None
return len(s)
@ -1159,7 +1192,7 @@ class TextIOWrapper(TextIOBase):
res += decoder.decode(self.buffer.read(), True)
self._pending = ""
self._snapshot = None
return res.replace("\r\n", "\n")
return self._replacenl(res)
else:
while len(res) < n:
readahead, pending = self._read_chunk()
@ -1167,7 +1200,7 @@ class TextIOWrapper(TextIOBase):
if not readahead:
break
self._pending = res[n:]
return res[:n].replace("\r\n", "\n")
return self._replacenl(res[:n])
def __next__(self):
self._telling = False
@ -1189,59 +1222,136 @@ class TextIOWrapper(TextIOBase):
line = self._pending
start = 0
cr_eof = False
decoder = self._decoder or self._get_decoder()
pos = endpos = None
ending = None
while True:
# In C we'd look for these in parallel of course.
nlpos = line.find("\n", start)
crpos = line.find("\r", start)
if nlpos >= 0 and crpos >= 0:
endpos = min(nlpos, crpos)
else:
endpos = nlpos if nlpos >= 0 else crpos
if self._readuniversal:
# Universal newline search. Find any of \r, \r\n, \n
if endpos != -1:
endc = line[endpos]
if endc == "\n":
ending = "\n"
break
# We've seen \r - is it standalone, \r\n or \r at end of line?
if endpos + 1 < len(line):
if line[endpos+1] == "\n":
ending = "\r\n"
# In C we'd look for these in parallel of course.
nlpos = line.find("\n", start)
crpos = line.find("\r", start)
if crpos == -1:
if nlpos == -1:
start = len(line)
else:
ending = "\r"
# Found \n
pos = nlpos
endpos = pos + 1
ending = self._LF
break
elif nlpos == -1:
if crpos == len(line) - 1:
# Found \r at end of buffer, must keep reading
start = crpos
cr_eof = True
else:
# Found lone \r
ending = self._CR
pos = crpos
endpos = pos + 1
break
elif nlpos < crpos:
# Found \n
pos = nlpos
endpos = pos + 1
ending = self._LF
break
elif nlpos == crpos + 1:
# Found \r\n
ending = self._CRLF
pos = crpos
endpos = pos + 2
break
else:
# Found \r
pos = crpos
endpos = pos + 1
ending = self._CR
break
# There might be a following \n in the next block of data ...
start = endpos
else:
start = len(line)
# non-universal
pos = line.find(self._readnl)
if pos >= 0:
endpos = pos+len(self._readnl)
ending = self._nlflag(self._readnl)
break
# No line ending seen yet - get more data
more_line = ''
while True:
readahead, pending = self._read_chunk()
more_line = pending
if more_line or not readahead:
break
if more_line:
line += more_line
else:
# end of file
self._pending = ''
self._snapshot = None
if cr_eof:
self._seennl |= self._CR
return line[:-1] + '\n'
else:
return line
if not more_line:
ending = ""
endpos = len(line)
break
line += more_line
nextpos = endpos + len(ending)
self._pending = line[nextpos:]
# XXX Update self.newlines here if we want to support that
if self._fix_newlines and ending not in ("\n", ""):
return line[:endpos] + "\n"
self._pending = line[endpos:]
if self._readtranslate:
self._seennl |= ending
if ending != self._LF:
return line[:pos] + '\n'
else:
return line[:endpos]
else:
return line[:nextpos]
return line[:endpos]
def _replacenl(self, data):
# Replace newlines in data as needed and record that they have
# been seen.
if not self._readtranslate:
return data
if self._readuniversal:
crlf = data.count('\r\n')
cr = data.count('\r') - crlf
lf = data.count('\n') - crlf
self._seennl |= (lf and self._LF) | (cr and self._CR) \
| (crlf and self._CRLF)
if crlf:
data = data.replace("\r\n", "\n")
if cr:
data = data.replace("\r", "\n")
elif self._readnl == '\n':
# Only need to detect if \n was seen.
if data.count('\n'):
self._seennl |= self._LF
else:
newdata = data.replace(self._readnl, '\n')
if newdata is not data:
self._seennl |= self._nlflag(self._readnl)
data = newdata
return data
_LF = 1
_CR = 2
_CRLF = 4
@property
def newlines(self):
return (None,
"\n",
"\r",
("\r", "\n"),
"\r\n",
("\n", "\r\n"),
("\r", "\r\n"),
("\r", "\n", "\r\n")
)[self._seennl]
def _nlflag(self, nlstr):
return [None, "\n", "\r", None, "\r\n"].index(nlstr)
class StringIO(TextIOWrapper):

View File

@ -1,5 +1,6 @@
"""Unit tests for io.py."""
import os
import sys
import time
import array
@ -481,30 +482,61 @@ class TextIOWrapperTest(unittest.TestCase):
def tearDown(self):
test_support.unlink(test_support.TESTFN)
def testNewlinesInput(self):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
(None, normalized.decode("ASCII").splitlines(True)),
("", testdata.decode("ASCII").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
def testNewlinesOutput(self):
testdict = {
"": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
"\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
"\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
"\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
}
tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
for newline, expected in tests:
buf = io.BytesIO()
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
txt.write("AAA\nB")
txt.write("BB\nCCC\n")
txt.write("X\rY\r\nZ")
txt.flush()
self.assertEquals(buf.getvalue(), expected)
def testNewlines(self):
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
tests = [
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
[ '\n', input_lines ],
[ '\r\n', input_lines ],
[ '', input_lines ],
[ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
[ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
[ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
]
encodings = ('utf-8', 'latin-1')
# Try a range of pad sizes to test the case where \r is the last
# Try a range of buffer sizes to test the case where \r is the last
# character in TextIOWrapper._pending_line.
for encoding in encodings:
# XXX: str.encode() should return bytes
data = bytes(''.join(input_lines).encode(encoding))
for do_reads in (False, True):
for padlen in chain(range(10), range(50, 60)):
pad = '.' * padlen
data_lines = [ pad + line for line in input_lines ]
# XXX: str.encode() should return bytes
data = bytes(''.join(data_lines).encode(encoding))
for newline, exp_line_ends in tests:
exp_lines = [ pad + line for line in exp_line_ends ]
bufio = io.BufferedReader(io.BytesIO(data))
for bufsize in range(1, 10):
for newline, exp_lines in tests:
bufio = io.BufferedReader(io.BytesIO(data), bufsize)
textio = io.TextIOWrapper(bufio, newline=newline,
encoding=encoding)
if do_reads:
@ -522,6 +554,47 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEquals(got_line, exp_line)
self.assertEquals(len(got_lines), len(exp_lines))
def testNewlinesInput(self):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
(None, normalized.decode("ASCII").splitlines(True)),
("", testdata.decode("ASCII").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
def testNewlinesOutput(self):
import os
orig_linesep = os.linesep
data = "AAA\nBBB\rCCC\n"
data_lf = b"AAA\nBBB\rCCC\n"
data_cr = b"AAA\rBBB\rCCC\r"
data_crlf = b"AAA\r\nBBB\rCCC\r\n"
for os.linesep, newline, expected in [
("\n", None, data_lf),
("\r\n", None, data_crlf),
("\n", "", data_lf),
("\r\n", "", data_lf),
("\n", "\n", data_lf),
("\r\n", "\n", data_lf),
("\n", "\r", data_cr),
("\r\n", "\r", data_cr),
("\n", "\r\n", data_crlf),
("\r\n", "\r\n", data_crlf),
]:
buf = io.BytesIO()
txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
txt.write(data)
txt.close()
self.assertEquals(buf.getvalue(), expected)
# Systematic tests of the text I/O API
def testBasicIO(self):

View File

@ -12,9 +12,8 @@ FATX = 'x' * (2**14)
DATA_TEMPLATE = [
"line1=1",
"line2='this is a very long line designed to go past the magic " +
"hundred character limit that is inside fileobject.c and which " +
"is meant to speed up the common case, but we also want to test " +
"line2='this is a very long line designed to go past any default " +
"buffer limits that exist in io.py but we also want to test " +
"the uncommon case, naturally.'",
"def line3():pass",
"line4 = '%s'" % FATX,
@ -32,7 +31,7 @@ DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE]
class TestGenericUnivNewlines(unittest.TestCase):
# use a class variable DATA to define the data to write to the file
# and a class variable NEWLINE to set the expected newlines value
READMODE = 'U'
READMODE = 'r'
WRITEMODE = 'wb'
def setUp(self):
@ -79,12 +78,6 @@ class TestGenericUnivNewlines(unittest.TestCase):
self.assertEqual(data, DATA_SPLIT[1:])
class TestNativeNewlines(TestGenericUnivNewlines):
NEWLINE = None
DATA = DATA_LF
READMODE = 'r'
WRITEMODE = 'w'
class TestCRNewlines(TestGenericUnivNewlines):
NEWLINE = '\r'
DATA = DATA_CR
@ -104,7 +97,6 @@ class TestMixedNewlines(TestGenericUnivNewlines):
def test_main():
test_support.run_unittest(
TestNativeNewlines,
TestCRNewlines,
TestLFNewlines,
TestCRLFNewlines,