Merged revisions 65686 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65686 | antoine.pitrou | 2008-08-14 23:04:30 +0200 (Thu, 14 Aug 2008) | 3 lines

  Issue #3476: make BufferedReader and BufferedWriter thread-safe
........
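For context: before this change, two threads calling read() on a shared BufferedReader could interleave between inspecting and updating the internal buffer, duplicating or dropping bytes. A minimal sketch of the scenario the new locks make safe (not part of the commit; the file name and buffer sizes are arbitrary):

import io
import threading

# Create a test file: 100 copies of each byte value 0..255.
with open("somefile.bin", "wb") as f:
    f.write(bytes(bytearray(range(256))) * 100)

raw = io.open("somefile.bin", "rb", buffering=0)  # unbuffered raw stream
bufio = io.BufferedReader(raw, 8)                 # tiny buffer forces frequent refills

chunks = []

def reader():
    while True:
        b = bufio.read(19)        # 19 > 8, so reads cross buffer boundaries
        if not b:
            break
        chunks.append(b)          # list.append() is atomic under the GIL

threads = [threading.Thread(target=reader) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
raw.close()

# With the per-object _read_lock added below, every byte is delivered
# exactly once; without it, the counts could come out wrong.
data = b"".join(chunks)
assert all(data.count(i) == 100 for i in range(256))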
commit 8769576477
parent 74bbea7ed7

Changed files:
    Lib/io.py (102 lines changed)
    Lib/test/test_cmd_line.py
    Lib/test/test_io.py
Lib/io.py

@@ -61,6 +61,7 @@ import sys
 import codecs
 import _fileio
 import warnings
+import threading
 
 # open() uses st_blksize whenever we can
 DEFAULT_BUFFER_SIZE = 8 * 1024  # bytes
@@ -895,6 +896,7 @@ class BufferedReader(_BufferedIOMixin):
         _BufferedIOMixin.__init__(self, raw)
         self.buffer_size = buffer_size
         self._reset_read_buf()
+        self._read_lock = threading.Lock()
 
     def _reset_read_buf(self):
         self._read_buf = b""
@@ -908,6 +910,10 @@ class BufferedReader(_BufferedIOMixin):
         mode. If n is negative, read until EOF or until read() would
         block.
         """
+        with self._read_lock:
+            return self._read_unlocked(n)
+
+    def _read_unlocked(self, n=None):
         nodata_val = b""
         empty_values = (b"", None)
         buf = self._read_buf
@@ -960,6 +966,10 @@ class BufferedReader(_BufferedIOMixin):
         do at most one raw read to satisfy it. We never return more
         than self.buffer_size.
         """
+        with self._read_lock:
+            return self._peek_unlocked(n)
+
+    def _peek_unlocked(self, n=0):
         want = min(n, self.buffer_size)
         have = len(self._read_buf) - self._read_pos
         if have < want:
@@ -976,18 +986,21 @@ class BufferedReader(_BufferedIOMixin):
         # only return buffered bytes. Otherwise, we do one raw read.
         if n <= 0:
             return b""
-        self.peek(1)
-        return self.read(min(n, len(self._read_buf) - self._read_pos))
+        with self._read_lock:
+            self._peek_unlocked(1)
+            return self._read_unlocked(
+                min(n, len(self._read_buf) - self._read_pos))
 
     def tell(self):
         return self.raw.tell() - len(self._read_buf) + self._read_pos
 
     def seek(self, pos, whence=0):
-        if whence == 1:
-            pos -= len(self._read_buf) - self._read_pos
-        pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
-        return pos
+        with self._read_lock:
+            if whence == 1:
+                pos -= len(self._read_buf) - self._read_pos
+            pos = self.raw.seek(pos, whence)
+            self._reset_read_buf()
+            return pos
 
 
 class BufferedWriter(_BufferedIOMixin):
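The recurring shape in the reader changes above: each public method acquires _read_lock and delegates to an _unlocked helper holding the old body, and composite operations such as read1() call only the helpers while the lock is held. A distilled, hypothetical version of the pattern (names are illustrative, not from Lib/io.py):

import threading

class LockedBuffer:
    # Hypothetical illustration of the lock-wrapper pattern used above.
    def __init__(self, data):
        self._lock = threading.Lock()
        self._buf = data

    def read(self, n):
        # Public API: always enter through the lock.
        with self._lock:
            return self._read_unlocked(n)

    def read1(self, n):
        # Composite operation: take the lock once and use only the
        # _unlocked helpers. Calling self.read() here would deadlock,
        # because threading.Lock cannot be re-acquired by its holder.
        with self._lock:
            self._peek_unlocked(1)
            return self._read_unlocked(n)

    def _read_unlocked(self, n):
        # Caller must hold self._lock.
        data, self._buf = self._buf[:n], self._buf[n:]
        return data

    def _peek_unlocked(self, n):
        # Caller must hold self._lock.
        return self._buf[:n]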
@@ -1009,43 +1022,51 @@ class BufferedWriter(_BufferedIOMixin):
                                 if max_buffer_size is None
                                 else max_buffer_size)
         self._write_buf = bytearray()
+        self._write_lock = threading.Lock()
 
     def write(self, b):
         if self.closed:
             raise ValueError("write to closed file")
         if isinstance(b, str):
             raise TypeError("can't write str to binary stream")
-        # XXX we can implement some more tricks to try and avoid partial writes
-        if len(self._write_buf) > self.buffer_size:
-            # We're full, so let's pre-flush the buffer
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                # We can't accept anything else.
-                # XXX Why not just let the exception pass through?
-                raise BlockingIOError(e.errno, e.strerror, 0)
-        before = len(self._write_buf)
-        self._write_buf.extend(b)
-        written = len(self._write_buf) - before
-        if len(self._write_buf) > self.buffer_size:
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                if (len(self._write_buf) > self.max_buffer_size):
-                    # We've hit max_buffer_size. We have to accept a partial
-                    # write and cut back our buffer.
-                    overage = len(self._write_buf) - self.max_buffer_size
-                    self._write_buf = self._write_buf[:self.max_buffer_size]
-                    raise BlockingIOError(e.errno, e.strerror, overage)
-        return written
+        with self._write_lock:
+            # XXX we can implement some more tricks to try and avoid
+            # partial writes
+            if len(self._write_buf) > self.buffer_size:
+                # We're full, so let's pre-flush the buffer
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    # We can't accept anything else.
+                    # XXX Why not just let the exception pass through?
+                    raise BlockingIOError(e.errno, e.strerror, 0)
+            before = len(self._write_buf)
+            self._write_buf.extend(b)
+            written = len(self._write_buf) - before
+            if len(self._write_buf) > self.buffer_size:
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    if len(self._write_buf) > self.max_buffer_size:
+                        # We've hit max_buffer_size. We have to accept a
+                        # partial write and cut back our buffer.
+                        overage = len(self._write_buf) - self.max_buffer_size
+                        self._write_buf = self._write_buf[:self.max_buffer_size]
+                        raise BlockingIOError(e.errno, e.strerror, overage)
+            return written
 
     def truncate(self, pos=None):
-        self.flush()
-        if pos is None:
-            pos = self.raw.tell()
-        return self.raw.truncate(pos)
+        with self._write_lock:
+            self._flush_unlocked()
+            if pos is None:
+                pos = self.raw.tell()
+            return self.raw.truncate(pos)
 
     def flush(self):
+        with self._write_lock:
+            self._flush_unlocked()
+
+    def _flush_unlocked(self):
         if self.closed:
             raise ValueError("flush of closed file")
         written = 0
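Note why write() and truncate() call _flush_unlocked() rather than flush(): flush() itself takes _write_lock, and threading.Lock is not reentrant, so a nested acquire from the same thread would deadlock. A two-line demonstration of that property (standard-library behavior, not from the diff):

import threading

lock = threading.Lock()
with lock:
    # A second, blocking acquire() here would deadlock this thread.
    print(lock.acquire(False))   # prints False: the holder cannot re-acquire

An RLock would allow re-entry, but a plain Lock is cheaper, and splitting out the _unlocked helpers keeps the locking discipline explicit.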
@@ -1064,8 +1085,9 @@ class BufferedWriter(_BufferedIOMixin):
         return self.raw.tell() + len(self._write_buf)
 
     def seek(self, pos, whence=0):
-        self.flush()
-        return self.raw.seek(pos, whence)
+        with self._write_lock:
+            self._flush_unlocked()
+            return self.raw.seek(pos, whence)
 
 
 class BufferedRWPair(BufferedIOBase):
@@ -1155,7 +1177,8 @@ class BufferedRandom(BufferedWriter, BufferedReader):
         # First do the raw seek, then empty the read buffer, so that
         # if the raw seek fails, we don't lose buffered data forever.
         pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
+        with self._read_lock:
+            self._reset_read_buf()
         return pos
 
     def tell(self):
@@ -1192,8 +1215,9 @@ class BufferedRandom(BufferedWriter, BufferedReader):
     def write(self, b):
         if self._read_buf:
             # Undo readahead
-            self.raw.seek(self._read_pos - len(self._read_buf), 1)
-            self._reset_read_buf()
+            with self._read_lock:
+                self.raw.seek(self._read_pos - len(self._read_buf), 1)
+                self._reset_read_buf()
         return BufferedWriter.write(self, b)
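BufferedRandom thus ends up with both locks, but never holds them at the same time: in write() above, the read lock is released before BufferedWriter.write() acquires the write lock. Taking the locks one at a time, never nested, rules out lock-ordering deadlocks between the read and write paths. Schematically (a hypothetical class, not from Lib/io.py):

import threading

class TwoLockStream:
    # Hypothetical schematic of BufferedRandom's locking discipline.
    def __init__(self):
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._read_buf = b""
        self._write_buf = bytearray()

    def write(self, b):
        # Step 1: discard readahead under the read lock only.
        with self._read_lock:
            self._read_buf = b""
        # Step 2: the read lock is already released; buffer the data
        # under the write lock only. The two locks never nest.
        with self._write_lock:
            self._write_buf.extend(b)
            return len(b)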
Lib/test/test_cmd_line.py

@@ -3,11 +3,15 @@
 # See test_cmd_line_script.py for testing of script execution
 
 import test.support, unittest
+import os
 import sys
 import subprocess
 
 def _spawn_python(*args):
-    cmd_line = [sys.executable, '-E']
+    cmd_line = [sys.executable]
+    # When testing -S, we need PYTHONPATH to work (see test_site_flag())
+    if '-S' not in args:
+        cmd_line.append('-E')
     cmd_line.extend(args)
     return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -59,6 +63,16 @@ class CmdLineTest(unittest.TestCase):
         self.verify_valid_flag('-Qwarnall')
 
+    def test_site_flag(self):
+        if os.name == 'posix':
+            # Workaround bug #586680 by adding the extension dir to PYTHONPATH
+            from distutils.util import get_platform
+            s = "./build/lib.%s-%.3s" % (get_platform(), sys.version)
+            if hasattr(sys, 'gettotalrefcount'):
+                s += '-pydebug'
+            p = os.environ.get('PYTHONPATH', '')
+            if p:
+                p += ':'
+            os.environ['PYTHONPATH'] = p + s
+        self.verify_valid_flag('-S')
+
     def test_usage(self):
Lib/test/test_io.py

@@ -4,8 +4,10 @@ import os
 import sys
 import time
 import array
+import threading
+import random
 import unittest
-from itertools import chain
+from itertools import chain, cycle
 from test import support
 
 import codecs
@@ -390,6 +392,49 @@ class BufferedReaderTest(unittest.TestCase):
         # this test. Else, write it.
         pass
 
+    def testThreads(self):
+        try:
+            # Write out many bytes with exactly the same number of 0's,
+            # 1's... 255's. This will help us check that concurrent reading
+            # doesn't duplicate or forget contents.
+            N = 1000
+            l = list(range(256)) * N
+            random.shuffle(l)
+            s = bytes(bytearray(l))
+            with io.open(support.TESTFN, "wb") as f:
+                f.write(s)
+            with io.open(support.TESTFN, "rb", buffering=0) as raw:
+                bufio = io.BufferedReader(raw, 8)
+                errors = []
+                results = []
+                def f():
+                    try:
+                        # Intra-buffer read then buffer-flushing read
+                        for n in cycle([1, 19]):
+                            s = bufio.read(n)
+                            if not s:
+                                break
+                            # list.append() is atomic
+                            results.append(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+                s = b''.join(results)
+                for i in range(256):
+                    c = bytes(bytearray([i]))
+                    self.assertEqual(s.count(c), N)
+        finally:
+            support.unlink(support.TESTFN)
+
+
 class BufferedWriterTest(unittest.TestCase):
@@ -446,6 +491,38 @@ class BufferedWriterTest(unittest.TestCase):
 
         self.assertEquals(b"abc", writer._write_stack[0])
 
+    def testThreads(self):
+        # BufferedWriter should not raise exceptions or crash
+        # when called from multiple threads.
+        try:
+            # We use a real file object because it allows us to
+            # exercise situations where the GIL is released before
+            # writing the buffer to the raw streams. This is in addition
+            # to concurrency issues due to switching threads in the middle
+            # of Python code.
+            with io.open(support.TESTFN, "wb", buffering=0) as raw:
+                bufio = io.BufferedWriter(raw, 8)
+                errors = []
+                def f():
+                    try:
+                        # Write enough bytes to flush the buffer
+                        s = b"a" * 19
+                        for i in range(50):
+                            bufio.write(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+        finally:
+            support.unlink(support.TESTFN)
+
+
 class BufferedRWPairTest(unittest.TestCase):
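A note on the test strategy: thread scheduling makes the order of reads nondeterministic, so the reader test cannot compare its results against the file byte-for-byte. Instead it checks a multiset invariant: concurrent reads must partition the input, so every byte value keeps its multiplicity N. The invariant can be reproduced standalone (a sketch; the sequential split below stands in for whatever partition the threads happen to produce):

N = 1000
data = bytes(bytearray(list(range(256)) * N))
# Any partition of `data` into disjoint chunks preserves byte counts;
# the threaded test asserts exactly this after reassembling its reads.
chunks = [data[i:i + 19] for i in range(0, len(data), 19)]
joined = b"".join(chunks)
assert all(joined.count(bytes(bytearray([v]))) == N for v in range(256))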