gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally (GH-133222)
When shutdown is called with wait=False, the executor thread keeps running even after the ProcessPoolExecutor's state is reset. The executor then tries to replenish the worker processes pool resulting in an error and a potential hang when it comes across a worker that has died. Fixed the issue by having _adjust_process_count() return without doing anything if the ProcessPoolExecutor's state has been reset. Added unit tests to validate two scenarios: max_workers < num_tasks (exception) max_workers > num_tasks (exception + hang)
This commit is contained in:
parent
ee7345d507
commit
598aa7cc98
@ -755,6 +755,11 @@ class ProcessPoolExecutor(_base.Executor):
|
||||
self._executor_manager_thread_wakeup
|
||||
|
||||
def _adjust_process_count(self):
|
||||
# gh-132969: avoid error when state is reset and executor is still running,
|
||||
# which will happen when shutdown(wait=False) is called.
|
||||
if self._processes is None:
|
||||
return
|
||||
|
||||
# if there's an idle process, we don't need to spawn a new one.
|
||||
if self._idle_worker_semaphore.acquire(blocking=False):
|
||||
return
|
||||
|
@ -330,6 +330,64 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
|
||||
# shutdown.
|
||||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
||||
|
||||
@classmethod
|
||||
def _failing_task_gh_132969(cls, n):
|
||||
raise ValueError("failing task")
|
||||
|
||||
@classmethod
|
||||
def _good_task_gh_132969(cls, n):
|
||||
time.sleep(0.1 * n)
|
||||
return n
|
||||
|
||||
def _run_test_issue_gh_132969(self, max_workers):
|
||||
# max_workers=2 will repro exception
|
||||
# max_workers=4 will repro exception and then hang
|
||||
|
||||
# Repro conditions
|
||||
# max_tasks_per_child=1
|
||||
# a task ends abnormally
|
||||
# shutdown(wait=False) is called
|
||||
start_method = self.get_context().get_start_method()
|
||||
if (start_method == "fork" or
|
||||
(start_method == "forkserver" and sys.platform.startswith("win"))):
|
||||
self.skipTest(f"Skipping test for {start_method = }")
|
||||
executor = futures.ProcessPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
max_tasks_per_child=1,
|
||||
mp_context=self.get_context())
|
||||
f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
|
||||
f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2)
|
||||
f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
|
||||
result = 0
|
||||
try:
|
||||
result += f1.result()
|
||||
result += f2.result()
|
||||
result += f3.result()
|
||||
except ValueError:
|
||||
# stop processing results upon first exception
|
||||
pass
|
||||
|
||||
# Ensure that the executor cleans up after called
|
||||
# shutdown with wait=False
|
||||
executor_manager_thread = executor._executor_manager_thread
|
||||
executor.shutdown(wait=False)
|
||||
time.sleep(0.2)
|
||||
executor_manager_thread.join()
|
||||
return result
|
||||
|
||||
def test_shutdown_gh_132969_case_1(self):
|
||||
# gh-132969: test that exception "object of type 'NoneType' has no len()"
|
||||
# is not raised when shutdown(wait=False) is called.
|
||||
result = self._run_test_issue_gh_132969(2)
|
||||
self.assertEqual(result, 1)
|
||||
|
||||
def test_shutdown_gh_132969_case_2(self):
|
||||
# gh-132969: test that process does not hang and
|
||||
# exception "object of type 'NoneType' has no len()" is not raised
|
||||
# when shutdown(wait=False) is called.
|
||||
result = self._run_test_issue_gh_132969(4)
|
||||
self.assertEqual(result, 1)
|
||||
|
||||
|
||||
create_executor_tests(globals(), ProcessPoolShutdownTest,
|
||||
executor_mixins=(ProcessPoolForkMixin,
|
||||
|
@ -0,0 +1,7 @@
|
||||
Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
|
||||
which remains running when :meth:`shutdown(wait=False)
|
||||
<concurrent.futures.Executor.shutdown>`, from
|
||||
attempting to adjust the pool's worker processes after the object state has already been reset during shutdown.
|
||||
A combination of conditions, including a worker process having terminated abormally,
|
||||
resulted in an exception and a potential hang when the still-running executor thread
|
||||
attempted to replace dead workers within the pool.
|
Loading…
x
Reference in New Issue
Block a user