8319662: ForkJoinPool trims worker threads too slowly
8319498: ForkJoinPool.invoke(ForkJoinTask) does not specify behavior when task throws checked exception Reviewed-by: alanb
This commit is contained in:
parent
90e433d72e
commit
cc25d8b12b
@ -556,14 +556,16 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* signalling (because its queue is empty), also resulting in
|
||||
* logarithmic full activation time
|
||||
*
|
||||
* * Because we don't know about usage patterns (or most commonly,
|
||||
* * Because we don't know about usage patterns (or most commonly,
|
||||
* mixtures), we use both approaches, which present even more
|
||||
* opportunities to over-signal. Note that in either of these
|
||||
* contexts, signals may be (and often are) unnecessary because
|
||||
* active workers continue scanning after running tasks without
|
||||
* the need to be signalled (which is one reason work stealing
|
||||
* is often faster than alternatives), so additional workers
|
||||
* aren't needed. But there is no efficient way to detect this.
|
||||
* aren't needed. We filter out some of these cases by exiting
|
||||
* retry loops in signalWork if the task responsible for the
|
||||
* signal has already been taken.
|
||||
*
|
||||
* * For rapidly branching tasks that require full pool resources,
|
||||
* oversignalling is OK, because signalWork will soon have no
|
||||
@ -579,9 +581,7 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* top-level polls (argument "window" in method scan, with setup
|
||||
* in method runWorker) as an encoded sliding window of current
|
||||
* and previous two sources (or INVALID_ID if none), and stop
|
||||
* signalling when all were from the same source. Also, retries
|
||||
* are suppressed on CAS failures by newly activated workers,
|
||||
* which serves as a form of admission control. These
|
||||
* signalling when all were from the same source. These
|
||||
* mechanisms may result in transiently too few workers, but
|
||||
* once workers poll from a new source, they rapidly reactivate
|
||||
* others.
|
||||
@ -609,22 +609,13 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* contention.
|
||||
*
|
||||
* Deactivation. When method scan indicates that no tasks are
|
||||
* found by a worker, it tries to deactivate (in runWorker). Note
|
||||
* that not finding tasks doesn't mean that there won't soon be
|
||||
* some. Further, a scan may give up under contention, returning
|
||||
* even without knowing whether any tasks are still present, which
|
||||
* is OK given, a secondary check (in awaitWork) needed to cover
|
||||
* deactivation/signal races. Blocking and unblocking via
|
||||
* park/unpark can cause serious slowdowns when tasks are rapidly
|
||||
* but irregularly generated (which is often due to garbage
|
||||
* collectors and other activities). One way to ameliorate is for
|
||||
* workers to rescan multiple times, even when there are unlikely
|
||||
* to be tasks. But this causes enough memory traffic and CAS
|
||||
* contention to prefer using quieter short spinwaits in awaitWork
|
||||
* and elsewhere. Those in awaitWork are set to small values that
|
||||
* only cover near-miss scenarios for inactivate/activate races.
|
||||
* Because idle workers are often not yet blocked (parked), we use
|
||||
* the WorkQueue parker field to advertise that a waiter actually
|
||||
* found by a worker, it tries to deactivate (in awaitWork),
|
||||
* giving up (and rescanning) on ctl contention. To avoid missed
|
||||
* signals during deactivation, the method rescans and reactivates
|
||||
* if there may have been a missed signal during deactivation,
|
||||
* filtering out most cases in which this is unnecessary. Because
|
||||
* idle workers are often not yet blocked (parked), we use the
|
||||
* WorkQueue parking field to advertise that a waiter actually
|
||||
* needs unparking upon signal.
|
||||
*
|
||||
* Quiescence. Workers scan looking for work, giving up when they
|
||||
@ -676,7 +667,10 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* Trimming workers. To release resources after periods of lack of
|
||||
* use, a worker starting to wait when the pool is quiescent will
|
||||
* time out and terminate if the pool has remained quiescent for
|
||||
* period given by field keepAlive.
|
||||
* period given by field keepAlive (default 60sec), which applies
|
||||
* to the first timeout of a fully populated pool. Subsequent (or
|
||||
* other) cases use delays such that, if still quiescent, all will
|
||||
* be released before one additional keepAlive unit elapses.
|
||||
*
|
||||
* Joining Tasks
|
||||
* =============
|
||||
@ -687,7 +681,7 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* task would otherwise be blocked waiting for completion of
|
||||
* another, basically, just by running that task or one of its
|
||||
* subtasks if not already taken. These mechanics are disabled for
|
||||
* InterruptibleTasks, that guarantee that callers do not executed
|
||||
* InterruptibleTasks, that guarantee that callers do not execute
|
||||
* submitted tasks.
|
||||
*
|
||||
* The basic structure of joining is an extended spin/block scheme
|
||||
@ -886,22 +880,24 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* control (for good reason). Similarly for relying on fields
|
||||
* being placed in size-sorted declaration order.
|
||||
*
|
||||
* For class ForkJoinPool, it is usually more effective to order
|
||||
* fields such that the most commonly accessed fields are unlikely
|
||||
* to share cache lines with adjacent objects under JVM layout
|
||||
* rules. For class WorkQueue, an embedded @Contended region
|
||||
* segregates fields most heavily updated by owners from those
|
||||
* most commonly read by stealers or other management. Initial
|
||||
* sizing and resizing of WorkQueue arrays is an even more
|
||||
* delicate tradeoff because the best strategy systematically
|
||||
* varies across garbage collectors. Small arrays are better for
|
||||
* locality and reduce GC scan time, but large arrays reduce both
|
||||
* direct false-sharing and indirect cases due to GC bookkeeping
|
||||
* (cardmarks etc), and reduce the number of resizes, which are
|
||||
* not especially fast because they require atomic transfers.
|
||||
* Currently, arrays are initialized to be fairly small but early
|
||||
* resizes rapidly increase size by more than a factor of two
|
||||
* until very large. (Maintenance note: any changes in fields,
|
||||
* We isolate the ForkJoinPool.ctl field that otherwise causes the
|
||||
* most false-sharing misses with respect to other fields. Also,
|
||||
* ForkJoinPool fields are ordered such that fields less prone to
|
||||
* contention effects are first, offsetting those that otherwise
|
||||
* would be, while also reducing total footprint vs using
|
||||
* multiple @Contended regions, which tends to slow down
|
||||
* less-contended applications. For class WorkQueue, an
|
||||
* embedded @Contended region segregates fields most heavily
|
||||
* updated by owners from those most commonly read by stealers or
|
||||
* other management. Initial sizing and resizing of WorkQueue
|
||||
* arrays is an even more delicate tradeoff because the best
|
||||
* strategy systematically varies across garbage collectors. Small
|
||||
* arrays are better for locality and reduce GC scan time, but
|
||||
* large arrays reduce both direct false-sharing and indirect
|
||||
* cases due to GC bookkeeping (cardmarks etc), and reduce the
|
||||
* number of resizes, which are not especially fast because they
|
||||
* require atomic transfers. Currently, arrays are initialized to
|
||||
* be fairly small. (Maintenance note: any changes in fields,
|
||||
* queues, or their uses, or JVM layout policies, must be
|
||||
* accompanied by re-evaluation of these placement and sizing
|
||||
* decisions.)
|
||||
@ -1035,7 +1031,8 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
|
||||
// source history window packing used in scan() and runWorker()
|
||||
static final long RESCAN = 1L << 63; // must be negative
|
||||
static final long WMASK = ~(((long)SMASK) << 48); // id bits only
|
||||
static final long HMASK = ((((long)SMASK) << 32) |
|
||||
(((long)SMASK) << 16)); // history bits
|
||||
static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd
|
||||
(((long)INVALID_ID) << 16)); // no 2nd
|
||||
|
||||
@ -1210,7 +1207,6 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
static final class WorkQueue {
|
||||
// fields declared in order of their likely layout on most VMs
|
||||
final ForkJoinWorkerThread owner; // null if shared
|
||||
volatile Thread parker; // set when parking in awaitWork
|
||||
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
|
||||
int base; // index of next slot for poll
|
||||
final int config; // mode bits
|
||||
@ -1226,6 +1222,8 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
volatile int source; // source queue id (or DEREGISTERED)
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
int nsteals; // number of steals from other queues
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
volatile int parking; // nonzero if parked in awaitWork
|
||||
|
||||
// Support for atomic operations
|
||||
private static final Unsafe U;
|
||||
@ -1295,7 +1293,7 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
*/
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool,
|
||||
boolean internal) {
|
||||
int s = top, b = base, cap, m, room; ForkJoinTask<?>[] a;
|
||||
int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a;
|
||||
if ((a = array) == null || (cap = a.length) <= 0 ||
|
||||
(room = (m = cap - 1) - (s - b)) < 0) { // could not resize
|
||||
if (!internal)
|
||||
@ -1303,36 +1301,34 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
throw new RejectedExecutionException("Queue capacity exceeded");
|
||||
}
|
||||
top = s + 1;
|
||||
long pos = slotOffset(m & s);
|
||||
long pos = slotOffset(p = m & s);
|
||||
if (!internal)
|
||||
U.putReference(a, pos, task); // inside lock
|
||||
else
|
||||
U.getAndSetReference(a, pos, task); // fully fenced
|
||||
if (room == 0) { // resize for next time
|
||||
int newCap; // rapidly grow until large
|
||||
if ((newCap = (cap < 1 << 24) ? cap << 2 : cap << 1) > 0) {
|
||||
ForkJoinTask<?>[] newArray = null;
|
||||
try {
|
||||
newArray = new ForkJoinTask<?>[newCap];
|
||||
} catch (OutOfMemoryError ex) {
|
||||
}
|
||||
if (newArray != null) { // else throw on next push
|
||||
int newMask = newCap - 1; // poll old, push to new
|
||||
for (int k = s, j = cap; j > 0; --j, --k) {
|
||||
if ((newArray[k & newMask] =
|
||||
(ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset(k & m), null)) == null)
|
||||
break; // lost to pollers
|
||||
}
|
||||
updateArray(newArray); // fully fenced
|
||||
}
|
||||
if (room == 0 && (newCap = cap << 1) > 0) {
|
||||
ForkJoinTask<?>[] newArray = null;
|
||||
try { // resize for next time
|
||||
newArray = new ForkJoinTask<?>[newCap];
|
||||
} catch (OutOfMemoryError ex) {
|
||||
}
|
||||
if (newArray != null) { // else throw on next push
|
||||
int newMask = newCap - 1; // poll old, push to new
|
||||
for (int k = s, j = cap; j > 0; --j, --k) {
|
||||
ForkJoinTask<?> u;
|
||||
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset(k & m), null)) == null)
|
||||
break; // lost to pollers
|
||||
newArray[k & newMask] = u;
|
||||
}
|
||||
updateArray(newArray); // fully fenced
|
||||
}
|
||||
a = null; // always signal
|
||||
}
|
||||
if (!internal)
|
||||
unlockPhase();
|
||||
if ((room == 0 || room >= m || a[m & (s - 1)] == null) &&
|
||||
pool != null)
|
||||
pool.signalWork();
|
||||
if ((a == null || a[m & (s - 1)] == null) && pool != null)
|
||||
pool.signalWork(a, p);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1439,7 +1435,7 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
a, slotOffset(k), t, null))) {
|
||||
updateBase(nb);
|
||||
if (a[nk] != null && pool != null)
|
||||
pool.signalWork(); // propagate
|
||||
pool.signalWork(a, nk); // propagate
|
||||
return t;
|
||||
}
|
||||
if (o == null && a[nk] == null && array == a &&
|
||||
@ -1456,14 +1452,23 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
* to repoll from the queue obtained from pool.scan.
|
||||
*/
|
||||
private ForkJoinTask<?> tryPoll() {
|
||||
ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap, k;
|
||||
if ((a = array) != null && (cap = a.length) > 0 &&
|
||||
(t = a[k = (b = base) & (cap - 1)]) != null) {
|
||||
U.loadFence();
|
||||
if (base == b &&
|
||||
U.compareAndSetReference(a, slotOffset(k), t, null)) {
|
||||
updateBase(b + 1);
|
||||
return t;
|
||||
ForkJoinTask<?>[] a; int cap;
|
||||
if ((a = array) != null && (cap = a.length) > 0) {
|
||||
for (int b = base, k;;) { // loop only if inconsistent
|
||||
ForkJoinTask<?> t = a[k = b & (cap - 1)];
|
||||
U.loadFence();
|
||||
if (b == (b = base)) {
|
||||
Object o;
|
||||
if (t == null)
|
||||
o = a[k];
|
||||
else if (t == (o = U.compareAndExchangeReference(
|
||||
a, slotOffset(k), t, null))) {
|
||||
updateBase(b + 1);
|
||||
return t;
|
||||
}
|
||||
if (o == null)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
@ -1676,9 +1681,11 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
final long config; // static configuration bits
|
||||
volatile long stealCount; // collects worker nsteals
|
||||
volatile long threadIds; // for worker thread names
|
||||
volatile long ctl; // main pool control
|
||||
int parallelism; // target number of workers
|
||||
volatile int runState; // versioned, lockable
|
||||
@jdk.internal.vm.annotation.Contended("fjpctl") // segregate
|
||||
volatile long ctl; // main pool control
|
||||
@jdk.internal.vm.annotation.Contended("fjpctl") // colocate
|
||||
int parallelism; // target number of workers
|
||||
|
||||
// Support for atomic operations
|
||||
private static final Unsafe U;
|
||||
@ -1876,8 +1883,8 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
c, ((RC_MASK & (c - RC_UNIT)) |
|
||||
(TC_MASK & (c - TC_UNIT)) |
|
||||
(LMASK & c)))));
|
||||
else if ((int)c == 0) // was dropped on timeout
|
||||
replaceable = false;
|
||||
else if ((int)c != 0)
|
||||
replaceable = true; // signal below to cascade timeouts
|
||||
if (w != null) { // cancel remaining tasks
|
||||
for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
|
||||
try {
|
||||
@ -1898,15 +1905,19 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
unlockRunState();
|
||||
}
|
||||
if ((runState & STOP) == 0 && replaceable)
|
||||
signalWork(); // may replace unless trimmed or uninitialized
|
||||
signalWork(null, 0); // may replace unless trimmed or uninitialized
|
||||
if (ex != null)
|
||||
ForkJoinTask.rethrow(ex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases an idle worker, or creates one if not enough exist.
|
||||
* Releases an idle worker, or creates one if not enough exist,
|
||||
* returning on contention if a signal task is already taken.
|
||||
*
|
||||
* @param a if nonnull, a task array holding task signalled
|
||||
* @param k index of task in array
|
||||
*/
|
||||
final void signalWork() {
|
||||
final void signalWork(ForkJoinTask<?>[] a, int k) {
|
||||
int pc = parallelism;
|
||||
for (long c = ctl;;) {
|
||||
WorkQueue[] qs = queues;
|
||||
@ -1928,20 +1939,21 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
if (v == null)
|
||||
createWorker();
|
||||
else {
|
||||
Thread t;
|
||||
v.phase = sp;
|
||||
if ((t = v.parker) != null)
|
||||
U.unpark(t);
|
||||
if (v.parking != 0)
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (a != null && k >= 0 && k < a.length && a[k] == null)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reactivates the given worker, and possibly interrupts others if
|
||||
* not top of ctl stack. Called only during shutdown to ensure release
|
||||
* on termination.
|
||||
* Reactivates the given worker, and possibly others if not top of
|
||||
* ctl stack. Called only during shutdown to ensure release on
|
||||
* termination.
|
||||
*/
|
||||
private void reactivate(WorkQueue w) {
|
||||
for (long c = ctl;;) {
|
||||
@ -1953,16 +1965,11 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
if (c == (c = compareAndExchangeCtl(
|
||||
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
|
||||
(v.stackPred & LMASK))))) {
|
||||
Thread t;
|
||||
v.phase = sp;
|
||||
if ((t = v.parker) != null) {
|
||||
try {
|
||||
t.interrupt();
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
}
|
||||
if (v == w)
|
||||
break;
|
||||
if (v.parking != 0)
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1993,18 +2000,18 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
sum += (p & 0xffffffffL) | ((long)b << 32);
|
||||
if ((p & IDLE) == 0 || s - b > 0) {
|
||||
if ((i & 1) == 0 && compareAndSetCtl(c, c))
|
||||
signalWork(); // ensure live
|
||||
signalWork(null, 0); // ensure live
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
swept = (phaseSum == (phaseSum = sum));
|
||||
}
|
||||
else if (compareAndSetCtl(c, c) && // confirm
|
||||
casRunState(e, (e & SHUTDOWN) != 0 ? e | STOP : e)) {
|
||||
if ((e & SHUTDOWN) != 0) // enable termination
|
||||
interruptAll();
|
||||
else if ((e & SHUTDOWN) == 0)
|
||||
return true;
|
||||
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
|
||||
interruptAll(); // confirmed
|
||||
return true; // enable termination
|
||||
}
|
||||
else
|
||||
break; // restart
|
||||
@ -2021,17 +2028,12 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
final void runWorker(WorkQueue w) {
|
||||
if (w != null) {
|
||||
int phase = w.phase, r = w.stackPred; // seed from registerWorker
|
||||
for (long window = NO_HISTORY | (r >>> 16);;) {
|
||||
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
|
||||
if ((runState & STOP) != 0) // terminating
|
||||
break;
|
||||
if (window == (window = scan(w, window & WMASK, r)) &&
|
||||
window >= 0L && phase != (phase = awaitWork(w, phase))) {
|
||||
if ((phase & IDLE) != 0)
|
||||
break; // worker exit
|
||||
window = NO_HISTORY | (window & SMASK); // clear history
|
||||
}
|
||||
}
|
||||
long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
|
||||
do {
|
||||
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
|
||||
} while ((runState & STOP) == 0 &&
|
||||
(((window = scan(w, window, r)) < 0L ||
|
||||
((phase = awaitWork(w, phase)) & IDLE) == 0)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -2048,105 +2050,116 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
private long scan(WorkQueue w, long window, int r) {
|
||||
WorkQueue[] qs = queues;
|
||||
int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
|
||||
long next = window & ~RESCAN;
|
||||
outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
|
||||
int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
|
||||
if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
|
||||
(a = q.array) != null && (cap = a.length) > 0) {
|
||||
for (;;) {
|
||||
int b, k; Object o;
|
||||
ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
|
||||
for (int b = q.base;;) {
|
||||
int nb = b + 1, nk = nb & (cap - 1), k;
|
||||
ForkJoinTask<?> t = a[k = b & (cap - 1)];
|
||||
U.loadFence(); // re-read b and t
|
||||
if (q.base == b) { // else inconsistent; retry
|
||||
int nb = b + 1, nk = nb & (cap - 1);
|
||||
if (t == null) {
|
||||
if (a[k] == null) { // revisit if another task
|
||||
if (window >= 0L && a[nk] != null)
|
||||
window |= RESCAN;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (b == (b = q.base)) { // else inconsistent; retry
|
||||
Object o;
|
||||
if (t == null)
|
||||
o = a[k];
|
||||
else if (t == (o = U.compareAndExchangeReference(
|
||||
a, slotOffset(k), t, null))) {
|
||||
a, slotOffset(k), t, null))) {
|
||||
q.updateBase(nb);
|
||||
long pw = window, nw = ((pw << 16) | j) & WMASK;
|
||||
window = nw | RESCAN;
|
||||
if ((nw != pw || (short)(nw >>> 32) != j) &&
|
||||
a[nk] != null)
|
||||
signalWork(); // limit propagation
|
||||
next = RESCAN | ((window << 16) & HMASK) | j;
|
||||
if (window != next && a[nk] != null)
|
||||
signalWork(a, nk); // limit propagation
|
||||
if (w != null) // always true
|
||||
w.topLevelExec(t, q, j);
|
||||
break outer;
|
||||
}
|
||||
else if (o == null) // contended
|
||||
break; // retried unless newly active
|
||||
if (o == null) {
|
||||
if (next >= 0L && a[nk] != null)
|
||||
next |= RESCAN;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return window;
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to inactivate, and if successful, awaits signal or termination.
|
||||
*
|
||||
* @param w the worker (may be null if already terminated)
|
||||
* @param p current phase
|
||||
* @param phase current phase
|
||||
* @return current phase, with IDLE set if worker should exit
|
||||
*/
|
||||
private int awaitWork(WorkQueue w, int p) {
|
||||
private int awaitWork(WorkQueue w, int phase) {
|
||||
boolean quiet; // true if possibly quiescent
|
||||
int active = phase + (IDLE << 1), p = phase | IDLE, e;
|
||||
if (w != null) {
|
||||
int idlePhase = p + IDLE, nextPhase = p + (IDLE << 1);
|
||||
long pc = ctl, qc = (nextPhase & LMASK) | ((pc - RC_UNIT) & UMASK);
|
||||
w.stackPred = (int)pc; // set ctl stack link
|
||||
w.phase = idlePhase; // try to inactivate
|
||||
if (!compareAndSetCtl(pc, qc)) // contended enque
|
||||
return w.phase = p; // back out
|
||||
int ac = (short)(qc >>> RC_SHIFT);
|
||||
boolean quiescent = (ac <= 0 && quiescent());
|
||||
if ((runState & STOP) != 0)
|
||||
return idlePhase;
|
||||
int spins = ac + ((((int)(qc >>> TC_SHIFT)) & SMASK) << 1);
|
||||
while ((p = w.phase) == idlePhase && --spins > 0)
|
||||
Thread.onSpinWait(); // spin for approx #accesses to signal
|
||||
if (p == idlePhase) {
|
||||
long deadline = (!quiescent ? 0L : // timeout for trim
|
||||
System.currentTimeMillis() + keepAlive);
|
||||
WorkQueue[] qs = queues;
|
||||
int n = (qs == null) ? 0 : qs.length;
|
||||
for (int i = 0; i < n; ++i) { // recheck queues
|
||||
WorkQueue q; ForkJoinTask<?>[] a; int cap;
|
||||
if ((q = qs[i]) != null &&
|
||||
(a = q.array) != null && (cap = a.length) > 0 &&
|
||||
a[q.base & (cap - 1)] != null &&
|
||||
ctl == qc && compareAndSetCtl(qc, pc)) {
|
||||
w.phase = (int)qc; // release
|
||||
break;
|
||||
}
|
||||
w.phase = p; // deactivate
|
||||
long np = active & LMASK, pc = ctl; // try to enqueue
|
||||
long qc = np | ((pc - RC_UNIT) & UMASK);
|
||||
w.stackPred = (int)pc; // set ctl stack link
|
||||
if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
|
||||
qc = np | ((pc - RC_UNIT) & UMASK);
|
||||
w.stackPred = (int)pc; // retry once
|
||||
if (pc != (pc = compareAndExchangeCtl(pc, qc)))
|
||||
p = w.phase = phase; // back out
|
||||
}
|
||||
if (p != phase && ((e = runState) & STOP) == 0 &&
|
||||
(!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
|
||||
!(quiet = quiescent()) || (runState & STOP) == 0)) {
|
||||
long deadline = 0L; // not terminating
|
||||
if (quiet) { // use timeout if trimmable
|
||||
int nt = (short)(qc >>> TC_SHIFT);
|
||||
long delay = keepAlive; // scale if not at target
|
||||
if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
|
||||
delay = Math.max(TIMEOUT_SLOP, delay / nt);
|
||||
if ((deadline = delay + System.currentTimeMillis()) == 0L)
|
||||
deadline = 1L; // avoid zero
|
||||
}
|
||||
if ((p = w.phase) == idlePhase) { // emulate LockSupport.park
|
||||
boolean release = quiet;
|
||||
WorkQueue[] qs = queues; // recheck queues
|
||||
int n = (qs == null) ? 0 : qs.length;
|
||||
for (int l = -n, j = active; l < n; ++l, ++j) {
|
||||
WorkQueue q; ForkJoinTask<?>[] a; int cap;
|
||||
if ((p = w.phase) == active) // interleave signal checks
|
||||
break;
|
||||
if ((q = qs[j & (n - 1)]) != null &&
|
||||
(a = q.array) != null && (cap = a.length) > 0 &&
|
||||
a[q.base & (cap - 1)] != null) {
|
||||
if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
|
||||
p = w.phase = active;
|
||||
break; // possible missed signal
|
||||
}
|
||||
release = true; // track multiple or reencounter
|
||||
}
|
||||
Thread.onSpinWait(); // reduce memory traffic
|
||||
}
|
||||
if (p != active) { // emulate LockSupport.park
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
w.parker = Thread.currentThread();
|
||||
w.parking = 1;
|
||||
for (;;) {
|
||||
if ((runState & STOP) != 0 || (p = w.phase) != idlePhase)
|
||||
if ((runState & STOP) != 0 || (p = w.phase) == active)
|
||||
break;
|
||||
U.park(quiescent, deadline);
|
||||
if ((p = w.phase) != idlePhase || (runState & STOP) != 0)
|
||||
U.park(deadline != 0L, deadline);
|
||||
if ((p = w.phase) == active || (runState & STOP) != 0)
|
||||
break;
|
||||
Thread.interrupted(); // clear for next park
|
||||
if (quiescent && TIMEOUT_SLOP >
|
||||
Thread.interrupted(); // clear for next park
|
||||
if (deadline != 0L && TIMEOUT_SLOP >
|
||||
deadline - System.currentTimeMillis()) {
|
||||
long sp = w.stackPred & LMASK;
|
||||
long c = ctl, nc = sp | (UMASK & (c - TC_UNIT));
|
||||
if (((int)c & SMASK) == (idlePhase & SMASK) &&
|
||||
long sp = w.stackPred & LMASK, c = ctl;
|
||||
long nc = sp | (UMASK & (c - TC_UNIT));
|
||||
if (((int)c & SMASK) == (active & SMASK) &&
|
||||
compareAndSetCtl(c, nc)) {
|
||||
w.source = DEREGISTERED;
|
||||
w.phase = (int)c;
|
||||
break;
|
||||
w.phase = active;
|
||||
break; // trimmed on timeout
|
||||
}
|
||||
deadline += keepAlive; // not head; reset timer
|
||||
deadline = 0L; // no longer trimmable
|
||||
}
|
||||
}
|
||||
w.parker = null;
|
||||
w.parking = 0;
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
}
|
||||
}
|
||||
@ -2201,13 +2214,13 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
sp = (int)c,
|
||||
stat = -1; // default retry return
|
||||
if (sp != 0 && active <= pc) { // activate idle worker
|
||||
WorkQueue[] qs; WorkQueue v; int i; Thread t;
|
||||
WorkQueue[] qs; WorkQueue v; int i;
|
||||
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
|
||||
(v = qs[i]) != null &&
|
||||
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
|
||||
v.phase = sp;
|
||||
if ((t = v.parker) != null)
|
||||
U.unpark(t);
|
||||
if (v.parking != 0)
|
||||
U.unpark(v.owner);
|
||||
stat = UNCOMPENSATE;
|
||||
}
|
||||
}
|
||||
@ -2807,7 +2820,6 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns termination signal, constructing if necessary
|
||||
*/
|
||||
@ -3099,7 +3111,13 @@ public class ForkJoinPool extends AbstractExecutorService {
|
||||
public <T> T invoke(ForkJoinTask<T> task) {
|
||||
Objects.requireNonNull(task);
|
||||
poolSubmit(true, task);
|
||||
return task.join();
|
||||
try {
|
||||
return task.join();
|
||||
} catch (RuntimeException | Error unchecked) {
|
||||
throw unchecked;
|
||||
} catch (Exception checked) {
|
||||
throw new RuntimeException(checked);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -626,6 +626,22 @@ public class ForkJoinPoolTest extends JSR166TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invoke throws a RuntimeException if task throws unchecked exception
|
||||
*/
|
||||
public void testInvokeUncheckedException() throws Throwable {
|
||||
ForkJoinPool p = new ForkJoinPool(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.invoke(ForkJoinTask.adapt(new Callable<Object>() {
|
||||
public Object call() { throw new ArithmeticException(); }}));
|
||||
shouldThrow();
|
||||
} catch (RuntimeException success) {
|
||||
assertTrue(success.getCause() instanceof ArithmeticException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAny(null) throws NullPointerException
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user