bpo-36402: Fix threading._shutdown() race condition (GH-13948)
Fix a race condition at Python shutdown when waiting for threads. Wait until the Python thread state of all non-daemon threads get deleted (join all non-daemon threads), rather than just wait until Python threads complete. * Add threading._shutdown_locks: set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() to wait until all Python thread states get deleted. See Thread._set_tstate_lock(). * Add also threading._shutdown_locks_lock to protect access to threading._shutdown_locks. * Add test_finalization_shutdown() test.
This commit is contained in:
parent
b4c7defe58
commit
468e5fec8a
@ -583,6 +583,41 @@ class ThreadTests(BaseTestCase):
|
|||||||
self.assertEqual(data.splitlines(),
|
self.assertEqual(data.splitlines(),
|
||||||
["GC: True True True"] * 2)
|
["GC: True True True"] * 2)
|
||||||
|
|
||||||
|
def test_finalization_shutdown(self):
|
||||||
|
# bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
|
||||||
|
# until Python thread states of all non-daemon threads get deleted.
|
||||||
|
#
|
||||||
|
# Test similar to SubinterpThreadingTests.test_threads_join_2(), but
|
||||||
|
# test the finalization of the main interpreter.
|
||||||
|
code = """if 1:
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
def random_sleep():
|
||||||
|
seconds = random.random() * 0.010
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
class Sleeper:
|
||||||
|
def __del__(self):
|
||||||
|
random_sleep()
|
||||||
|
|
||||||
|
tls = threading.local()
|
||||||
|
|
||||||
|
def f():
|
||||||
|
# Sleep a bit so that the thread is still running when
|
||||||
|
# Py_Finalize() is called.
|
||||||
|
random_sleep()
|
||||||
|
tls.x = Sleeper()
|
||||||
|
random_sleep()
|
||||||
|
|
||||||
|
threading.Thread(target=f).start()
|
||||||
|
random_sleep()
|
||||||
|
"""
|
||||||
|
rc, out, err = assert_python_ok("-c", code)
|
||||||
|
self.assertEqual(err, b"")
|
||||||
|
|
||||||
def test_tstate_lock(self):
|
def test_tstate_lock(self):
|
||||||
# Test an implementation detail of Thread objects.
|
# Test an implementation detail of Thread objects.
|
||||||
started = _thread.allocate_lock()
|
started = _thread.allocate_lock()
|
||||||
@ -878,15 +913,22 @@ class SubinterpThreadingTests(BaseTestCase):
|
|||||||
self.addCleanup(os.close, w)
|
self.addCleanup(os.close, w)
|
||||||
code = r"""if 1:
|
code = r"""if 1:
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
def random_sleep():
|
||||||
|
seconds = random.random() * 0.010
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
def f():
|
def f():
|
||||||
# Sleep a bit so that the thread is still running when
|
# Sleep a bit so that the thread is still running when
|
||||||
# Py_EndInterpreter is called.
|
# Py_EndInterpreter is called.
|
||||||
time.sleep(0.05)
|
random_sleep()
|
||||||
os.write(%d, b"x")
|
os.write(%d, b"x")
|
||||||
|
|
||||||
threading.Thread(target=f).start()
|
threading.Thread(target=f).start()
|
||||||
|
random_sleep()
|
||||||
""" % (w,)
|
""" % (w,)
|
||||||
ret = test.support.run_in_subinterp(code)
|
ret = test.support.run_in_subinterp(code)
|
||||||
self.assertEqual(ret, 0)
|
self.assertEqual(ret, 0)
|
||||||
@ -903,22 +945,29 @@ class SubinterpThreadingTests(BaseTestCase):
|
|||||||
self.addCleanup(os.close, w)
|
self.addCleanup(os.close, w)
|
||||||
code = r"""if 1:
|
code = r"""if 1:
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
def random_sleep():
|
||||||
|
seconds = random.random() * 0.010
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
class Sleeper:
|
class Sleeper:
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
time.sleep(0.05)
|
random_sleep()
|
||||||
|
|
||||||
tls = threading.local()
|
tls = threading.local()
|
||||||
|
|
||||||
def f():
|
def f():
|
||||||
# Sleep a bit so that the thread is still running when
|
# Sleep a bit so that the thread is still running when
|
||||||
# Py_EndInterpreter is called.
|
# Py_EndInterpreter is called.
|
||||||
time.sleep(0.05)
|
random_sleep()
|
||||||
tls.x = Sleeper()
|
tls.x = Sleeper()
|
||||||
os.write(%d, b"x")
|
os.write(%d, b"x")
|
||||||
|
|
||||||
threading.Thread(target=f).start()
|
threading.Thread(target=f).start()
|
||||||
|
random_sleep()
|
||||||
""" % (w,)
|
""" % (w,)
|
||||||
ret = test.support.run_in_subinterp(code)
|
ret = test.support.run_in_subinterp(code)
|
||||||
self.assertEqual(ret, 0)
|
self.assertEqual(ret, 0)
|
||||||
|
@ -739,6 +739,11 @@ _active_limbo_lock = _allocate_lock()
|
|||||||
_active = {} # maps thread id to Thread object
|
_active = {} # maps thread id to Thread object
|
||||||
_limbo = {}
|
_limbo = {}
|
||||||
_dangling = WeakSet()
|
_dangling = WeakSet()
|
||||||
|
# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
|
||||||
|
# to wait until all Python thread states get deleted:
|
||||||
|
# see Thread._set_tstate_lock().
|
||||||
|
_shutdown_locks_lock = _allocate_lock()
|
||||||
|
_shutdown_locks = set()
|
||||||
|
|
||||||
# Main class for threads
|
# Main class for threads
|
||||||
|
|
||||||
@ -903,6 +908,10 @@ class Thread:
|
|||||||
self._tstate_lock = _set_sentinel()
|
self._tstate_lock = _set_sentinel()
|
||||||
self._tstate_lock.acquire()
|
self._tstate_lock.acquire()
|
||||||
|
|
||||||
|
if not self.daemon:
|
||||||
|
with _shutdown_locks_lock:
|
||||||
|
_shutdown_locks.add(self._tstate_lock)
|
||||||
|
|
||||||
def _bootstrap_inner(self):
|
def _bootstrap_inner(self):
|
||||||
try:
|
try:
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
@ -954,6 +963,9 @@ class Thread:
|
|||||||
assert not lock.locked()
|
assert not lock.locked()
|
||||||
self._is_stopped = True
|
self._is_stopped = True
|
||||||
self._tstate_lock = None
|
self._tstate_lock = None
|
||||||
|
if not self.daemon:
|
||||||
|
with _shutdown_locks_lock:
|
||||||
|
_shutdown_locks.discard(self._tstate_lock)
|
||||||
|
|
||||||
def _delete(self):
|
def _delete(self):
|
||||||
"Remove current thread from the dict of currently running threads."
|
"Remove current thread from the dict of currently running threads."
|
||||||
@ -1342,6 +1354,9 @@ from _thread import stack_size
|
|||||||
_main_thread = _MainThread()
|
_main_thread = _MainThread()
|
||||||
|
|
||||||
def _shutdown():
|
def _shutdown():
|
||||||
|
"""
|
||||||
|
Wait until the Python thread state of all non-daemon threads get deleted.
|
||||||
|
"""
|
||||||
# Obscure: other threads may be waiting to join _main_thread. That's
|
# Obscure: other threads may be waiting to join _main_thread. That's
|
||||||
# dubious, but some code does it. We can't wait for C code to release
|
# dubious, but some code does it. We can't wait for C code to release
|
||||||
# the main thread's tstate_lock - that won't happen until the interpreter
|
# the main thread's tstate_lock - that won't happen until the interpreter
|
||||||
@ -1350,6 +1365,8 @@ def _shutdown():
|
|||||||
if _main_thread._is_stopped:
|
if _main_thread._is_stopped:
|
||||||
# _shutdown() was already called
|
# _shutdown() was already called
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Main thread
|
||||||
tlock = _main_thread._tstate_lock
|
tlock = _main_thread._tstate_lock
|
||||||
# The main thread isn't finished yet, so its thread state lock can't have
|
# The main thread isn't finished yet, so its thread state lock can't have
|
||||||
# been released.
|
# been released.
|
||||||
@ -1357,16 +1374,24 @@ def _shutdown():
|
|||||||
assert tlock.locked()
|
assert tlock.locked()
|
||||||
tlock.release()
|
tlock.release()
|
||||||
_main_thread._stop()
|
_main_thread._stop()
|
||||||
t = _pickSomeNonDaemonThread()
|
|
||||||
while t:
|
|
||||||
t.join()
|
|
||||||
t = _pickSomeNonDaemonThread()
|
|
||||||
|
|
||||||
def _pickSomeNonDaemonThread():
|
# Join all non-deamon threads
|
||||||
for t in enumerate():
|
while True:
|
||||||
if not t.daemon and t.is_alive():
|
with _shutdown_locks_lock:
|
||||||
return t
|
locks = list(_shutdown_locks)
|
||||||
return None
|
_shutdown_locks.clear()
|
||||||
|
|
||||||
|
if not locks:
|
||||||
|
break
|
||||||
|
|
||||||
|
for lock in locks:
|
||||||
|
# mimick Thread.join()
|
||||||
|
lock.acquire()
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
# new threads can be spawned while we were waiting for the other
|
||||||
|
# threads to complete
|
||||||
|
|
||||||
|
|
||||||
def main_thread():
|
def main_thread():
|
||||||
"""Return the main thread object.
|
"""Return the main thread object.
|
||||||
@ -1392,12 +1417,18 @@ def _after_fork():
|
|||||||
# Reset _active_limbo_lock, in case we forked while the lock was held
|
# Reset _active_limbo_lock, in case we forked while the lock was held
|
||||||
# by another (non-forked) thread. http://bugs.python.org/issue874900
|
# by another (non-forked) thread. http://bugs.python.org/issue874900
|
||||||
global _active_limbo_lock, _main_thread
|
global _active_limbo_lock, _main_thread
|
||||||
|
global _shutdown_locks_lock, _shutdown_locks
|
||||||
_active_limbo_lock = _allocate_lock()
|
_active_limbo_lock = _allocate_lock()
|
||||||
|
|
||||||
# fork() only copied the current thread; clear references to others.
|
# fork() only copied the current thread; clear references to others.
|
||||||
new_active = {}
|
new_active = {}
|
||||||
current = current_thread()
|
current = current_thread()
|
||||||
_main_thread = current
|
_main_thread = current
|
||||||
|
|
||||||
|
# reset _shutdown() locks: threads re-register their _tstate_lock below
|
||||||
|
_shutdown_locks_lock = _allocate_lock()
|
||||||
|
_shutdown_locks = set()
|
||||||
|
|
||||||
with _active_limbo_lock:
|
with _active_limbo_lock:
|
||||||
# Dangling thread instances must still have their locks reset,
|
# Dangling thread instances must still have their locks reset,
|
||||||
# because someone may join() them.
|
# because someone may join() them.
|
||||||
|
@ -0,0 +1,4 @@
|
|||||||
|
Fix a race condition at Python shutdown when waiting for threads. Wait until
|
||||||
|
the Python thread state of all non-daemon threads get deleted (join all
|
||||||
|
non-daemon threads), rather than just wait until non-daemon Python threads
|
||||||
|
complete.
|
Loading…
x
Reference in New Issue
Block a user