Removes an inefficient spin loop in as_completed
This commit is contained in:
parent
42dd524cff
commit
3ec60183a4
@ -12,6 +12,7 @@ import time
|
|||||||
FIRST_COMPLETED = 'FIRST_COMPLETED'
|
FIRST_COMPLETED = 'FIRST_COMPLETED'
|
||||||
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
|
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
|
||||||
ALL_COMPLETED = 'ALL_COMPLETED'
|
ALL_COMPLETED = 'ALL_COMPLETED'
|
||||||
|
_AS_COMPLETED = '_AS_COMPLETED'
|
||||||
|
|
||||||
# Possible future states (for internal use by the futures package).
|
# Possible future states (for internal use by the futures package).
|
||||||
PENDING = 'PENDING'
|
PENDING = 'PENDING'
|
||||||
@ -70,8 +71,30 @@ class _Waiter(object):
|
|||||||
def add_cancelled(self, future):
|
def add_cancelled(self, future):
|
||||||
self.finished_futures.append(future)
|
self.finished_futures.append(future)
|
||||||
|
|
||||||
|
class _AsCompletedWaiter(_Waiter):
|
||||||
|
"""Used by as_completed()."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(_AsCompletedWaiter, self).__init__()
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
def add_result(self, future):
|
||||||
|
with self.lock:
|
||||||
|
super(_AsCompletedWaiter, self).add_result(future)
|
||||||
|
self.event.set()
|
||||||
|
|
||||||
|
def add_exception(self, future):
|
||||||
|
with self.lock:
|
||||||
|
super(_AsCompletedWaiter, self).add_exception(future)
|
||||||
|
self.event.set()
|
||||||
|
|
||||||
|
def add_cancelled(self, future):
|
||||||
|
with self.lock:
|
||||||
|
super(_AsCompletedWaiter, self).add_cancelled(future)
|
||||||
|
self.event.set()
|
||||||
|
|
||||||
class _FirstCompletedWaiter(_Waiter):
|
class _FirstCompletedWaiter(_Waiter):
|
||||||
"""Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
|
"""Used by wait(return_when=FIRST_COMPLETED)."""
|
||||||
|
|
||||||
def add_result(self, future):
|
def add_result(self, future):
|
||||||
super().add_result(future)
|
super().add_result(future)
|
||||||
@ -128,7 +151,9 @@ class _AcquireFutures(object):
|
|||||||
future._condition.release()
|
future._condition.release()
|
||||||
|
|
||||||
def _create_and_install_waiters(fs, return_when):
|
def _create_and_install_waiters(fs, return_when):
|
||||||
if return_when == FIRST_COMPLETED:
|
if return_when == _AS_COMPLETED:
|
||||||
|
waiter = _AsCompletedWaiter()
|
||||||
|
elif return_when == FIRST_COMPLETED:
|
||||||
waiter = _FirstCompletedWaiter()
|
waiter = _FirstCompletedWaiter()
|
||||||
else:
|
else:
|
||||||
pending_count = sum(
|
pending_count = sum(
|
||||||
@ -171,7 +196,7 @@ def as_completed(fs, timeout=None):
|
|||||||
f for f in fs
|
f for f in fs
|
||||||
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
|
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
|
||||||
pending = set(fs) - finished
|
pending = set(fs) - finished
|
||||||
waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
|
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for future in finished:
|
for future in finished:
|
||||||
@ -187,11 +212,15 @@ def as_completed(fs, timeout=None):
|
|||||||
'%d (of %d) futures unfinished' % (
|
'%d (of %d) futures unfinished' % (
|
||||||
len(pending), len(fs)))
|
len(pending), len(fs)))
|
||||||
|
|
||||||
waiter.event.wait(timeout)
|
waiter.event.wait(wait_timeout)
|
||||||
|
|
||||||
for future in waiter.finished_futures[:]:
|
with waiter.lock:
|
||||||
|
finished = waiter.finished_futures
|
||||||
|
waiter.finished_futures = []
|
||||||
|
waiter.event.clear()
|
||||||
|
|
||||||
|
for future in finished:
|
||||||
yield future
|
yield future
|
||||||
waiter.finished_futures.remove(future)
|
|
||||||
pending.remove(future)
|
pending.remove(future)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user