diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index a14f6d884e0..4cdafb0a44f 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -556,14 +556,16 @@ public class ForkJoinPool extends AbstractExecutorService { * signalling (because its queue is empty), also resulting in * logarithmic full activation time * - * * Because we don't know about usage patterns (or most commonly, + * * Because we don't know about usage patterns (or most commonly, * mixtures), we use both approaches, which present even more * opportunities to over-signal. Note that in either of these * contexts, signals may be (and often are) unnecessary because * active workers continue scanning after running tasks without * the need to be signalled (which is one reason work stealing * is often faster than alternatives), so additional workers - * aren't needed. But there is no efficient way to detect this. + * aren't needed. We filter out some of these cases by exiting + * retry loops in signalWork if the task responsible for the + * signal has already been taken. * * * For rapidly branching tasks that require full pool resources, * oversignalling is OK, because signalWork will soon have no @@ -579,9 +581,7 @@ public class ForkJoinPool extends AbstractExecutorService { * top-level polls (argument "window" in method scan, with setup * in method runWorker) as an encoded sliding window of current * and previous two sources (or INVALID_ID if none), and stop - * signalling when all were from the same source. Also, retries - * are suppressed on CAS failures by newly activated workers, - * which serves as a form of admission control. These + * signalling when all were from the same source. These * mechanisms may result in transiently too few workers, but * once workers poll from a new source, they rapidly reactivate * others. @@ -609,22 +609,13 @@ public class ForkJoinPool extends AbstractExecutorService { * contention. * * Deactivation. When method scan indicates that no tasks are - * found by a worker, it tries to deactivate (in runWorker). Note - * that not finding tasks doesn't mean that there won't soon be - * some. Further, a scan may give up under contention, returning - * even without knowing whether any tasks are still present, which - * is OK given, a secondary check (in awaitWork) needed to cover - * deactivation/signal races. Blocking and unblocking via - * park/unpark can cause serious slowdowns when tasks are rapidly - * but irregularly generated (which is often due to garbage - * collectors and other activities). One way to ameliorate is for - * workers to rescan multiple times, even when there are unlikely - * to be tasks. But this causes enough memory traffic and CAS - * contention to prefer using quieter short spinwaits in awaitWork - * and elsewhere. Those in awaitWork are set to small values that - * only cover near-miss scenarios for inactivate/activate races. - * Because idle workers are often not yet blocked (parked), we use - * the WorkQueue parker field to advertise that a waiter actually + * found by a worker, it tries to deactivate (in awaitWork), + * giving up (and rescanning) on ctl contention. To avoid missed + * signals during deactivation, the method rescans and reactivates + * if there may have been a missed signal during deactivation, + * filtering out most cases in which this is unnecessary. Because + * idle workers are often not yet blocked (parked), we use the + * WorkQueue parking field to advertise that a waiter actually * needs unparking upon signal. * * Quiescence. Workers scan looking for work, giving up when they @@ -676,7 +667,10 @@ public class ForkJoinPool extends AbstractExecutorService { * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will * time out and terminate if the pool has remained quiescent for - * period given by field keepAlive. + * period given by field keepAlive (default 60sec), which applies + * to the first timeout of a fully populated pool. Subsequent (or + * other) cases use delays such that, if still quiescent, all will + * be released before one additional keepAlive unit elapses. * * Joining Tasks * ============= @@ -687,7 +681,7 @@ public class ForkJoinPool extends AbstractExecutorService { * task would otherwise be blocked waiting for completion of * another, basically, just by running that task or one of its * subtasks if not already taken. These mechanics are disabled for - * InterruptibleTasks, that guarantee that callers do not executed + * InterruptibleTasks, that guarantee that callers do not execute * submitted tasks. * * The basic structure of joining is an extended spin/block scheme @@ -886,22 +880,24 @@ public class ForkJoinPool extends AbstractExecutorService { * control (for good reason). Similarly for relying on fields * being placed in size-sorted declaration order. * - * For class ForkJoinPool, it is usually more effective to order - * fields such that the most commonly accessed fields are unlikely - * to share cache lines with adjacent objects under JVM layout - * rules. For class WorkQueue, an embedded @Contended region - * segregates fields most heavily updated by owners from those - * most commonly read by stealers or other management. Initial - * sizing and resizing of WorkQueue arrays is an even more - * delicate tradeoff because the best strategy systematically - * varies across garbage collectors. Small arrays are better for - * locality and reduce GC scan time, but large arrays reduce both - * direct false-sharing and indirect cases due to GC bookkeeping - * (cardmarks etc), and reduce the number of resizes, which are - * not especially fast because they require atomic transfers. - * Currently, arrays are initialized to be fairly small but early - * resizes rapidly increase size by more than a factor of two - * until very large. (Maintenance note: any changes in fields, + * We isolate the ForkJoinPool.ctl field that otherwise causes the + * most false-sharing misses with respect to other fields. Also, + * ForkJoinPool fields are ordered such that fields less prone to + * contention effects are first, offsetting those that otherwise + * would be, while also reducing total footprint vs using + * multiple @Contended regions, which tends to slow down + * less-contended applications. For class WorkQueue, an + * embedded @Contended region segregates fields most heavily + * updated by owners from those most commonly read by stealers or + * other management. Initial sizing and resizing of WorkQueue + * arrays is an even more delicate tradeoff because the best + * strategy systematically varies across garbage collectors. Small + * arrays are better for locality and reduce GC scan time, but + * large arrays reduce both direct false-sharing and indirect + * cases due to GC bookkeeping (cardmarks etc), and reduce the + * number of resizes, which are not especially fast because they + * require atomic transfers. Currently, arrays are initialized to + * be fairly small. (Maintenance note: any changes in fields, * queues, or their uses, or JVM layout policies, must be * accompanied by re-evaluation of these placement and sizing * decisions.) @@ -1035,7 +1031,8 @@ public class ForkJoinPool extends AbstractExecutorService { // source history window packing used in scan() and runWorker() static final long RESCAN = 1L << 63; // must be negative - static final long WMASK = ~(((long)SMASK) << 48); // id bits only + static final long HMASK = ((((long)SMASK) << 32) | + (((long)SMASK) << 16)); // history bits static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd (((long)INVALID_ID) << 16)); // no 2nd @@ -1210,7 +1207,6 @@ public class ForkJoinPool extends AbstractExecutorService { static final class WorkQueue { // fields declared in order of their likely layout on most VMs final ForkJoinWorkerThread owner; // null if shared - volatile Thread parker; // set when parking in awaitWork ForkJoinTask[] array; // the queued tasks; power of 2 size int base; // index of next slot for poll final int config; // mode bits @@ -1226,6 +1222,8 @@ public class ForkJoinPool extends AbstractExecutorService { volatile int source; // source queue id (or DEREGISTERED) @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues + @jdk.internal.vm.annotation.Contended("w") + volatile int parking; // nonzero if parked in awaitWork // Support for atomic operations private static final Unsafe U; @@ -1295,7 +1293,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ final void push(ForkJoinTask task, ForkJoinPool pool, boolean internal) { - int s = top, b = base, cap, m, room; ForkJoinTask[] a; + int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask[] a; if ((a = array) == null || (cap = a.length) <= 0 || (room = (m = cap - 1) - (s - b)) < 0) { // could not resize if (!internal) @@ -1303,36 +1301,34 @@ public class ForkJoinPool extends AbstractExecutorService { throw new RejectedExecutionException("Queue capacity exceeded"); } top = s + 1; - long pos = slotOffset(m & s); + long pos = slotOffset(p = m & s); if (!internal) U.putReference(a, pos, task); // inside lock else U.getAndSetReference(a, pos, task); // fully fenced - if (room == 0) { // resize for next time - int newCap; // rapidly grow until large - if ((newCap = (cap < 1 << 24) ? cap << 2 : cap << 1) > 0) { - ForkJoinTask[] newArray = null; - try { - newArray = new ForkJoinTask[newCap]; - } catch (OutOfMemoryError ex) { - } - if (newArray != null) { // else throw on next push - int newMask = newCap - 1; // poll old, push to new - for (int k = s, j = cap; j > 0; --j, --k) { - if ((newArray[k & newMask] = - (ForkJoinTask)U.getAndSetReference( - a, slotOffset(k & m), null)) == null) - break; // lost to pollers - } - updateArray(newArray); // fully fenced - } + if (room == 0 && (newCap = cap << 1) > 0) { + ForkJoinTask[] newArray = null; + try { // resize for next time + newArray = new ForkJoinTask[newCap]; + } catch (OutOfMemoryError ex) { } + if (newArray != null) { // else throw on next push + int newMask = newCap - 1; // poll old, push to new + for (int k = s, j = cap; j > 0; --j, --k) { + ForkJoinTask u; + if ((u = (ForkJoinTask)U.getAndSetReference( + a, slotOffset(k & m), null)) == null) + break; // lost to pollers + newArray[k & newMask] = u; + } + updateArray(newArray); // fully fenced + } + a = null; // always signal } if (!internal) unlockPhase(); - if ((room == 0 || room >= m || a[m & (s - 1)] == null) && - pool != null) - pool.signalWork(); + if ((a == null || a[m & (s - 1)] == null) && pool != null) + pool.signalWork(a, p); } /** @@ -1439,7 +1435,7 @@ public class ForkJoinPool extends AbstractExecutorService { a, slotOffset(k), t, null))) { updateBase(nb); if (a[nk] != null && pool != null) - pool.signalWork(); // propagate + pool.signalWork(a, nk); // propagate return t; } if (o == null && a[nk] == null && array == a && @@ -1456,14 +1452,23 @@ public class ForkJoinPool extends AbstractExecutorService { * to repoll from the queue obtained from pool.scan. */ private ForkJoinTask tryPoll() { - ForkJoinTask t; ForkJoinTask[] a; int b, cap, k; - if ((a = array) != null && (cap = a.length) > 0 && - (t = a[k = (b = base) & (cap - 1)]) != null) { - U.loadFence(); - if (base == b && - U.compareAndSetReference(a, slotOffset(k), t, null)) { - updateBase(b + 1); - return t; + ForkJoinTask[] a; int cap; + if ((a = array) != null && (cap = a.length) > 0) { + for (int b = base, k;;) { // loop only if inconsistent + ForkJoinTask t = a[k = b & (cap - 1)]; + U.loadFence(); + if (b == (b = base)) { + Object o; + if (t == null) + o = a[k]; + else if (t == (o = U.compareAndExchangeReference( + a, slotOffset(k), t, null))) { + updateBase(b + 1); + return t; + } + if (o == null) + break; + } } } return null; @@ -1676,9 +1681,11 @@ public class ForkJoinPool extends AbstractExecutorService { final long config; // static configuration bits volatile long stealCount; // collects worker nsteals volatile long threadIds; // for worker thread names - volatile long ctl; // main pool control - int parallelism; // target number of workers volatile int runState; // versioned, lockable + @jdk.internal.vm.annotation.Contended("fjpctl") // segregate + volatile long ctl; // main pool control + @jdk.internal.vm.annotation.Contended("fjpctl") // colocate + int parallelism; // target number of workers // Support for atomic operations private static final Unsafe U; @@ -1876,8 +1883,8 @@ public class ForkJoinPool extends AbstractExecutorService { c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (LMASK & c))))); - else if ((int)c == 0) // was dropped on timeout - replaceable = false; + else if ((int)c != 0) + replaceable = true; // signal below to cascade timeouts if (w != null) { // cancel remaining tasks for (ForkJoinTask t; (t = w.nextLocalTask()) != null; ) { try { @@ -1898,15 +1905,19 @@ public class ForkJoinPool extends AbstractExecutorService { unlockRunState(); } if ((runState & STOP) == 0 && replaceable) - signalWork(); // may replace unless trimmed or uninitialized + signalWork(null, 0); // may replace unless trimmed or uninitialized if (ex != null) ForkJoinTask.rethrow(ex); } /** - * Releases an idle worker, or creates one if not enough exist. + * Releases an idle worker, or creates one if not enough exist, + * returning on contention if a signal task is already taken. + * + * @param a if nonnull, a task array holding task signalled + * @param k index of task in array */ - final void signalWork() { + final void signalWork(ForkJoinTask[] a, int k) { int pc = parallelism; for (long c = ctl;;) { WorkQueue[] qs = queues; @@ -1928,20 +1939,21 @@ public class ForkJoinPool extends AbstractExecutorService { if (v == null) createWorker(); else { - Thread t; v.phase = sp; - if ((t = v.parker) != null) - U.unpark(t); + if (v.parking != 0) + U.unpark(v.owner); } break; } + if (a != null && k >= 0 && k < a.length && a[k] == null) + break; } } /** - * Reactivates the given worker, and possibly interrupts others if - * not top of ctl stack. Called only during shutdown to ensure release - * on termination. + * Reactivates the given worker, and possibly others if not top of + * ctl stack. Called only during shutdown to ensure release on + * termination. */ private void reactivate(WorkQueue w) { for (long c = ctl;;) { @@ -1953,16 +1965,11 @@ public class ForkJoinPool extends AbstractExecutorService { if (c == (c = compareAndExchangeCtl( c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | (v.stackPred & LMASK))))) { - Thread t; v.phase = sp; - if ((t = v.parker) != null) { - try { - t.interrupt(); - } catch (Throwable ignore) { - } - } if (v == w) break; + if (v.parking != 0) + U.unpark(v.owner); } } } @@ -1993,18 +2000,18 @@ public class ForkJoinPool extends AbstractExecutorService { sum += (p & 0xffffffffL) | ((long)b << 32); if ((p & IDLE) == 0 || s - b > 0) { if ((i & 1) == 0 && compareAndSetCtl(c, c)) - signalWork(); // ensure live + signalWork(null, 0); // ensure live return false; } } } swept = (phaseSum == (phaseSum = sum)); } - else if (compareAndSetCtl(c, c) && // confirm - casRunState(e, (e & SHUTDOWN) != 0 ? e | STOP : e)) { - if ((e & SHUTDOWN) != 0) // enable termination - interruptAll(); + else if ((e & SHUTDOWN) == 0) return true; + else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) { + interruptAll(); // confirmed + return true; // enable termination } else break; // restart @@ -2021,17 +2028,12 @@ public class ForkJoinPool extends AbstractExecutorService { final void runWorker(WorkQueue w) { if (w != null) { int phase = w.phase, r = w.stackPred; // seed from registerWorker - for (long window = NO_HISTORY | (r >>> 16);;) { - r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - if ((runState & STOP) != 0) // terminating - break; - if (window == (window = scan(w, window & WMASK, r)) && - window >= 0L && phase != (phase = awaitWork(w, phase))) { - if ((phase & IDLE) != 0) - break; // worker exit - window = NO_HISTORY | (window & SMASK); // clear history - } - } + long window = (long)((r >>> 16) & SMASK) | NO_HISTORY; + do { + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + } while ((runState & STOP) == 0 && + (((window = scan(w, window, r)) < 0L || + ((phase = awaitWork(w, phase)) & IDLE) == 0))); } } @@ -2048,105 +2050,116 @@ public class ForkJoinPool extends AbstractExecutorService { private long scan(WorkQueue w, long window, int r) { WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1; + long next = window & ~RESCAN; outer: for (int i = (short)window, l = n; l > 0; --l, i += step) { int j, cap; WorkQueue q; ForkJoinTask[] a; if ((q = qs[j = i & SMASK & (n - 1)]) != null && (a = q.array) != null && (cap = a.length) > 0) { - for (;;) { - int b, k; Object o; - ForkJoinTask t = a[k = (b = q.base) & (cap - 1)]; + for (int b = q.base;;) { + int nb = b + 1, nk = nb & (cap - 1), k; + ForkJoinTask t = a[k = b & (cap - 1)]; U.loadFence(); // re-read b and t - if (q.base == b) { // else inconsistent; retry - int nb = b + 1, nk = nb & (cap - 1); - if (t == null) { - if (a[k] == null) { // revisit if another task - if (window >= 0L && a[nk] != null) - window |= RESCAN; - break; - } - } + if (b == (b = q.base)) { // else inconsistent; retry + Object o; + if (t == null) + o = a[k]; else if (t == (o = U.compareAndExchangeReference( - a, slotOffset(k), t, null))) { + a, slotOffset(k), t, null))) { q.updateBase(nb); - long pw = window, nw = ((pw << 16) | j) & WMASK; - window = nw | RESCAN; - if ((nw != pw || (short)(nw >>> 32) != j) && - a[nk] != null) - signalWork(); // limit propagation + next = RESCAN | ((window << 16) & HMASK) | j; + if (window != next && a[nk] != null) + signalWork(a, nk); // limit propagation if (w != null) // always true w.topLevelExec(t, q, j); break outer; } - else if (o == null) // contended - break; // retried unless newly active + if (o == null) { + if (next >= 0L && a[nk] != null) + next |= RESCAN; + break; + } } } } } - return window; + return next; } /** * Tries to inactivate, and if successful, awaits signal or termination. * * @param w the worker (may be null if already terminated) - * @param p current phase + * @param phase current phase * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w, int p) { + private int awaitWork(WorkQueue w, int phase) { + boolean quiet; // true if possibly quiescent + int active = phase + (IDLE << 1), p = phase | IDLE, e; if (w != null) { - int idlePhase = p + IDLE, nextPhase = p + (IDLE << 1); - long pc = ctl, qc = (nextPhase & LMASK) | ((pc - RC_UNIT) & UMASK); - w.stackPred = (int)pc; // set ctl stack link - w.phase = idlePhase; // try to inactivate - if (!compareAndSetCtl(pc, qc)) // contended enque - return w.phase = p; // back out - int ac = (short)(qc >>> RC_SHIFT); - boolean quiescent = (ac <= 0 && quiescent()); - if ((runState & STOP) != 0) - return idlePhase; - int spins = ac + ((((int)(qc >>> TC_SHIFT)) & SMASK) << 1); - while ((p = w.phase) == idlePhase && --spins > 0) - Thread.onSpinWait(); // spin for approx #accesses to signal - if (p == idlePhase) { - long deadline = (!quiescent ? 0L : // timeout for trim - System.currentTimeMillis() + keepAlive); - WorkQueue[] qs = queues; - int n = (qs == null) ? 0 : qs.length; - for (int i = 0; i < n; ++i) { // recheck queues - WorkQueue q; ForkJoinTask[] a; int cap; - if ((q = qs[i]) != null && - (a = q.array) != null && (cap = a.length) > 0 && - a[q.base & (cap - 1)] != null && - ctl == qc && compareAndSetCtl(qc, pc)) { - w.phase = (int)qc; // release - break; - } + w.phase = p; // deactivate + long np = active & LMASK, pc = ctl; // try to enqueue + long qc = np | ((pc - RC_UNIT) & UMASK); + w.stackPred = (int)pc; // set ctl stack link + if (pc != (pc = compareAndExchangeCtl(pc, qc))) { + qc = np | ((pc - RC_UNIT) & UMASK); + w.stackPred = (int)pc; // retry once + if (pc != (pc = compareAndExchangeCtl(pc, qc))) + p = w.phase = phase; // back out + } + if (p != phase && ((e = runState) & STOP) == 0 && + (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 || + !(quiet = quiescent()) || (runState & STOP) == 0)) { + long deadline = 0L; // not terminating + if (quiet) { // use timeout if trimmable + int nt = (short)(qc >>> TC_SHIFT); + long delay = keepAlive; // scale if not at target + if (nt != (nt = Math.max(nt, parallelism)) && nt > 0) + delay = Math.max(TIMEOUT_SLOP, delay / nt); + if ((deadline = delay + System.currentTimeMillis()) == 0L) + deadline = 1L; // avoid zero } - if ((p = w.phase) == idlePhase) { // emulate LockSupport.park + boolean release = quiet; + WorkQueue[] qs = queues; // recheck queues + int n = (qs == null) ? 0 : qs.length; + for (int l = -n, j = active; l < n; ++l, ++j) { + WorkQueue q; ForkJoinTask[] a; int cap; + if ((p = w.phase) == active) // interleave signal checks + break; + if ((q = qs[j & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0 && + a[q.base & (cap - 1)] != null) { + if (release && qc == ctl && compareAndSetCtl(qc, pc)) { + p = w.phase = active; + break; // possible missed signal + } + release = true; // track multiple or reencounter + } + Thread.onSpinWait(); // reduce memory traffic + } + if (p != active) { // emulate LockSupport.park LockSupport.setCurrentBlocker(this); - w.parker = Thread.currentThread(); + w.parking = 1; for (;;) { - if ((runState & STOP) != 0 || (p = w.phase) != idlePhase) + if ((runState & STOP) != 0 || (p = w.phase) == active) break; - U.park(quiescent, deadline); - if ((p = w.phase) != idlePhase || (runState & STOP) != 0) + U.park(deadline != 0L, deadline); + if ((p = w.phase) == active || (runState & STOP) != 0) break; - Thread.interrupted(); // clear for next park - if (quiescent && TIMEOUT_SLOP > + Thread.interrupted(); // clear for next park + if (deadline != 0L && TIMEOUT_SLOP > deadline - System.currentTimeMillis()) { - long sp = w.stackPred & LMASK; - long c = ctl, nc = sp | (UMASK & (c - TC_UNIT)); - if (((int)c & SMASK) == (idlePhase & SMASK) && + long sp = w.stackPred & LMASK, c = ctl; + long nc = sp | (UMASK & (c - TC_UNIT)); + if (((int)c & SMASK) == (active & SMASK) && compareAndSetCtl(c, nc)) { w.source = DEREGISTERED; - w.phase = (int)c; - break; + w.phase = active; + break; // trimmed on timeout } - deadline += keepAlive; // not head; reset timer + deadline = 0L; // no longer trimmable } } - w.parker = null; + w.parking = 0; LockSupport.setCurrentBlocker(null); } } @@ -2201,13 +2214,13 @@ public class ForkJoinPool extends AbstractExecutorService { sp = (int)c, stat = -1; // default retry return if (sp != 0 && active <= pc) { // activate idle worker - WorkQueue[] qs; WorkQueue v; int i; Thread t; + WorkQueue[] qs; WorkQueue v; int i; if ((qs = queues) != null && qs.length > (i = sp & SMASK) && (v = qs[i]) != null && compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { v.phase = sp; - if ((t = v.parker) != null) - U.unpark(t); + if (v.parking != 0) + U.unpark(v.owner); stat = UNCOMPENSATE; } } @@ -2807,7 +2820,6 @@ public class ForkJoinPool extends AbstractExecutorService { } } - /** * Returns termination signal, constructing if necessary */ @@ -3099,7 +3111,13 @@ public class ForkJoinPool extends AbstractExecutorService { public T invoke(ForkJoinTask task) { Objects.requireNonNull(task); poolSubmit(true, task); - return task.join(); + try { + return task.join(); + } catch (RuntimeException | Error unchecked) { + throw unchecked; + } catch (Exception checked) { + throw new RuntimeException(checked); + } } /** diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java index 781b7bffe6d..7db3e20a2fb 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java @@ -626,6 +626,22 @@ public class ForkJoinPoolTest extends JSR166TestCase { } } + /** + * invoke throws a RuntimeException if task throws unchecked exception + */ + public void testInvokeUncheckedException() throws Throwable { + ForkJoinPool p = new ForkJoinPool(1); + try (PoolCleaner cleaner = cleaner(p)) { + try { + p.invoke(ForkJoinTask.adapt(new Callable() { + public Object call() { throw new ArithmeticException(); }})); + shouldThrow(); + } catch (RuntimeException success) { + assertTrue(success.getCause() instanceof ArithmeticException); + } + } + } + /** * invokeAny(null) throws NullPointerException */