From cc25d8b12bbab9dde9ade7762927dcb8d27e23c5 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Wed, 6 Dec 2023 16:12:59 +0000
Subject: [PATCH] 8319662: ForkJoinPool trims worker threads too slowly
8319498: ForkJoinPool.invoke(ForkJoinTask) does not specify behavior when
task throws checked exception
Reviewed-by: alanb
---
.../java/util/concurrent/ForkJoinPool.java | 374 +++++++++---------
.../util/concurrent/tck/ForkJoinPoolTest.java | 16 +
2 files changed, 212 insertions(+), 178 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index a14f6d884e0..4cdafb0a44f 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -556,14 +556,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* signalling (because its queue is empty), also resulting in
* logarithmic full activation time
*
- * * Because we don't know about usage patterns (or most commonly,
+ * * Because we don't know about usage patterns (or most commonly,
* mixtures), we use both approaches, which present even more
* opportunities to over-signal. Note that in either of these
* contexts, signals may be (and often are) unnecessary because
* active workers continue scanning after running tasks without
* the need to be signalled (which is one reason work stealing
* is often faster than alternatives), so additional workers
- * aren't needed. But there is no efficient way to detect this.
+ * aren't needed. We filter out some of these cases by exiting
+ * retry loops in signalWork if the task responsible for the
+ * signal has already been taken.
*
* * For rapidly branching tasks that require full pool resources,
* oversignalling is OK, because signalWork will soon have no
@@ -579,9 +581,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* top-level polls (argument "window" in method scan, with setup
* in method runWorker) as an encoded sliding window of current
* and previous two sources (or INVALID_ID if none), and stop
- * signalling when all were from the same source. Also, retries
- * are suppressed on CAS failures by newly activated workers,
- * which serves as a form of admission control. These
+ * signalling when all were from the same source. These
* mechanisms may result in transiently too few workers, but
* once workers poll from a new source, they rapidly reactivate
* others.
@@ -609,22 +609,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* contention.
*
* Deactivation. When method scan indicates that no tasks are
- * found by a worker, it tries to deactivate (in runWorker). Note
- * that not finding tasks doesn't mean that there won't soon be
- * some. Further, a scan may give up under contention, returning
- * even without knowing whether any tasks are still present, which
- * is OK given, a secondary check (in awaitWork) needed to cover
- * deactivation/signal races. Blocking and unblocking via
- * park/unpark can cause serious slowdowns when tasks are rapidly
- * but irregularly generated (which is often due to garbage
- * collectors and other activities). One way to ameliorate is for
- * workers to rescan multiple times, even when there are unlikely
- * to be tasks. But this causes enough memory traffic and CAS
- * contention to prefer using quieter short spinwaits in awaitWork
- * and elsewhere. Those in awaitWork are set to small values that
- * only cover near-miss scenarios for inactivate/activate races.
- * Because idle workers are often not yet blocked (parked), we use
- * the WorkQueue parker field to advertise that a waiter actually
+ * found by a worker, it tries to deactivate (in awaitWork),
+ * giving up (and rescanning) on ctl contention. To avoid missed
+ * signals during deactivation, the method rescans and reactivates
+ * if there may have been a missed signal during deactivation,
+ * filtering out most cases in which this is unnecessary. Because
+ * idle workers are often not yet blocked (parked), we use the
+ * WorkQueue parking field to advertise that a waiter actually
* needs unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
@@ -676,7 +667,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for
- * period given by field keepAlive.
+ * period given by field keepAlive (default 60sec), which applies
+ * to the first timeout of a fully populated pool. Subsequent (or
+ * other) cases use delays such that, if still quiescent, all will
+ * be released before one additional keepAlive unit elapses.
*
* Joining Tasks
* =============
@@ -687,7 +681,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* task would otherwise be blocked waiting for completion of
* another, basically, just by running that task or one of its
* subtasks if not already taken. These mechanics are disabled for
- * InterruptibleTasks, that guarantee that callers do not executed
+ * InterruptibleTasks, that guarantee that callers do not execute
* submitted tasks.
*
* The basic structure of joining is an extended spin/block scheme
@@ -886,22 +880,24 @@ public class ForkJoinPool extends AbstractExecutorService {
* control (for good reason). Similarly for relying on fields
* being placed in size-sorted declaration order.
*
- * For class ForkJoinPool, it is usually more effective to order
- * fields such that the most commonly accessed fields are unlikely
- * to share cache lines with adjacent objects under JVM layout
- * rules. For class WorkQueue, an embedded @Contended region
- * segregates fields most heavily updated by owners from those
- * most commonly read by stealers or other management. Initial
- * sizing and resizing of WorkQueue arrays is an even more
- * delicate tradeoff because the best strategy systematically
- * varies across garbage collectors. Small arrays are better for
- * locality and reduce GC scan time, but large arrays reduce both
- * direct false-sharing and indirect cases due to GC bookkeeping
- * (cardmarks etc), and reduce the number of resizes, which are
- * not especially fast because they require atomic transfers.
- * Currently, arrays are initialized to be fairly small but early
- * resizes rapidly increase size by more than a factor of two
- * until very large. (Maintenance note: any changes in fields,
+ * We isolate the ForkJoinPool.ctl field that otherwise causes the
+ * most false-sharing misses with respect to other fields. Also,
+ * ForkJoinPool fields are ordered such that fields less prone to
+ * contention effects are first, offsetting those that otherwise
+ * would be, while also reducing total footprint vs using
+ * multiple @Contended regions, which tends to slow down
+ * less-contended applications. For class WorkQueue, an
+ * embedded @Contended region segregates fields most heavily
+ * updated by owners from those most commonly read by stealers or
+ * other management. Initial sizing and resizing of WorkQueue
+ * arrays is an even more delicate tradeoff because the best
+ * strategy systematically varies across garbage collectors. Small
+ * arrays are better for locality and reduce GC scan time, but
+ * large arrays reduce both direct false-sharing and indirect
+ * cases due to GC bookkeeping (cardmarks etc), and reduce the
+ * number of resizes, which are not especially fast because they
+ * require atomic transfers. Currently, arrays are initialized to
+ * be fairly small. (Maintenance note: any changes in fields,
* queues, or their uses, or JVM layout policies, must be
* accompanied by re-evaluation of these placement and sizing
* decisions.)
@@ -1035,7 +1031,8 @@ public class ForkJoinPool extends AbstractExecutorService {
// source history window packing used in scan() and runWorker()
static final long RESCAN = 1L << 63; // must be negative
- static final long WMASK = ~(((long)SMASK) << 48); // id bits only
+ static final long HMASK = ((((long)SMASK) << 32) |
+ (((long)SMASK) << 16)); // history bits
static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd
(((long)INVALID_ID) << 16)); // no 2nd
@@ -1210,7 +1207,6 @@ public class ForkJoinPool extends AbstractExecutorService {
static final class WorkQueue {
// fields declared in order of their likely layout on most VMs
final ForkJoinWorkerThread owner; // null if shared
- volatile Thread parker; // set when parking in awaitWork
ForkJoinTask>[] array; // the queued tasks; power of 2 size
int base; // index of next slot for poll
final int config; // mode bits
@@ -1226,6 +1222,8 @@ public class ForkJoinPool extends AbstractExecutorService {
volatile int source; // source queue id (or DEREGISTERED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
+ @jdk.internal.vm.annotation.Contended("w")
+ volatile int parking; // nonzero if parked in awaitWork
// Support for atomic operations
private static final Unsafe U;
@@ -1295,7 +1293,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void push(ForkJoinTask> task, ForkJoinPool pool,
boolean internal) {
- int s = top, b = base, cap, m, room; ForkJoinTask>[] a;
+ int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask>[] a;
if ((a = array) == null || (cap = a.length) <= 0 ||
(room = (m = cap - 1) - (s - b)) < 0) { // could not resize
if (!internal)
@@ -1303,36 +1301,34 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new RejectedExecutionException("Queue capacity exceeded");
}
top = s + 1;
- long pos = slotOffset(m & s);
+ long pos = slotOffset(p = m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
- if (room == 0) { // resize for next time
- int newCap; // rapidly grow until large
- if ((newCap = (cap < 1 << 24) ? cap << 2 : cap << 1) > 0) {
- ForkJoinTask>[] newArray = null;
- try {
- newArray = new ForkJoinTask>[newCap];
- } catch (OutOfMemoryError ex) {
- }
- if (newArray != null) { // else throw on next push
- int newMask = newCap - 1; // poll old, push to new
- for (int k = s, j = cap; j > 0; --j, --k) {
- if ((newArray[k & newMask] =
- (ForkJoinTask>)U.getAndSetReference(
- a, slotOffset(k & m), null)) == null)
- break; // lost to pollers
- }
- updateArray(newArray); // fully fenced
- }
+ if (room == 0 && (newCap = cap << 1) > 0) {
+ ForkJoinTask>[] newArray = null;
+ try { // resize for next time
+ newArray = new ForkJoinTask>[newCap];
+ } catch (OutOfMemoryError ex) {
}
+ if (newArray != null) { // else throw on next push
+ int newMask = newCap - 1; // poll old, push to new
+ for (int k = s, j = cap; j > 0; --j, --k) {
+ ForkJoinTask> u;
+ if ((u = (ForkJoinTask>)U.getAndSetReference(
+ a, slotOffset(k & m), null)) == null)
+ break; // lost to pollers
+ newArray[k & newMask] = u;
+ }
+ updateArray(newArray); // fully fenced
+ }
+ a = null; // always signal
}
if (!internal)
unlockPhase();
- if ((room == 0 || room >= m || a[m & (s - 1)] == null) &&
- pool != null)
- pool.signalWork();
+ if ((a == null || a[m & (s - 1)] == null) && pool != null)
+ pool.signalWork(a, p);
}
/**
@@ -1439,7 +1435,7 @@ public class ForkJoinPool extends AbstractExecutorService {
a, slotOffset(k), t, null))) {
updateBase(nb);
if (a[nk] != null && pool != null)
- pool.signalWork(); // propagate
+ pool.signalWork(a, nk); // propagate
return t;
}
if (o == null && a[nk] == null && array == a &&
@@ -1456,14 +1452,23 @@ public class ForkJoinPool extends AbstractExecutorService {
* to repoll from the queue obtained from pool.scan.
*/
private ForkJoinTask> tryPoll() {
- ForkJoinTask> t; ForkJoinTask>[] a; int b, cap, k;
- if ((a = array) != null && (cap = a.length) > 0 &&
- (t = a[k = (b = base) & (cap - 1)]) != null) {
- U.loadFence();
- if (base == b &&
- U.compareAndSetReference(a, slotOffset(k), t, null)) {
- updateBase(b + 1);
- return t;
+ ForkJoinTask>[] a; int cap;
+ if ((a = array) != null && (cap = a.length) > 0) {
+ for (int b = base, k;;) { // loop only if inconsistent
+ ForkJoinTask> t = a[k = b & (cap - 1)];
+ U.loadFence();
+ if (b == (b = base)) {
+ Object o;
+ if (t == null)
+ o = a[k];
+ else if (t == (o = U.compareAndExchangeReference(
+ a, slotOffset(k), t, null))) {
+ updateBase(b + 1);
+ return t;
+ }
+ if (o == null)
+ break;
+ }
}
}
return null;
@@ -1676,9 +1681,11 @@ public class ForkJoinPool extends AbstractExecutorService {
final long config; // static configuration bits
volatile long stealCount; // collects worker nsteals
volatile long threadIds; // for worker thread names
- volatile long ctl; // main pool control
- int parallelism; // target number of workers
volatile int runState; // versioned, lockable
+ @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
+ volatile long ctl; // main pool control
+ @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
+ int parallelism; // target number of workers
// Support for atomic operations
private static final Unsafe U;
@@ -1876,8 +1883,8 @@ public class ForkJoinPool extends AbstractExecutorService {
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
- else if ((int)c == 0) // was dropped on timeout
- replaceable = false;
+ else if ((int)c != 0)
+ replaceable = true; // signal below to cascade timeouts
if (w != null) { // cancel remaining tasks
for (ForkJoinTask> t; (t = w.nextLocalTask()) != null; ) {
try {
@@ -1898,15 +1905,19 @@ public class ForkJoinPool extends AbstractExecutorService {
unlockRunState();
}
if ((runState & STOP) == 0 && replaceable)
- signalWork(); // may replace unless trimmed or uninitialized
+ signalWork(null, 0); // may replace unless trimmed or uninitialized
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
- * Releases an idle worker, or creates one if not enough exist.
+ * Releases an idle worker, or creates one if not enough exist,
+ * returning on contention if a signal task is already taken.
+ *
+ * @param a if nonnull, a task array holding task signalled
+ * @param k index of task in array
*/
- final void signalWork() {
+ final void signalWork(ForkJoinTask>[] a, int k) {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
@@ -1928,20 +1939,21 @@ public class ForkJoinPool extends AbstractExecutorService {
if (v == null)
createWorker();
else {
- Thread t;
v.phase = sp;
- if ((t = v.parker) != null)
- U.unpark(t);
+ if (v.parking != 0)
+ U.unpark(v.owner);
}
break;
}
+ if (a != null && k >= 0 && k < a.length && a[k] == null)
+ break;
}
}
/**
- * Reactivates the given worker, and possibly interrupts others if
- * not top of ctl stack. Called only during shutdown to ensure release
- * on termination.
+ * Reactivates the given worker, and possibly others if not top of
+ * ctl stack. Called only during shutdown to ensure release on
+ * termination.
*/
private void reactivate(WorkQueue w) {
for (long c = ctl;;) {
@@ -1953,16 +1965,11 @@ public class ForkJoinPool extends AbstractExecutorService {
if (c == (c = compareAndExchangeCtl(
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
- Thread t;
v.phase = sp;
- if ((t = v.parker) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
- }
- }
if (v == w)
break;
+ if (v.parking != 0)
+ U.unpark(v.owner);
}
}
}
@@ -1993,18 +2000,18 @@ public class ForkJoinPool extends AbstractExecutorService {
sum += (p & 0xffffffffL) | ((long)b << 32);
if ((p & IDLE) == 0 || s - b > 0) {
if ((i & 1) == 0 && compareAndSetCtl(c, c))
- signalWork(); // ensure live
+ signalWork(null, 0); // ensure live
return false;
}
}
}
swept = (phaseSum == (phaseSum = sum));
}
- else if (compareAndSetCtl(c, c) && // confirm
- casRunState(e, (e & SHUTDOWN) != 0 ? e | STOP : e)) {
- if ((e & SHUTDOWN) != 0) // enable termination
- interruptAll();
+ else if ((e & SHUTDOWN) == 0)
return true;
+ else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
+ interruptAll(); // confirmed
+ return true; // enable termination
}
else
break; // restart
@@ -2021,17 +2028,12 @@ public class ForkJoinPool extends AbstractExecutorService {
final void runWorker(WorkQueue w) {
if (w != null) {
int phase = w.phase, r = w.stackPred; // seed from registerWorker
- for (long window = NO_HISTORY | (r >>> 16);;) {
- r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- if ((runState & STOP) != 0) // terminating
- break;
- if (window == (window = scan(w, window & WMASK, r)) &&
- window >= 0L && phase != (phase = awaitWork(w, phase))) {
- if ((phase & IDLE) != 0)
- break; // worker exit
- window = NO_HISTORY | (window & SMASK); // clear history
- }
- }
+ long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
+ do {
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ } while ((runState & STOP) == 0 &&
+ (((window = scan(w, window, r)) < 0L ||
+ ((phase = awaitWork(w, phase)) & IDLE) == 0)));
}
}
@@ -2048,105 +2050,116 @@ public class ForkJoinPool extends AbstractExecutorService {
private long scan(WorkQueue w, long window, int r) {
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
+ long next = window & ~RESCAN;
outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
int j, cap; WorkQueue q; ForkJoinTask>[] a;
if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
(a = q.array) != null && (cap = a.length) > 0) {
- for (;;) {
- int b, k; Object o;
- ForkJoinTask> t = a[k = (b = q.base) & (cap - 1)];
+ for (int b = q.base;;) {
+ int nb = b + 1, nk = nb & (cap - 1), k;
+ ForkJoinTask> t = a[k = b & (cap - 1)];
U.loadFence(); // re-read b and t
- if (q.base == b) { // else inconsistent; retry
- int nb = b + 1, nk = nb & (cap - 1);
- if (t == null) {
- if (a[k] == null) { // revisit if another task
- if (window >= 0L && a[nk] != null)
- window |= RESCAN;
- break;
- }
- }
+ if (b == (b = q.base)) { // else inconsistent; retry
+ Object o;
+ if (t == null)
+ o = a[k];
else if (t == (o = U.compareAndExchangeReference(
- a, slotOffset(k), t, null))) {
+ a, slotOffset(k), t, null))) {
q.updateBase(nb);
- long pw = window, nw = ((pw << 16) | j) & WMASK;
- window = nw | RESCAN;
- if ((nw != pw || (short)(nw >>> 32) != j) &&
- a[nk] != null)
- signalWork(); // limit propagation
+ next = RESCAN | ((window << 16) & HMASK) | j;
+ if (window != next && a[nk] != null)
+ signalWork(a, nk); // limit propagation
if (w != null) // always true
w.topLevelExec(t, q, j);
break outer;
}
- else if (o == null) // contended
- break; // retried unless newly active
+ if (o == null) {
+ if (next >= 0L && a[nk] != null)
+ next |= RESCAN;
+ break;
+ }
}
}
}
}
- return window;
+ return next;
}
/**
* Tries to inactivate, and if successful, awaits signal or termination.
*
* @param w the worker (may be null if already terminated)
- * @param p current phase
+ * @param phase current phase
* @return current phase, with IDLE set if worker should exit
*/
- private int awaitWork(WorkQueue w, int p) {
+ private int awaitWork(WorkQueue w, int phase) {
+ boolean quiet; // true if possibly quiescent
+ int active = phase + (IDLE << 1), p = phase | IDLE, e;
if (w != null) {
- int idlePhase = p + IDLE, nextPhase = p + (IDLE << 1);
- long pc = ctl, qc = (nextPhase & LMASK) | ((pc - RC_UNIT) & UMASK);
- w.stackPred = (int)pc; // set ctl stack link
- w.phase = idlePhase; // try to inactivate
- if (!compareAndSetCtl(pc, qc)) // contended enque
- return w.phase = p; // back out
- int ac = (short)(qc >>> RC_SHIFT);
- boolean quiescent = (ac <= 0 && quiescent());
- if ((runState & STOP) != 0)
- return idlePhase;
- int spins = ac + ((((int)(qc >>> TC_SHIFT)) & SMASK) << 1);
- while ((p = w.phase) == idlePhase && --spins > 0)
- Thread.onSpinWait(); // spin for approx #accesses to signal
- if (p == idlePhase) {
- long deadline = (!quiescent ? 0L : // timeout for trim
- System.currentTimeMillis() + keepAlive);
- WorkQueue[] qs = queues;
- int n = (qs == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) { // recheck queues
- WorkQueue q; ForkJoinTask>[] a; int cap;
- if ((q = qs[i]) != null &&
- (a = q.array) != null && (cap = a.length) > 0 &&
- a[q.base & (cap - 1)] != null &&
- ctl == qc && compareAndSetCtl(qc, pc)) {
- w.phase = (int)qc; // release
- break;
- }
+ w.phase = p; // deactivate
+ long np = active & LMASK, pc = ctl; // try to enqueue
+ long qc = np | ((pc - RC_UNIT) & UMASK);
+ w.stackPred = (int)pc; // set ctl stack link
+ if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
+ qc = np | ((pc - RC_UNIT) & UMASK);
+ w.stackPred = (int)pc; // retry once
+ if (pc != (pc = compareAndExchangeCtl(pc, qc)))
+ p = w.phase = phase; // back out
+ }
+ if (p != phase && ((e = runState) & STOP) == 0 &&
+ (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
+ !(quiet = quiescent()) || (runState & STOP) == 0)) {
+ long deadline = 0L; // not terminating
+ if (quiet) { // use timeout if trimmable
+ int nt = (short)(qc >>> TC_SHIFT);
+ long delay = keepAlive; // scale if not at target
+ if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
+ delay = Math.max(TIMEOUT_SLOP, delay / nt);
+ if ((deadline = delay + System.currentTimeMillis()) == 0L)
+ deadline = 1L; // avoid zero
}
- if ((p = w.phase) == idlePhase) { // emulate LockSupport.park
+ boolean release = quiet;
+ WorkQueue[] qs = queues; // recheck queues
+ int n = (qs == null) ? 0 : qs.length;
+ for (int l = -n, j = active; l < n; ++l, ++j) {
+ WorkQueue q; ForkJoinTask>[] a; int cap;
+ if ((p = w.phase) == active) // interleave signal checks
+ break;
+ if ((q = qs[j & (n - 1)]) != null &&
+ (a = q.array) != null && (cap = a.length) > 0 &&
+ a[q.base & (cap - 1)] != null) {
+ if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
+ p = w.phase = active;
+ break; // possible missed signal
+ }
+ release = true; // track multiple or reencounter
+ }
+ Thread.onSpinWait(); // reduce memory traffic
+ }
+ if (p != active) { // emulate LockSupport.park
LockSupport.setCurrentBlocker(this);
- w.parker = Thread.currentThread();
+ w.parking = 1;
for (;;) {
- if ((runState & STOP) != 0 || (p = w.phase) != idlePhase)
+ if ((runState & STOP) != 0 || (p = w.phase) == active)
break;
- U.park(quiescent, deadline);
- if ((p = w.phase) != idlePhase || (runState & STOP) != 0)
+ U.park(deadline != 0L, deadline);
+ if ((p = w.phase) == active || (runState & STOP) != 0)
break;
- Thread.interrupted(); // clear for next park
- if (quiescent && TIMEOUT_SLOP >
+ Thread.interrupted(); // clear for next park
+ if (deadline != 0L && TIMEOUT_SLOP >
deadline - System.currentTimeMillis()) {
- long sp = w.stackPred & LMASK;
- long c = ctl, nc = sp | (UMASK & (c - TC_UNIT));
- if (((int)c & SMASK) == (idlePhase & SMASK) &&
+ long sp = w.stackPred & LMASK, c = ctl;
+ long nc = sp | (UMASK & (c - TC_UNIT));
+ if (((int)c & SMASK) == (active & SMASK) &&
compareAndSetCtl(c, nc)) {
w.source = DEREGISTERED;
- w.phase = (int)c;
- break;
+ w.phase = active;
+ break; // trimmed on timeout
}
- deadline += keepAlive; // not head; reset timer
+ deadline = 0L; // no longer trimmable
}
}
- w.parker = null;
+ w.parking = 0;
LockSupport.setCurrentBlocker(null);
}
}
@@ -2201,13 +2214,13 @@ public class ForkJoinPool extends AbstractExecutorService {
sp = (int)c,
stat = -1; // default retry return
if (sp != 0 && active <= pc) { // activate idle worker
- WorkQueue[] qs; WorkQueue v; int i; Thread t;
+ WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
- if ((t = v.parker) != null)
- U.unpark(t);
+ if (v.parking != 0)
+ U.unpark(v.owner);
stat = UNCOMPENSATE;
}
}
@@ -2807,7 +2820,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
-
/**
* Returns termination signal, constructing if necessary
*/
@@ -3099,7 +3111,13 @@ public class ForkJoinPool extends AbstractExecutorService {
public T invoke(ForkJoinTask task) {
Objects.requireNonNull(task);
poolSubmit(true, task);
- return task.join();
+ try {
+ return task.join();
+ } catch (RuntimeException | Error unchecked) {
+ throw unchecked;
+ } catch (Exception checked) {
+ throw new RuntimeException(checked);
+ }
}
/**
diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
index 781b7bffe6d..7db3e20a2fb 100644
--- a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
+++ b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
@@ -626,6 +626,22 @@ public class ForkJoinPoolTest extends JSR166TestCase {
}
}
+ /**
+ * invoke throws a RuntimeException if task throws unchecked exception
+ */
+ public void testInvokeUncheckedException() throws Throwable {
+ ForkJoinPool p = new ForkJoinPool(1);
+ try (PoolCleaner cleaner = cleaner(p)) {
+ try {
+ p.invoke(ForkJoinTask.adapt(new Callable