gh-109974: Fix threading lock_tests race conditions (#110057)
Fix race conditions in test_threading lock tests. Wait until a condition is met rather than using time.sleep() with a hardcoded number of seconds. * Replace sleeping loops with support.sleeping_retry() which raises an exception on timeout. * Add wait_threads_blocked(nthread) which computes a sleep depending on the number of threads. Remove _wait() function. * test_set_and_clear(): use a way longer Event.wait() timeout. * BarrierTests.test_repr(): wait until the 2 threads are waiting for the barrier. Use a way longer timeout for Barrier.wait() timeout. * test_thread_leak() no longer needs to count len(threading.enumerate()): Bunch uses threading_helper.wait_threads_exit() internally which does it in wait_for_finished(). * Add BaseLockTests.wait_phase() which implements a timeout. test_reacquire() and test_recursion_count() use wait_phase().
This commit is contained in:
parent
5fdcea7440
commit
4e356ad183
@ -19,22 +19,24 @@ requires_fork = unittest.skipUnless(support.has_fork_support,
|
|||||||
"(no _at_fork_reinit method)")
|
"(no _at_fork_reinit method)")
|
||||||
|
|
||||||
|
|
||||||
def _wait():
|
def wait_threads_blocked(nthread):
|
||||||
# A crude wait/yield function not relying on synchronization primitives.
|
# Arbitrary sleep to wait until N threads are blocked,
|
||||||
time.sleep(0.01)
|
# like waiting for a lock.
|
||||||
|
time.sleep(0.010 * nthread)
|
||||||
|
|
||||||
|
|
||||||
class Bunch(object):
|
class Bunch(object):
|
||||||
"""
|
"""
|
||||||
A bunch of threads.
|
A bunch of threads.
|
||||||
"""
|
"""
|
||||||
def __init__(self, f, n, wait_before_exit=False):
|
def __init__(self, func, nthread, wait_before_exit=False):
|
||||||
"""
|
"""
|
||||||
Construct a bunch of `n` threads running the same function `f`.
|
Construct a bunch of `nthread` threads running the same function `func`.
|
||||||
If `wait_before_exit` is True, the threads won't terminate until
|
If `wait_before_exit` is True, the threads won't terminate until
|
||||||
do_finish() is called.
|
do_finish() is called.
|
||||||
"""
|
"""
|
||||||
self.f = f
|
self.func = func
|
||||||
self.n = n
|
self.nthread = nthread
|
||||||
self.started = []
|
self.started = []
|
||||||
self.finished = []
|
self.finished = []
|
||||||
self._can_exit = not wait_before_exit
|
self._can_exit = not wait_before_exit
|
||||||
@ -45,26 +47,30 @@ class Bunch(object):
|
|||||||
tid = threading.get_ident()
|
tid = threading.get_ident()
|
||||||
self.started.append(tid)
|
self.started.append(tid)
|
||||||
try:
|
try:
|
||||||
f()
|
func()
|
||||||
finally:
|
finally:
|
||||||
self.finished.append(tid)
|
self.finished.append(tid)
|
||||||
while not self._can_exit:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if self._can_exit:
|
||||||
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for i in range(n):
|
for i in range(nthread):
|
||||||
start_new_thread(task, ())
|
start_new_thread(task, ())
|
||||||
except:
|
except:
|
||||||
self._can_exit = True
|
self._can_exit = True
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def wait_for_started(self):
|
def wait_for_started(self):
|
||||||
while len(self.started) < self.n:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(self.started) >= self.nthread:
|
||||||
|
break
|
||||||
|
|
||||||
def wait_for_finished(self):
|
def wait_for_finished(self):
|
||||||
while len(self.finished) < self.n:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(self.finished) >= self.nthread:
|
||||||
|
break
|
||||||
|
|
||||||
# Wait for threads exit
|
# Wait for threads exit
|
||||||
self.wait_thread.__exit__(None, None, None)
|
self.wait_thread.__exit__(None, None, None)
|
||||||
|
|
||||||
@ -94,6 +100,12 @@ class BaseLockTests(BaseTestCase):
|
|||||||
Tests for both recursive and non-recursive locks.
|
Tests for both recursive and non-recursive locks.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def wait_phase(self, phase, expected):
|
||||||
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
|
if len(phase) >= expected:
|
||||||
|
break
|
||||||
|
self.assertEqual(len(phase), expected)
|
||||||
|
|
||||||
def test_constructor(self):
|
def test_constructor(self):
|
||||||
lock = self.locktype()
|
lock = self.locktype()
|
||||||
del lock
|
del lock
|
||||||
@ -138,15 +150,18 @@ class BaseLockTests(BaseTestCase):
|
|||||||
def test_acquire_contended(self):
|
def test_acquire_contended(self):
|
||||||
lock = self.locktype()
|
lock = self.locktype()
|
||||||
lock.acquire()
|
lock.acquire()
|
||||||
N = 5
|
|
||||||
def f():
|
def f():
|
||||||
lock.acquire()
|
lock.acquire()
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
# Threads block on lock.acquire()
|
||||||
|
N = 5
|
||||||
b = Bunch(f, N)
|
b = Bunch(f, N)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
_wait()
|
wait_threads_blocked(N)
|
||||||
self.assertEqual(len(b.finished), 0)
|
self.assertEqual(len(b.finished), 0)
|
||||||
|
|
||||||
|
# Threads unblocked
|
||||||
lock.release()
|
lock.release()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
self.assertEqual(len(b.finished), N)
|
self.assertEqual(len(b.finished), N)
|
||||||
@ -174,17 +189,10 @@ class BaseLockTests(BaseTestCase):
|
|||||||
def f():
|
def f():
|
||||||
lock.acquire()
|
lock.acquire()
|
||||||
lock.release()
|
lock.release()
|
||||||
n = len(threading.enumerate())
|
|
||||||
# We run many threads in the hope that existing threads ids won't
|
# We run many threads in the hope that existing threads ids won't
|
||||||
# be recycled.
|
# be recycled.
|
||||||
Bunch(f, 15).wait_for_finished()
|
Bunch(f, 15).wait_for_finished()
|
||||||
if len(threading.enumerate()) != n:
|
|
||||||
# There is a small window during which a Thread instance's
|
|
||||||
# target function has finished running, but the Thread is still
|
|
||||||
# alive and registered. Avoid spurious failures by waiting a
|
|
||||||
# bit more (seen on a buildbot).
|
|
||||||
time.sleep(0.4)
|
|
||||||
self.assertEqual(n, len(threading.enumerate()))
|
|
||||||
|
|
||||||
def test_timeout(self):
|
def test_timeout(self):
|
||||||
lock = self.locktype()
|
lock = self.locktype()
|
||||||
@ -242,15 +250,13 @@ class LockTests(BaseLockTests):
|
|||||||
phase.append(None)
|
phase.append(None)
|
||||||
|
|
||||||
with threading_helper.wait_threads_exit():
|
with threading_helper.wait_threads_exit():
|
||||||
|
# Thread blocked on lock.acquire()
|
||||||
start_new_thread(f, ())
|
start_new_thread(f, ())
|
||||||
while len(phase) == 0:
|
self.wait_phase(phase, 1)
|
||||||
_wait()
|
|
||||||
_wait()
|
# Thread unblocked
|
||||||
self.assertEqual(len(phase), 1)
|
|
||||||
lock.release()
|
lock.release()
|
||||||
while len(phase) == 1:
|
self.wait_phase(phase, 2)
|
||||||
_wait()
|
|
||||||
self.assertEqual(len(phase), 2)
|
|
||||||
|
|
||||||
def test_different_thread(self):
|
def test_different_thread(self):
|
||||||
# Lock can be released from a different thread.
|
# Lock can be released from a different thread.
|
||||||
@ -349,21 +355,20 @@ class RLockTests(BaseLockTests):
|
|||||||
def f():
|
def f():
|
||||||
lock.acquire()
|
lock.acquire()
|
||||||
phase.append(None)
|
phase.append(None)
|
||||||
while len(phase) == 1:
|
|
||||||
_wait()
|
self.wait_phase(phase, 2)
|
||||||
lock.release()
|
lock.release()
|
||||||
phase.append(None)
|
phase.append(None)
|
||||||
|
|
||||||
with threading_helper.wait_threads_exit():
|
with threading_helper.wait_threads_exit():
|
||||||
|
# Thread blocked on lock.acquire()
|
||||||
start_new_thread(f, ())
|
start_new_thread(f, ())
|
||||||
while len(phase) == 0:
|
self.wait_phase(phase, 1)
|
||||||
_wait()
|
|
||||||
self.assertEqual(len(phase), 1)
|
|
||||||
self.assertEqual(0, lock._recursion_count())
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
|
||||||
|
# Thread unblocked
|
||||||
phase.append(None)
|
phase.append(None)
|
||||||
while len(phase) == 2:
|
self.wait_phase(phase, 3)
|
||||||
_wait()
|
|
||||||
self.assertEqual(len(phase), 3)
|
|
||||||
self.assertEqual(0, lock._recursion_count())
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
|
||||||
def test_different_thread(self):
|
def test_different_thread(self):
|
||||||
@ -421,10 +426,14 @@ class EventTests(BaseTestCase):
|
|||||||
def f():
|
def f():
|
||||||
results1.append(evt.wait())
|
results1.append(evt.wait())
|
||||||
results2.append(evt.wait())
|
results2.append(evt.wait())
|
||||||
|
|
||||||
|
# Threads blocked on first evt.wait()
|
||||||
b = Bunch(f, N)
|
b = Bunch(f, N)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
_wait()
|
wait_threads_blocked(N)
|
||||||
self.assertEqual(len(results1), 0)
|
self.assertEqual(len(results1), 0)
|
||||||
|
|
||||||
|
# Threads unblocked
|
||||||
evt.set()
|
evt.set()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
self.assertEqual(results1, [True] * N)
|
self.assertEqual(results1, [True] * N)
|
||||||
@ -464,19 +473,22 @@ class EventTests(BaseTestCase):
|
|||||||
self.assertTrue(r)
|
self.assertTrue(r)
|
||||||
|
|
||||||
def test_set_and_clear(self):
|
def test_set_and_clear(self):
|
||||||
# Issue #13502: check that wait() returns true even when the event is
|
# gh-57711: check that wait() returns true even when the event is
|
||||||
# cleared before the waiting thread is woken up.
|
# cleared before the waiting thread is woken up.
|
||||||
evt = self.eventtype()
|
event = self.eventtype()
|
||||||
results = []
|
results = []
|
||||||
timeout = 0.250
|
|
||||||
N = 5
|
|
||||||
def f():
|
def f():
|
||||||
results.append(evt.wait(timeout * 4))
|
results.append(event.wait(support.LONG_TIMEOUT))
|
||||||
|
|
||||||
|
# Threads blocked on event.wait()
|
||||||
|
N = 5
|
||||||
b = Bunch(f, N)
|
b = Bunch(f, N)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
time.sleep(timeout)
|
wait_threads_blocked(N)
|
||||||
evt.set()
|
|
||||||
evt.clear()
|
# Threads unblocked
|
||||||
|
event.set()
|
||||||
|
event.clear()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
self.assertEqual(results, [True] * N)
|
self.assertEqual(results, [True] * N)
|
||||||
|
|
||||||
@ -533,15 +545,14 @@ class ConditionTests(BaseTestCase):
|
|||||||
# Note that this test is sensitive to timing. If the worker threads
|
# Note that this test is sensitive to timing. If the worker threads
|
||||||
# don't execute in a timely fashion, the main thread may think they
|
# don't execute in a timely fashion, the main thread may think they
|
||||||
# are further along then they are. The main thread therefore issues
|
# are further along then they are. The main thread therefore issues
|
||||||
# _wait() statements to try to make sure that it doesn't race ahead
|
# wait_threads_blocked() statements to try to make sure that it doesn't
|
||||||
# of the workers.
|
# race ahead of the workers.
|
||||||
# Secondly, this test assumes that condition variables are not subject
|
# Secondly, this test assumes that condition variables are not subject
|
||||||
# to spurious wakeups. The absence of spurious wakeups is an implementation
|
# to spurious wakeups. The absence of spurious wakeups is an implementation
|
||||||
# detail of Condition Variables in current CPython, but in general, not
|
# detail of Condition Variables in current CPython, but in general, not
|
||||||
# a guaranteed property of condition variables as a programming
|
# a guaranteed property of condition variables as a programming
|
||||||
# construct. In particular, it is possible that this can no longer
|
# construct. In particular, it is possible that this can no longer
|
||||||
# be conveniently guaranteed should their implementation ever change.
|
# be conveniently guaranteed should their implementation ever change.
|
||||||
N = 5
|
|
||||||
ready = []
|
ready = []
|
||||||
results1 = []
|
results1 = []
|
||||||
results2 = []
|
results2 = []
|
||||||
@ -550,57 +561,84 @@ class ConditionTests(BaseTestCase):
|
|||||||
cond.acquire()
|
cond.acquire()
|
||||||
ready.append(phase_num)
|
ready.append(phase_num)
|
||||||
result = cond.wait()
|
result = cond.wait()
|
||||||
|
|
||||||
cond.release()
|
cond.release()
|
||||||
results1.append((result, phase_num))
|
results1.append((result, phase_num))
|
||||||
|
|
||||||
cond.acquire()
|
cond.acquire()
|
||||||
ready.append(phase_num)
|
ready.append(phase_num)
|
||||||
|
|
||||||
result = cond.wait()
|
result = cond.wait()
|
||||||
cond.release()
|
cond.release()
|
||||||
results2.append((result, phase_num))
|
results2.append((result, phase_num))
|
||||||
|
|
||||||
|
N = 5
|
||||||
b = Bunch(f, N)
|
b = Bunch(f, N)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
# first wait, to ensure all workers settle into cond.wait() before
|
# first wait, to ensure all workers settle into cond.wait() before
|
||||||
# we continue. See issues #8799 and #30727.
|
# we continue. See issues #8799 and #30727.
|
||||||
while len(ready) < 5:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(ready) >= N:
|
||||||
|
break
|
||||||
|
|
||||||
ready.clear()
|
ready.clear()
|
||||||
self.assertEqual(results1, [])
|
self.assertEqual(results1, [])
|
||||||
|
|
||||||
# Notify 3 threads at first
|
# Notify 3 threads at first
|
||||||
|
count1 = 3
|
||||||
cond.acquire()
|
cond.acquire()
|
||||||
cond.notify(3)
|
cond.notify(count1)
|
||||||
_wait()
|
wait_threads_blocked(count1)
|
||||||
|
|
||||||
|
# Phase 1
|
||||||
phase_num = 1
|
phase_num = 1
|
||||||
cond.release()
|
cond.release()
|
||||||
while len(results1) < 3:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(results1) >= count1:
|
||||||
self.assertEqual(results1, [(True, 1)] * 3)
|
break
|
||||||
|
|
||||||
|
self.assertEqual(results1, [(True, 1)] * count1)
|
||||||
self.assertEqual(results2, [])
|
self.assertEqual(results2, [])
|
||||||
# make sure all awaken workers settle into cond.wait()
|
|
||||||
while len(ready) < 3:
|
# Wait until awaken workers are blocked on cond.wait()
|
||||||
_wait()
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
|
if len(ready) >= count1 :
|
||||||
|
break
|
||||||
|
|
||||||
# Notify 5 threads: they might be in their first or second wait
|
# Notify 5 threads: they might be in their first or second wait
|
||||||
cond.acquire()
|
cond.acquire()
|
||||||
cond.notify(5)
|
cond.notify(5)
|
||||||
_wait()
|
wait_threads_blocked(N)
|
||||||
|
|
||||||
|
# Phase 2
|
||||||
phase_num = 2
|
phase_num = 2
|
||||||
cond.release()
|
cond.release()
|
||||||
while len(results1) + len(results2) < 8:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(results1) + len(results2) >= (N + count1):
|
||||||
self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
|
break
|
||||||
self.assertEqual(results2, [(True, 2)] * 3)
|
|
||||||
# make sure all workers settle into cond.wait()
|
count2 = N - count1
|
||||||
while len(ready) < 5:
|
self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
|
||||||
_wait()
|
self.assertEqual(results2, [(True, 2)] * count1)
|
||||||
|
|
||||||
|
# Make sure all workers settle into cond.wait()
|
||||||
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
|
if len(ready) >= N:
|
||||||
|
break
|
||||||
|
|
||||||
# Notify all threads: they are all in their second wait
|
# Notify all threads: they are all in their second wait
|
||||||
cond.acquire()
|
cond.acquire()
|
||||||
cond.notify_all()
|
cond.notify_all()
|
||||||
_wait()
|
wait_threads_blocked(N)
|
||||||
|
|
||||||
|
# Phase 3
|
||||||
phase_num = 3
|
phase_num = 3
|
||||||
cond.release()
|
cond.release()
|
||||||
while len(results2) < 5:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
_wait()
|
if len(results2) >= N:
|
||||||
self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
|
break
|
||||||
self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
|
self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
|
||||||
|
self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
|
|
||||||
def test_notify(self):
|
def test_notify(self):
|
||||||
@ -611,19 +649,22 @@ class ConditionTests(BaseTestCase):
|
|||||||
|
|
||||||
def test_timeout(self):
|
def test_timeout(self):
|
||||||
cond = self.condtype()
|
cond = self.condtype()
|
||||||
|
timeout = 0.5
|
||||||
results = []
|
results = []
|
||||||
N = 5
|
|
||||||
def f():
|
def f():
|
||||||
cond.acquire()
|
cond.acquire()
|
||||||
t1 = time.monotonic()
|
t1 = time.monotonic()
|
||||||
result = cond.wait(0.5)
|
result = cond.wait(timeout)
|
||||||
t2 = time.monotonic()
|
t2 = time.monotonic()
|
||||||
cond.release()
|
cond.release()
|
||||||
results.append((t2 - t1, result))
|
results.append((t2 - t1, result))
|
||||||
|
|
||||||
|
N = 5
|
||||||
Bunch(f, N).wait_for_finished()
|
Bunch(f, N).wait_for_finished()
|
||||||
self.assertEqual(len(results), N)
|
self.assertEqual(len(results), N)
|
||||||
|
|
||||||
for dt, result in results:
|
for dt, result in results:
|
||||||
self.assertTimeout(dt, 0.5)
|
self.assertTimeout(dt, timeout)
|
||||||
# Note that conceptually (that"s the condition variable protocol)
|
# Note that conceptually (that"s the condition variable protocol)
|
||||||
# a wait() may succeed even if no one notifies us and before any
|
# a wait() may succeed even if no one notifies us and before any
|
||||||
# timeout occurs. Spurious wakeups can occur.
|
# timeout occurs. Spurious wakeups can occur.
|
||||||
@ -636,13 +677,13 @@ class ConditionTests(BaseTestCase):
|
|||||||
state = 0
|
state = 0
|
||||||
def f():
|
def f():
|
||||||
with cond:
|
with cond:
|
||||||
result = cond.wait_for(lambda : state==4)
|
result = cond.wait_for(lambda: state == 4)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
self.assertEqual(state, 4)
|
self.assertEqual(state, 4)
|
||||||
b = Bunch(f, 1)
|
b = Bunch(f, 1)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
for i in range(4):
|
for i in range(4):
|
||||||
time.sleep(0.01)
|
time.sleep(0.010)
|
||||||
with cond:
|
with cond:
|
||||||
state += 1
|
state += 1
|
||||||
cond.notify()
|
cond.notify()
|
||||||
@ -660,14 +701,16 @@ class ConditionTests(BaseTestCase):
|
|||||||
self.assertFalse(result)
|
self.assertFalse(result)
|
||||||
self.assertTimeout(dt, 0.1)
|
self.assertTimeout(dt, 0.1)
|
||||||
success.append(None)
|
success.append(None)
|
||||||
|
|
||||||
b = Bunch(f, 1)
|
b = Bunch(f, 1)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
# Only increment 3 times, so state == 4 is never reached.
|
# Only increment 3 times, so state == 4 is never reached.
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
time.sleep(0.01)
|
time.sleep(0.010)
|
||||||
with cond:
|
with cond:
|
||||||
state += 1
|
state += 1
|
||||||
cond.notify()
|
cond.notify()
|
||||||
|
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
self.assertEqual(len(success), 1)
|
self.assertEqual(len(success), 1)
|
||||||
|
|
||||||
@ -697,70 +740,107 @@ class BaseSemaphoreTests(BaseTestCase):
|
|||||||
del sem
|
del sem
|
||||||
|
|
||||||
def test_acquire_contended(self):
|
def test_acquire_contended(self):
|
||||||
sem = self.semtype(7)
|
sem_value = 7
|
||||||
|
sem = self.semtype(sem_value)
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
N = 10
|
|
||||||
sem_results = []
|
sem_results = []
|
||||||
results1 = []
|
results1 = []
|
||||||
results2 = []
|
results2 = []
|
||||||
phase_num = 0
|
phase_num = 0
|
||||||
def f():
|
|
||||||
|
def func():
|
||||||
sem_results.append(sem.acquire())
|
sem_results.append(sem.acquire())
|
||||||
results1.append(phase_num)
|
results1.append(phase_num)
|
||||||
|
|
||||||
sem_results.append(sem.acquire())
|
sem_results.append(sem.acquire())
|
||||||
results2.append(phase_num)
|
results2.append(phase_num)
|
||||||
b = Bunch(f, 10)
|
|
||||||
|
def wait_count(count):
|
||||||
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
|
if len(results1) + len(results2) >= count:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Phase 0
|
||||||
|
N = 10
|
||||||
|
b = Bunch(func, N)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
while len(results1) + len(results2) < 6:
|
count1 = sem_value - 1
|
||||||
_wait()
|
wait_count(count1)
|
||||||
self.assertEqual(results1 + results2, [0] * 6)
|
self.assertEqual(results1 + results2, [0] * count1)
|
||||||
|
|
||||||
|
# Phase 1
|
||||||
phase_num = 1
|
phase_num = 1
|
||||||
for i in range(7):
|
for i in range(sem_value):
|
||||||
sem.release()
|
sem.release()
|
||||||
while len(results1) + len(results2) < 13:
|
count2 = sem_value
|
||||||
_wait()
|
wait_count(count1 + count2)
|
||||||
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
|
self.assertEqual(sorted(results1 + results2),
|
||||||
|
[0] * count1 + [1] * count2)
|
||||||
|
|
||||||
|
# Phase 2
|
||||||
phase_num = 2
|
phase_num = 2
|
||||||
for i in range(6):
|
count3 = (sem_value - 1)
|
||||||
|
for i in range(count3):
|
||||||
sem.release()
|
sem.release()
|
||||||
while len(results1) + len(results2) < 19:
|
wait_count(count1 + count2 + count3)
|
||||||
_wait()
|
self.assertEqual(sorted(results1 + results2),
|
||||||
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
|
[0] * count1 + [1] * count2 + [2] * count3)
|
||||||
# The semaphore is still locked
|
# The semaphore is still locked
|
||||||
self.assertFalse(sem.acquire(False))
|
self.assertFalse(sem.acquire(False))
|
||||||
|
|
||||||
# Final release, to let the last thread finish
|
# Final release, to let the last thread finish
|
||||||
|
count4 = 1
|
||||||
sem.release()
|
sem.release()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
|
self.assertEqual(sem_results,
|
||||||
|
[True] * (count1 + count2 + count3 + count4))
|
||||||
|
|
||||||
def test_multirelease(self):
|
def test_multirelease(self):
|
||||||
sem = self.semtype(7)
|
sem_value = 7
|
||||||
|
sem = self.semtype(sem_value)
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
|
|
||||||
results1 = []
|
results1 = []
|
||||||
results2 = []
|
results2 = []
|
||||||
phase_num = 0
|
phase_num = 0
|
||||||
def f():
|
def func():
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
results1.append(phase_num)
|
results1.append(phase_num)
|
||||||
|
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
results2.append(phase_num)
|
results2.append(phase_num)
|
||||||
b = Bunch(f, 10)
|
|
||||||
|
def wait_count(count):
|
||||||
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
|
if len(results1) + len(results2) >= count:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Phase 0
|
||||||
|
b = Bunch(func, 10)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
while len(results1) + len(results2) < 6:
|
count1 = sem_value - 1
|
||||||
_wait()
|
wait_count(count1)
|
||||||
self.assertEqual(results1 + results2, [0] * 6)
|
self.assertEqual(results1 + results2, [0] * count1)
|
||||||
|
|
||||||
|
# Phase 1
|
||||||
phase_num = 1
|
phase_num = 1
|
||||||
sem.release(7)
|
count2 = sem_value
|
||||||
while len(results1) + len(results2) < 13:
|
sem.release(count2)
|
||||||
_wait()
|
wait_count(count1 + count2)
|
||||||
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
|
self.assertEqual(sorted(results1 + results2),
|
||||||
|
[0] * count1 + [1] * count2)
|
||||||
|
|
||||||
|
# Phase 2
|
||||||
phase_num = 2
|
phase_num = 2
|
||||||
sem.release(6)
|
count3 = sem_value - 1
|
||||||
while len(results1) + len(results2) < 19:
|
sem.release(count3)
|
||||||
_wait()
|
wait_count(count1 + count2 + count3)
|
||||||
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
|
self.assertEqual(sorted(results1 + results2),
|
||||||
|
[0] * count1 + [1] * count2 + [2] * count3)
|
||||||
# The semaphore is still locked
|
# The semaphore is still locked
|
||||||
self.assertFalse(sem.acquire(False))
|
self.assertFalse(sem.acquire(False))
|
||||||
|
|
||||||
# Final release, to let the last thread finish
|
# Final release, to let the last thread finish
|
||||||
sem.release()
|
sem.release()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
@ -806,10 +886,14 @@ class BaseSemaphoreTests(BaseTestCase):
|
|||||||
def f():
|
def f():
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
sem.release()
|
sem.release()
|
||||||
|
|
||||||
|
# Thread blocked on sem.acquire()
|
||||||
b = Bunch(f, 1)
|
b = Bunch(f, 1)
|
||||||
b.wait_for_started()
|
b.wait_for_started()
|
||||||
_wait()
|
wait_threads_blocked(1)
|
||||||
self.assertFalse(b.finished)
|
self.assertFalse(b.finished)
|
||||||
|
|
||||||
|
# Thread unblocked
|
||||||
sem.release()
|
sem.release()
|
||||||
b.wait_for_finished()
|
b.wait_for_finished()
|
||||||
|
|
||||||
@ -882,6 +966,7 @@ class BarrierTests(BaseTestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
|
self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.barrier.abort()
|
self.barrier.abort()
|
||||||
|
|
||||||
@ -979,8 +1064,9 @@ class BarrierTests(BaseTestCase):
|
|||||||
i = self.barrier.wait()
|
i = self.barrier.wait()
|
||||||
if i == self.N//2:
|
if i == self.N//2:
|
||||||
# Wait until the other threads are all in the barrier.
|
# Wait until the other threads are all in the barrier.
|
||||||
while self.barrier.n_waiting < self.N-1:
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
time.sleep(0.001)
|
if self.barrier.n_waiting >= (self.N - 1):
|
||||||
|
break
|
||||||
self.barrier.reset()
|
self.barrier.reset()
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
@ -1068,16 +1154,29 @@ class BarrierTests(BaseTestCase):
|
|||||||
b.wait()
|
b.wait()
|
||||||
|
|
||||||
def test_repr(self):
|
def test_repr(self):
|
||||||
b = self.barriertype(3)
|
barrier = self.barriertype(3)
|
||||||
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
|
timeout = support.LONG_TIMEOUT
|
||||||
|
self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
|
||||||
def f():
|
def f():
|
||||||
b.wait(3)
|
barrier.wait(timeout)
|
||||||
bunch = Bunch(f, 2)
|
|
||||||
|
# Threads blocked on barrier.wait()
|
||||||
|
N = 2
|
||||||
|
bunch = Bunch(f, N)
|
||||||
bunch.wait_for_started()
|
bunch.wait_for_started()
|
||||||
time.sleep(0.2)
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||||
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
|
if barrier.n_waiting >= N:
|
||||||
b.wait(3)
|
break
|
||||||
|
self.assertRegex(repr(barrier),
|
||||||
|
r"<\w+\.Barrier at .*: waiters=2/3>")
|
||||||
|
|
||||||
|
# Threads unblocked
|
||||||
|
barrier.wait(timeout)
|
||||||
bunch.wait_for_finished()
|
bunch.wait_for_finished()
|
||||||
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
|
self.assertRegex(repr(barrier),
|
||||||
b.abort()
|
r"<\w+\.Barrier at .*: waiters=0/3>")
|
||||||
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")
|
|
||||||
|
# Abort the barrier
|
||||||
|
barrier.abort()
|
||||||
|
self.assertRegex(repr(barrier),
|
||||||
|
r"<\w+\.Barrier at .*: broken>")
|
||||||
|
@ -0,0 +1,3 @@
|
|||||||
|
Fix race conditions in test_threading lock tests. Wait until a condition is met
|
||||||
|
rather than using :func:`time.sleep` with a hardcoded number of seconds. Patch
|
||||||
|
by Victor Stinner.
|
Loading…
x
Reference in New Issue
Block a user