6978087: jsr166y Updates

Simplify the ForkJoinPool API, reworking some of the internals

Reviewed-by: martin, dholmes, chegar
This commit is contained in:
Doug Lea 2010-09-13 09:55:03 +01:00
parent 4c7ea63262
commit a4641686a0
6 changed files with 2482 additions and 2314 deletions

File diff suppressed because it is too large Load Diff

View File

@ -91,10 +91,7 @@ import java.util.WeakHashMap;
* results of a task is {@link #join}, but there are several variants: * results of a task is {@link #join}, but there are several variants:
* The {@link Future#get} methods support interruptible and/or timed * The {@link Future#get} methods support interruptible and/or timed
* waits for completion and report results using {@code Future} * waits for completion and report results using {@code Future}
* conventions. Method {@link #helpJoin} enables callers to actively * conventions. Method {@link #invoke} is semantically
* execute other tasks while awaiting joins, which is sometimes more
* efficient but only applies when all subtasks are known to be
* strictly tree-structured. Method {@link #invoke} is semantically
* equivalent to {@code fork(); join()} but always attempts to begin * equivalent to {@code fork(); join()} but always attempts to begin
* execution in the current thread. The "<em>quiet</em>" forms of * execution in the current thread. The "<em>quiet</em>" forms of
* these methods do not extract results or report exceptions. These * these methods do not extract results or report exceptions. These
@ -130,7 +127,7 @@ import java.util.WeakHashMap;
* ForkJoinTasks (as may be determined using method {@link * ForkJoinTasks (as may be determined using method {@link
* #inForkJoinPool}). Attempts to invoke them in other contexts * #inForkJoinPool}). Attempts to invoke them in other contexts
* result in exceptions or errors, possibly including * result in exceptions or errors, possibly including
* ClassCastException. * {@code ClassCastException}.
* *
* <p>Most base support methods are {@code final}, to prevent * <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the * overriding of implementations that are intrinsically tied to the
@ -152,9 +149,8 @@ import java.util.WeakHashMap;
* *
* <p>This class provides {@code adapt} methods for {@link Runnable} * <p>This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of * and {@link Callable}, that may be of use when mixing execution of
* {@code ForkJoinTasks} with other kinds of tasks. When all tasks * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
* are of this form, consider using a pool in * of this form, consider using a pool constructed in <em>asyncMode</em>.
* {@linkplain ForkJoinPool#setAsyncMode async mode}.
* *
* <p>ForkJoinTasks are {@code Serializable}, which enables them to be * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
* used in extensions such as remote execution frameworks. It is * used in extensions such as remote execution frameworks. It is
@ -166,33 +162,43 @@ import java.util.WeakHashMap;
*/ */
public abstract class ForkJoinTask<V> implements Future<V>, Serializable { public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** /*
* Run control status bits packed into a single int to minimize * See the internal documentation of class ForkJoinPool for a
* footprint and to ensure atomicity (via CAS). Status is * general implementation overview. ForkJoinTasks are mainly
* initially zero, and takes on nonnegative values until * responsible for maintaining their "status" field amidst relays
* completed, upon which status holds COMPLETED. CANCELLED, or * to methods in ForkJoinWorkerThread and ForkJoinPool. The
* EXCEPTIONAL, which use the top 3 bits. Tasks undergoing * methods of this class are more-or-less layered into (1) basic
* blocking waits by other threads have SIGNAL_MASK bits set -- * status maintenance (2) execution and awaiting completion (3)
* bit 15 for external (nonFJ) waits, and the rest a count of * user-level methods that additionally report results. This is
* waiting FJ threads. (This representation relies on * sometimes hard to see because this file orders exported methods
* ForkJoinPool max thread limits). Completion of a stolen task * in a way that flows well in javadocs. In particular, most
* with SIGNAL_MASK bits set awakens waiter via notifyAll. Even * join mechanics are in method quietlyJoin, below.
* though suboptimal for some purposes, we use basic builtin
* wait/notify to take advantage of "monitor inflation" in JVMs
* that we would otherwise need to emulate to avoid adding further
* per-task bookkeeping overhead. Note that bits 16-28 are
* currently unused. Also value 0x80000000 is available as spare
* completion value.
*/ */
/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
* NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
* basic builtin wait/notify to take advantage of "monitor
* inflation" in JVMs that we would otherwise need to emulate to
* avoid adding further per-task bookkeeping overhead. We want
* these monitors to be "fat", i.e., not use biasing or thin-lock
* techniques, so use some odd coding idioms that tend to avoid
* them.
*/
/** The run status of this task */
volatile int status; // accessed directly by pool and workers volatile int status; // accessed directly by pool and workers
static final int COMPLETION_MASK = 0xe0000000; private static final int NORMAL = -1;
static final int NORMAL = 0xe0000000; // == mask private static final int CANCELLED = -2;
static final int CANCELLED = 0xc0000000; private static final int EXCEPTIONAL = -3;
static final int EXCEPTIONAL = 0xa0000000; private static final int SIGNAL = 1;
static final int SIGNAL_MASK = 0x0000ffff;
static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
/** /**
* Table of exceptions thrown by tasks, to enable reporting by * Table of exceptions thrown by tasks, to enable reporting by
@ -206,330 +212,114 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Collections.synchronizedMap Collections.synchronizedMap
(new WeakHashMap<ForkJoinTask<?>, Throwable>()); (new WeakHashMap<ForkJoinTask<?>, Throwable>());
// within-package utilities // Maintaining completion status
/** /**
* Gets current worker thread, or null if not a worker thread. * Marks completion and wakes up threads waiting to join this task,
*/ * also clearing signal request bits.
static ForkJoinWorkerThread getWorker() {
Thread t = Thread.currentThread();
return ((t instanceof ForkJoinWorkerThread) ?
(ForkJoinWorkerThread) t : null);
}
final boolean casStatus(int cmp, int val) {
return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
}
/**
* Workaround for not being able to rethrow unchecked exceptions.
*/
static void rethrowException(Throwable ex) {
if (ex != null)
UNSAFE.throwException(ex);
}
// Setting completion status
/**
* Marks completion and wakes up threads waiting to join this task.
* *
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
*/ */
final void setCompletion(int completion) { private void setCompletion(int completion) {
ForkJoinPool pool = getPool();
if (pool != null) {
int s; // Clear signal bits while setting completion status
do {} while ((s = status) >= 0 && !casStatus(s, completion));
if ((s & SIGNAL_MASK) != 0) {
if ((s &= INTERNAL_SIGNAL_MASK) != 0)
pool.updateRunningCount(s);
synchronized (this) { notifyAll(); }
}
}
else
externallySetCompletion(completion);
}
/**
* Version of setCompletion for non-FJ threads. Leaves signal
* bits for unblocked threads to adjust, and always notifies.
*/
private void externallySetCompletion(int completion) {
int s;
do {} while ((s = status) >= 0 &&
!casStatus(s, (s & SIGNAL_MASK) | completion));
synchronized (this) { notifyAll(); }
}
/**
* Sets status to indicate normal completion.
*/
final void setNormalCompletion() {
// Try typical fast case -- single CAS, no signal, not already done.
// Manually expand casStatus to improve chances of inlining it
if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
setCompletion(NORMAL);
}
// internal waiting and notification
/**
* Performs the actual monitor wait for awaitDone.
*/
private void doAwaitDone() {
// Minimize lock bias and in/de-flation effects by maximizing
// chances of waiting inside sync
try {
while (status >= 0)
synchronized (this) { if (status >= 0) wait(); }
} catch (InterruptedException ie) {
onInterruptedWait();
}
}
/**
* Performs the actual timed monitor wait for awaitDone.
*/
private void doAwaitDone(long startTime, long nanos) {
synchronized (this) {
try {
while (status >= 0) {
long nt = nanos - (System.nanoTime() - startTime);
if (nt <= 0)
break;
wait(nt / 1000000, (int) (nt % 1000000));
}
} catch (InterruptedException ie) {
onInterruptedWait();
}
}
}
// Awaiting completion
/**
* Sets status to indicate there is joiner, then waits for join,
* surrounded with pool notifications.
*
* @return status upon exit
*/
private int awaitDone(ForkJoinWorkerThread w,
boolean maintainParallelism) {
ForkJoinPool pool = (w == null) ? null : w.pool;
int s; int s;
while ((s = status) >= 0) { while ((s = status) >= 0) {
if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) { if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (pool == null || !pool.preJoin(this, maintainParallelism)) if (s != 0)
doAwaitDone(); synchronized (this) { notifyAll(); }
if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
adjustPoolCountsOnUnblock(pool);
break; break;
} }
} }
return s;
} }
/** /**
* Timed version of awaitDone * Records exception and sets exceptional completion.
* *
* @return status upon exit * @return status on exit
*/ */
private int awaitDone(ForkJoinWorkerThread w, long nanos) { private void setExceptionalCompletion(Throwable rex) {
ForkJoinPool pool = (w == null) ? null : w.pool;
int s;
while ((s = status) >= 0) {
if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
long startTime = System.nanoTime();
if (pool == null || !pool.preJoin(this, false))
doAwaitDone(startTime, nanos);
if ((s = status) >= 0) {
adjustPoolCountsOnCancelledWait(pool);
s = status;
}
if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
adjustPoolCountsOnUnblock(pool);
break;
}
}
return s;
}
/**
* Notifies pool that thread is unblocked. Called by signalled
* threads when woken by non-FJ threads (which is atypical).
*/
private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
int s;
do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
pool.updateRunningCount(s);
}
/**
* Notifies pool to adjust counts on cancelled or timed out wait.
*/
private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
if (pool != null) {
int s;
while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
if (casStatus(s, s - 1)) {
pool.updateRunningCount(1);
break;
}
}
}
}
/**
* Handles interruptions during waits.
*/
private void onInterruptedWait() {
ForkJoinWorkerThread w = getWorker();
if (w == null)
Thread.currentThread().interrupt(); // re-interrupt
else if (w.isTerminating())
cancelIgnoringExceptions();
// else if FJworker, ignore interrupt
}
// Recording and reporting exceptions
private void setDoneExceptionally(Throwable rex) {
exceptionMap.put(this, rex); exceptionMap.put(this, rex);
setCompletion(EXCEPTIONAL); setCompletion(EXCEPTIONAL);
} }
/** /**
* Throws the exception associated with status s. * Blocks a worker thread until completion. Called only by
* pool. Currently unused -- pool-based waits use timeout
* version below.
*/
final void internalAwaitDone() {
int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) {
try {
synchronized(this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait();
}
} catch (InterruptedException ie) {
cancelIfTerminating();
}
}
}
/**
* Blocks a worker thread until completed or timed out. Called
* only by pool.
* *
* @throws the exception * @return status on exit
*/ */
private void reportException(int s) { final int internalAwaitDone(long millis) {
if ((s &= COMPLETION_MASK) < NORMAL) { int s;
if (s == CANCELLED) if ((s = status) >= 0) {
throw new CancellationException(); try {
else synchronized(this) {
rethrowException(exceptionMap.get(this)); if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait(millis, 0);
}
} catch (InterruptedException ie) {
cancelIfTerminating();
}
s = status;
}
return s;
}
/**
* Blocks a non-worker-thread until completion.
*/
private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
synchronized(this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
boolean interrupted = false;
while (status >= 0) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
if (interrupted)
Thread.currentThread().interrupt();
break;
}
}
} }
} }
/** /**
* Returns result or throws exception using j.u.c.Future conventions. * Unless done, calls exec and records status if completed, but
* Only call when {@code isDone} known to be true or thread known * doesn't wait for completion otherwise. Primary execution method
* to be interrupted. * for ForkJoinWorkerThread.
*/
private V reportFutureResult()
throws InterruptedException, ExecutionException {
if (Thread.interrupted())
throw new InterruptedException();
int s = status & COMPLETION_MASK;
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
}
return getRawResult();
}
/**
* Returns result or throws exception using j.u.c.Future conventions
* with timeouts.
*/
private V reportTimedFutureResult()
throws InterruptedException, ExecutionException, TimeoutException {
if (Thread.interrupted())
throw new InterruptedException();
Throwable ex;
int s = status & COMPLETION_MASK;
if (s == NORMAL)
return getRawResult();
else if (s == CANCELLED)
throw new CancellationException();
else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
else
throw new TimeoutException();
}
// internal execution methods
/**
* Calls exec, recording completion, and rethrowing exception if
* encountered. Caller should normally check status before calling.
*
* @return true if completed normally
*/
private boolean tryExec() {
try { // try block must contain only call to exec
if (!exec())
return false;
} catch (Throwable rex) {
setDoneExceptionally(rex);
rethrowException(rex);
return false; // not reached
}
setNormalCompletion();
return true;
}
/**
* Main execution method used by worker threads. Invokes
* base computation unless already complete.
*/ */
final void quietlyExec() { final void quietlyExec() {
if (status >= 0) {
try { try {
if (!exec()) if (status < 0 || !exec())
return; return;
} catch (Throwable rex) { } catch (Throwable rex) {
setDoneExceptionally(rex); setExceptionalCompletion(rex);
return; return;
} }
setNormalCompletion(); setCompletion(NORMAL); // must be outside try block
}
}
/**
* Calls exec(), recording but not rethrowing exception.
* Caller should normally check status before calling.
*
* @return true if completed normally
*/
private boolean tryQuietlyInvoke() {
try {
if (!exec())
return false;
} catch (Throwable rex) {
setDoneExceptionally(rex);
return false;
}
setNormalCompletion();
return true;
}
/**
* Cancels, ignoring any exceptions it throws.
*/
final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
/**
* Main implementation of helpJoin
*/
private int busyJoin(ForkJoinWorkerThread w) {
int s;
ForkJoinTask<?> t;
while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
t.quietlyExec();
return (s >= 0) ? awaitDone(w, false) : s; // block if no work
} }
// public methods // public methods
@ -567,34 +357,41 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result * @return the computed result
*/ */
public final V join() { public final V join() {
ForkJoinWorkerThread w = getWorker(); quietlyJoin();
if (w == null || status < 0 || !w.unpushTask(this) || !tryExec()) Throwable ex;
reportException(awaitDone(w, true)); if (status < NORMAL && (ex = getException()) != null)
UNSAFE.throwException(ex);
return getRawResult(); return getRawResult();
} }
/** /**
* Commences performing this task, awaits its completion if * Commences performing this task, awaits its completion if
* necessary, and return its result, or throws an (unchecked) * necessary, and returns its result, or throws an (unchecked)
* exception if the underlying computation did so. * {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
* *
* @return the computed result * @return the computed result
*/ */
public final V invoke() { public final V invoke() {
if (status >= 0 && tryExec()) quietlyInvoke();
Throwable ex;
if (status < NORMAL && (ex = getException()) != null)
UNSAFE.throwException(ex);
return getRawResult(); return getRawResult();
else
return join();
} }
/** /**
* Forks the given tasks, returning when {@code isDone} holds for * Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which * each task or an (unchecked) exception is encountered, in which
* case the exception is rethrown. If either task encounters an * case the exception is rethrown. If more than one task
* exception, the other one may be, but is not guaranteed to be, * encounters an exception, then this method throws any one of
* cancelled. If both tasks throw an exception, then this method * these exceptions. If any task encounters an exception, the
* throws one of them. The individual status of each task may be * other may be cancelled. However, the execution status of
* checked using {@link #getException()} and related methods. * individual tasks is not guaranteed upon exceptional return. The
* status of each task may be obtained using {@link
* #getException()} and related methods to check if they have been
* cancelled, completed normally or exceptionally, or left
* unprocessed.
* *
* <p>This method may be invoked only from within {@code * <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method * ForkJoinTask} computations (as may be determined using method
@ -615,12 +412,14 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** /**
* Forks the given tasks, returning when {@code isDone} holds for * Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which * each task or an (unchecked) exception is encountered, in which
* case the exception is rethrown. If any task encounters an * case the exception is rethrown. If more than one task
* exception, others may be, but are not guaranteed to be, * encounters an exception, then this method throws any one of
* cancelled. If more than one task encounters an exception, then * these exceptions. If any task encounters an exception, others
* this method throws any one of these exceptions. The individual * may be cancelled. However, the execution status of individual
* status of each task may be checked using {@link #getException()} * tasks is not guaranteed upon exceptional return. The status of
* and related methods. * each task may be obtained using {@link #getException()} and
* related methods to check if they have been cancelled, completed
* normally or exceptionally, or left unprocessed.
* *
* <p>This method may be invoked only from within {@code * <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method * ForkJoinTask} computations (as may be determined using method
@ -644,7 +443,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
t.fork(); t.fork();
else { else {
t.quietlyInvoke(); t.quietlyInvoke();
if (ex == null) if (ex == null && t.status < NORMAL)
ex = t.getException(); ex = t.getException();
} }
} }
@ -655,26 +454,27 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
t.cancel(false); t.cancel(false);
else { else {
t.quietlyJoin(); t.quietlyJoin();
if (ex == null) if (ex == null && t.status < NORMAL)
ex = t.getException(); ex = t.getException();
} }
} }
} }
if (ex != null) if (ex != null)
rethrowException(ex); UNSAFE.throwException(ex);
} }
/** /**
* Forks all tasks in the specified collection, returning when * Forks all tasks in the specified collection, returning when
* {@code isDone} holds for each task or an (unchecked) exception * {@code isDone} holds for each task or an (unchecked) exception
* is encountered. If any task encounters an exception, others * is encountered, in which case the exception is rethrown. If
* may be, but are not guaranteed to be, cancelled. If more than * more than one task encounters an exception, then this method
* one task encounters an exception, then this method throws any * throws any one of these exceptions. If any task encounters an
* one of these exceptions. The individual status of each task * exception, others may be cancelled. However, the execution
* may be checked using {@link #getException()} and related * status of individual tasks is not guaranteed upon exceptional
* methods. The behavior of this operation is undefined if the * return. The status of each task may be obtained using {@link
* specified collection is modified while the operation is in * #getException()} and related methods to check if they have been
* progress. * cancelled, completed normally or exceptionally, or left
* unprocessed.
* *
* <p>This method may be invoked only from within {@code * <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method * ForkJoinTask} computations (as may be determined using method
@ -706,7 +506,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
t.fork(); t.fork();
else { else {
t.quietlyInvoke(); t.quietlyInvoke();
if (ex == null) if (ex == null && t.status < NORMAL)
ex = t.getException(); ex = t.getException();
} }
} }
@ -717,13 +517,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
t.cancel(false); t.cancel(false);
else { else {
t.quietlyJoin(); t.quietlyJoin();
if (ex == null) if (ex == null && t.status < NORMAL)
ex = t.getException(); ex = t.getException();
} }
} }
} }
if (ex != null) if (ex != null)
rethrowException(ex); UNSAFE.throwException(ex);
return tasks; return tasks;
} }
@ -753,7 +553,35 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
setCompletion(CANCELLED); setCompletion(CANCELLED);
return (status & COMPLETION_MASK) == CANCELLED; return status == CANCELLED;
}
/**
* Cancels, ignoring any exceptions thrown by cancel. Used during
* worker and pool shutdown. Cancel is spec'ed not to throw any
* exceptions, but if it does anyway, we have no recourse during
* shutdown, so guard against this case.
*/
final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
/**
* Cancels if current thread is a terminating worker thread,
* ignoring any exceptions thrown by cancel.
*/
final void cancelIfTerminating() {
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread) t).isTerminating()) {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
} }
public final boolean isDone() { public final boolean isDone() {
@ -761,7 +589,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} }
public final boolean isCancelled() { public final boolean isCancelled() {
return (status & COMPLETION_MASK) == CANCELLED; return status == CANCELLED;
} }
/** /**
@ -770,7 +598,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task threw an exception or was cancelled * @return {@code true} if this task threw an exception or was cancelled
*/ */
public final boolean isCompletedAbnormally() { public final boolean isCompletedAbnormally() {
return (status & COMPLETION_MASK) < NORMAL; return status < NORMAL;
} }
/** /**
@ -781,7 +609,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* exception and was not cancelled * exception and was not cancelled
*/ */
public final boolean isCompletedNormally() { public final boolean isCompletedNormally() {
return (status & COMPLETION_MASK) == NORMAL; return status == NORMAL;
} }
/** /**
@ -792,7 +620,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or {@code null} if none * @return the exception, or {@code null} if none
*/ */
public final Throwable getException() { public final Throwable getException() {
int s = status & COMPLETION_MASK; int s = status;
return ((s >= NORMAL) ? null : return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() : (s == CANCELLED) ? new CancellationException() :
exceptionMap.get(this)); exceptionMap.get(this));
@ -813,20 +641,21 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* thrown will be a {@code RuntimeException} with cause {@code ex}. * thrown will be a {@code RuntimeException} with cause {@code ex}.
*/ */
public void completeExceptionally(Throwable ex) { public void completeExceptionally(Throwable ex) {
setDoneExceptionally((ex instanceof RuntimeException) || setExceptionalCompletion((ex instanceof RuntimeException) ||
(ex instanceof Error) ? ex : (ex instanceof Error) ? ex :
new RuntimeException(ex)); new RuntimeException(ex));
} }
/** /**
* Completes this task, and if not already aborted or cancelled, * Completes this task, and if not already aborted or cancelled,
* returning a {@code null} result upon {@code join} and related * returning the given value as the result of subsequent
* operations. This method may be used to provide results for * invocations of {@code join} and related operations. This method
* asynchronous tasks, or to provide alternative handling for * may be used to provide results for asynchronous tasks, or to
* tasks that would not otherwise complete normally. Its use in * provide alternative handling for tasks that would not otherwise
* other situations is discouraged. This method is * complete normally. Its use in other situations is
* overridable, but overridden versions must invoke {@code super} * discouraged. This method is overridable, but overridden
* implementation to maintain guarantees. * versions must invoke {@code super} implementation to maintain
* guarantees.
* *
* @param value the result value for this task * @param value the result value for this task
*/ */
@ -834,98 +663,152 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
try { try {
setRawResult(value); setRawResult(value);
} catch (Throwable rex) { } catch (Throwable rex) {
setDoneExceptionally(rex); setExceptionalCompletion(rex);
return; return;
} }
setNormalCompletion(); setCompletion(NORMAL);
} }
public final V get() throws InterruptedException, ExecutionException { public final V get() throws InterruptedException, ExecutionException {
ForkJoinWorkerThread w = getWorker(); quietlyJoin();
if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) if (Thread.interrupted())
awaitDone(w, true); throw new InterruptedException();
return reportFutureResult(); int s = status;
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
}
return getRawResult();
} }
public final V get(long timeout, TimeUnit unit) public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout); Thread t = Thread.currentThread();
ForkJoinWorkerThread w = getWorker(); ForkJoinPool pool;
if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) if (t instanceof ForkJoinWorkerThread) {
awaitDone(w, nanos); ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
return reportTimedFutureResult(); if (status >= 0 && w.unpushTask(this))
quietlyExec();
pool = w.pool;
} }
else
/** pool = null;
* Possibly executes other tasks until this task {@link #isDone is /*
* done}, then returns the result of the computation. This method * Timed wait loop intermixes cases for FJ (pool != null) and
* may be more efficient than {@code join}, but is only applicable * non FJ threads. For FJ, decrement pool count but don't try
* when there are no potential dependencies between continuation * for replacement; increment count on completion. For non-FJ,
* of the current task and that of any other task that might be * deal with interrupts. This is messy, but a little less so
* executed while helping. (This usually holds for pure * than is splitting the FJ and nonFJ cases.
* divide-and-conquer tasks).
*
* <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return the computed result
*/ */
public final V helpJoin() { boolean interrupted = false;
ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread(); boolean dec = false; // true if pool count decremented
if (status < 0 || !w.unpushTask(this) || !tryExec()) long nanos = unit.toNanos(timeout);
reportException(busyJoin(w)); for (;;) {
if (pool == null && Thread.interrupted()) {
interrupted = true;
break;
}
int s = status;
if (s < 0)
break;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
long startTime = System.nanoTime();
long nt; // wait time
while (status >= 0 &&
(nt = nanos - (System.nanoTime() - startTime)) > 0) {
if (pool != null && !dec)
dec = pool.tryDecrementRunningCount();
else {
long ms = nt / 1000000;
int ns = (int) (nt % 1000000);
try {
synchronized(this) {
if (status >= 0)
wait(ms, ns);
}
} catch (InterruptedException ie) {
if (pool != null)
cancelIfTerminating();
else {
interrupted = true;
break;
}
}
}
}
break;
}
}
if (pool != null && dec)
pool.incrementRunningCount();
if (interrupted)
throw new InterruptedException();
int es = status;
if (es != NORMAL) {
Throwable ex;
if (es == CANCELLED)
throw new CancellationException();
if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}
return getRawResult(); return getRawResult();
} }
/** /**
* Possibly executes other tasks until this task {@link #isDone is * Joins this task, without returning its result or throwing its
* done}. This method may be useful when processing collections
* of tasks when some have been cancelled or otherwise known to
* have aborted.
*
* <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*/
public final void quietlyHelpJoin() {
if (status >= 0) {
ForkJoinWorkerThread w =
(ForkJoinWorkerThread) Thread.currentThread();
if (!w.unpushTask(this) || !tryQuietlyInvoke())
busyJoin(w);
}
}
/**
* Joins this task, without returning its result or throwing an
* exception. This method may be useful when processing * exception. This method may be useful when processing
* collections of tasks when some have been cancelled or otherwise * collections of tasks when some have been cancelled or otherwise
* known to have aborted. * known to have aborted.
*/ */
public final void quietlyJoin() { public final void quietlyJoin() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
if (status >= 0) { if (status >= 0) {
ForkJoinWorkerThread w = getWorker(); if (w.unpushTask(this)) {
if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke()) boolean completed;
awaitDone(w, true); try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
} }
if (completed) {
setCompletion(NORMAL);
return;
}
}
w.joinTask(this);
}
}
else
externalAwaitDone();
} }
/** /**
* Commences performing this task and awaits its completion if * Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing an * necessary, without returning its result or throwing its
* exception. This method may be useful when processing * exception.
* collections of tasks when some have been cancelled or otherwise
* known to have aborted.
*/ */
public final void quietlyInvoke() { public final void quietlyInvoke() {
if (status >= 0 && !tryQuietlyInvoke()) if (status >= 0) {
boolean completed;
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
if (completed)
setCompletion(NORMAL);
else
quietlyJoin(); quietlyJoin();
} }
}
/** /**
* Possibly executes tasks until the pool hosting the current task * Possibly executes tasks until the pool hosting the current task
@ -956,7 +839,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* pre-constructed trees of subtasks in loops. * pre-constructed trees of subtasks in loops.
*/ */
public void reinitialize() { public void reinitialize() {
if ((status & COMPLETION_MASK) == EXCEPTIONAL) if (status == EXCEPTIONAL)
exceptionMap.remove(this); exceptionMap.remove(this);
status = 0; status = 0;
} }
@ -1246,7 +1129,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static final long serialVersionUID = -7721805057305804111L; private static final long serialVersionUID = -7721805057305804111L;
/** /**
* Saves the state to a stream. * Saves the state to a stream (that is, serializes it).
* *
* @serialData the current run status and the exception thrown * @serialData the current run status and the exception thrown
* during execution, or {@code null} if none * during execution, or {@code null} if none
@ -1259,18 +1142,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} }
/** /**
* Reconstitutes the instance from a stream. * Reconstitutes the instance from a stream (that is, deserializes it).
* *
* @param s the stream * @param s the stream
*/ */
private void readObject(java.io.ObjectInputStream s) private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException { throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject(); s.defaultReadObject();
status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
status |= EXTERNAL_SIGNAL; // conservatively set external signal
Object ex = s.readObject(); Object ex = s.readObject();
if (ex != null) if (ex != null)
setDoneExceptionally((Throwable) ex); setExceptionalCompletion((Throwable) ex);
} }
// Unsafe mechanics // Unsafe mechanics

View File

@ -42,6 +42,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* An unbounded {@link TransferQueue} based on linked nodes. * An unbounded {@link TransferQueue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect * This queue orders elements FIFO (first-in-first-out) with respect
@ -233,24 +234,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* additional GC bookkeeping ("write barriers") that are sometimes * additional GC bookkeeping ("write barriers") that are sometimes
* more costly than the writes themselves because of contention). * more costly than the writes themselves because of contention).
* *
* Removal of interior nodes (due to timed out or interrupted
* waits, or calls to remove(x) or Iterator.remove) can use a
* scheme roughly similar to that described in Scherer, Lea, and
* Scott's SynchronousQueue. Given a predecessor, we can unsplice
* any node except the (actual) tail of the queue. To avoid
* build-up of cancelled trailing nodes, upon a request to remove
* a trailing node, it is placed in field "cleanMe" to be
* unspliced upon the next call to unsplice any other node.
* Situations needing such mechanics are not common but do occur
* in practice; for example when an unbounded series of short
* timed calls to poll repeatedly time out but never otherwise
* fall off the list because of an untimed call to take at the
* front of the queue. Note that maintaining field cleanMe does
* not otherwise much impact garbage retention even if never
* cleared by some other call because the held node will
* eventually either directly or indirectly lead to a self-link
* once off the list.
*
* *** Overview of implementation *** * *** Overview of implementation ***
* *
* We use a threshold-based approach to updates, with a slack * We use a threshold-based approach to updates, with a slack
@ -266,15 +249,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* per-thread one available, but even ThreadLocalRandom is too * per-thread one available, but even ThreadLocalRandom is too
* heavy for these purposes. * heavy for these purposes.
* *
* With such a small slack threshold value, it is rarely * With such a small slack threshold value, it is not worthwhile
* worthwhile to augment this with path short-circuiting; i.e., * to augment this with path short-circuiting (i.e., unsplicing
* unsplicing nodes between head and the first unmatched node, or * interior nodes) except in the case of cancellation/removal (see
* similarly for tail, rather than advancing head or tail * below).
* proper. However, it is used (in awaitMatch) immediately before
* a waiting thread starts to block, as a final bit of helping at
* a point when contention with others is extremely unlikely
* (since if other threads that could release it are operating,
* then the current thread wouldn't be blocking).
* *
* We allow both the head and tail fields to be null before any * We allow both the head and tail fields to be null before any
* nodes are enqueued; initializing upon first append. This * nodes are enqueued; initializing upon first append. This
@ -356,6 +334,70 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* versa) compared to their predecessors receive additional * versa) compared to their predecessors receive additional
* chained spins, reflecting longer paths typically required to * chained spins, reflecting longer paths typically required to
* unblock threads during phase changes. * unblock threads during phase changes.
*
*
* ** Unlinking removed interior nodes **
*
* In addition to minimizing garbage retention via self-linking
* described above, we also unlink removed interior nodes. These
* may arise due to timed out or interrupted waits, or calls to
* remove(x) or Iterator.remove. Normally, given a node that was
* at one time known to be the predecessor of some node s that is
* to be removed, we can unsplice s by CASing the next field of
* its predecessor if it still points to s (otherwise s must
* already have been removed or is now offlist). But there are two
* situations in which we cannot guarantee to make node s
* unreachable in this way: (1) If s is the trailing node of list
* (i.e., with null next), then it is pinned as the target node
* for appends, so can only be removed later after other nodes are
* appended. (2) We cannot necessarily unlink s given a
* predecessor node that is matched (including the case of being
* cancelled): the predecessor may already be unspliced, in which
* case some previous reachable node may still point to s.
* (For further explanation see Herlihy & Shavit "The Art of
* Multiprocessor Programming" chapter 9). Although, in both
* cases, we can rule out the need for further action if either s
* or its predecessor are (or can be made to be) at, or fall off
* from, the head of list.
*
* Without taking these into account, it would be possible for an
* unbounded number of supposedly removed nodes to remain
* reachable. Situations leading to such buildup are uncommon but
* can occur in practice; for example when a series of short timed
* calls to poll repeatedly time out but never otherwise fall off
* the list because of an untimed call to take at the front of the
* queue.
*
* When these cases arise, rather than always retraversing the
* entire list to find an actual predecessor to unlink (which
* won't help for case (1) anyway), we record a conservative
* estimate of possible unsplice failures (in "sweepVotes").
* We trigger a full sweep when the estimate exceeds a threshold
* ("SWEEP_THRESHOLD") indicating the maximum number of estimated
* removal failures to tolerate before sweeping through, unlinking
* cancelled nodes that were not unlinked upon initial removal.
* We perform sweeps by the thread hitting threshold (rather than
* background threads or by spreading work to other threads)
* because in the main contexts in which removal occurs, the
* caller is already timed-out, cancelled, or performing a
* potentially O(n) operation (e.g. remove(x)), none of which are
* time-critical enough to warrant the overhead that alternatives
* would impose on other threads.
*
* Because the sweepVotes estimate is conservative, and because
* nodes become unlinked "naturally" as they fall off the head of
* the queue, and because we allow votes to accumulate even while
* sweeps are in progress, there are typically significantly fewer
* such nodes than estimated. Choice of a threshold value
* balances the likelihood of wasted effort and contention, versus
* providing a worst-case bound on retention of interior nodes in
* quiescent queues. The value defined below was chosen
* empirically to balance these under various timeout scenarios.
*
* Note that we cannot self-link unlinked interior nodes during
* sweeps. However, the associated garbage chains terminate when
* some successor ultimately falls off the head of the list and is
* self-linked.
*/ */
/** True if on multiprocessor */ /** True if on multiprocessor */
@ -381,12 +423,20 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
*/ */
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
* removal. See above for explanation. The value must be at least
* two to avoid useless sweeps when removing trailing nodes.
*/
static final int SWEEP_THRESHOLD = 32;
/** /**
* Queue nodes. Uses Object, not E, for items to allow forgetting * Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Relies heavily on Unsafe mechanics to minimize * them after use. Relies heavily on Unsafe mechanics to minimize
* unnecessary ordering constraints: Writes that intrinsically * unnecessary ordering constraints: Writes that are intrinsically
* precede or follow CASes use simple relaxed forms. Other * ordered wrt other accesses or CASes use simple relaxed forms.
* cleanups use releasing/lazy writes.
*/ */
static final class Node { static final class Node {
final boolean isData; // false if this is a request node final boolean isData; // false if this is a request node
@ -405,8 +455,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
/** /**
* Creates a new node. Uses relaxed write because item can only * Constructs a new node. Uses relaxed write because item can
* be seen if followed by CAS. * only be seen after publication via casNext.
*/ */
Node(Object item, boolean isData) { Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write UNSAFE.putObject(this, itemOffset, item); // relaxed write
@ -422,13 +472,17 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
/** /**
* Sets item to self (using a releasing/lazy write) and waiter * Sets item to self and waiter to null, to avoid garbage
* to null, to avoid garbage retention after extracting or * retention after matching or cancelling. Uses relaxed writes
* cancelling. * because order is already constrained in the only calling
* contexts: item is forgotten only after volatile/atomic
* mechanics that extract items. Similarly, clearing waiter
* follows either CAS or return from park (if ever parked;
* else we don't care).
*/ */
final void forgetContents() { final void forgetContents() {
UNSAFE.putOrderedObject(this, itemOffset, this); UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putOrderedObject(this, waiterOffset, null); UNSAFE.putObject(this, waiterOffset, null);
} }
/** /**
@ -486,12 +540,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
/** head of the queue; null until first enqueue */ /** head of the queue; null until first enqueue */
transient volatile Node head; transient volatile Node head;
/** predecessor of dangling unspliceable node */
private transient volatile Node cleanMe; // decl here reduces contention
/** tail of the queue; null until first append */ /** tail of the queue; null until first append */
private transient volatile Node tail; private transient volatile Node tail;
/** The number of apparent failures to unsplice removed nodes */
private transient volatile int sweepVotes;
// CAS methods for fields // CAS methods for fields
private boolean casTail(Node cmp, Node val) { private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
@ -501,8 +555,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
} }
private boolean casCleanMe(Node cmp, Node val) { private boolean casSweepVotes(int cmp, int val) {
return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
} }
/* /*
@ -544,10 +598,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
break; break;
if (p.casItem(item, e)) { // match if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) { for (Node q = p; q != h;) {
Node n = q.next; // update head by 2 Node n = q.next; // update by 2 unless singleton
if (n != null) // unless singleton if (head == h && casHead(h, n == null? q : n)) {
q = n;
if (head == h && casHead(h, q)) {
h.forgetNext(); h.forgetNext();
break; break;
} // advance and retry } // advance and retry
@ -647,9 +699,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
randomYields = ThreadLocalRandom.current(); randomYields = ThreadLocalRandom.current();
} }
else if (spins > 0) { // spin else if (spins > 0) { // spin
if (--spins == 0) --spins;
shortenHeadPath(); // reduce slack before blocking if (randomYields.nextInt(CHAINED_SPINS) == 0)
else if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield Thread.yield(); // occasionally yield
} }
else if (s.waiter == null) { else if (s.waiter == null) {
@ -663,8 +714,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
else { else {
LockSupport.park(this); LockSupport.park(this);
s.waiter = null;
spins = -1; // spin if front upon wakeup
} }
} }
} }
@ -685,27 +734,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
return 0; return 0;
} }
/**
* Tries (once) to unsplice nodes between head and first unmatched
* or trailing node; failing on contention.
*/
private void shortenHeadPath() {
Node h, hn, p, q;
if ((p = h = head) != null && h.isMatched() &&
(q = hn = h.next) != null) {
Node n;
while ((n = q.next) != q) {
if (n == null || !q.isMatched()) {
if (hn != q && h.next == hn)
h.casNext(hn, q);
break;
}
p = q;
q = n;
}
}
}
/* -------------- Traversal methods -------------- */ /* -------------- Traversal methods -------------- */
/** /**
@ -818,7 +846,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
public final void remove() { public final void remove() {
Node p = lastRet; Node p = lastRet;
if (p == null) throw new IllegalStateException(); if (p == null) throw new IllegalStateException();
findAndRemoveDataNode(lastPred, p); if (p.tryMatchData())
unsplice(lastPred, p);
} }
} }
@ -828,99 +857,68 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Unsplices (now or later) the given deleted/cancelled node with * Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor. * the given predecessor.
* *
* @param pred predecessor of node to be unspliced * @param pred a node that was at one time known to be the
* predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced * @param s the node to be unspliced
*/ */
private void unsplice(Node pred, Node s) { final void unsplice(Node pred, Node s) {
s.forgetContents(); // clear unneeded fields s.forgetContents(); // forget unneeded fields
/* /*
* At any given time, exactly one node on list cannot be * See above for rationale. Briefly: if pred still points to
* unlinked -- the last inserted node. To accommodate this, if * s, try to unlink s. If s cannot be unlinked, because it is
* we cannot unlink s, we save its predecessor as "cleanMe", * trailing node or pred might be unlinked, and neither pred
* processing the previously saved version first. Because only * nor s are head or offlist, add to sweepVotes, and if enough
* one node in the list can have a null next, at least one of * votes have accumulated, sweep.
* node s or the node previously saved can always be
* processed, so this always terminates.
*/ */
if (pred != null && pred != s) { if (pred != null && pred != s && pred.next == s) {
while (pred.next == s) {
Node oldpred = (cleanMe == null) ? null : reclean();
Node n = s.next; Node n = s.next;
if (n != null) { if (n == null ||
if (n != s) (n != s && pred.casNext(s, n) && pred.isMatched())) {
pred.casNext(s, n); for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head
}
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break; break;
} }
if (oldpred == pred || // Already saved else if (casSweepVotes(v, 0)) {
((oldpred == null || oldpred.next == s) && sweep();
casCleanMe(oldpred, pred))) {
break; break;
} }
} }
} }
} }
}
}
/** /**
* Tries to unsplice the deleted/cancelled node held in cleanMe * Unlinks matched (typically cancelled) nodes encountered in a
* that was previously uncleanable because it was at tail. * traversal from head.
*
* @return current cleanMe node (or null)
*/ */
private Node reclean() { private void sweep() {
/* for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
* cleanMe is, or at one time was, predecessor of a cancelled if (!s.isMatched())
* node s that was the tail so could not be unspliced. If it // Unmatched nodes are never self-linked
* is no longer the tail, try to unsplice if necessary and p = s;
* make cleanMe slot available. This differs from similar else if ((n = s.next) == null) // trailing node is pinned
* code in unsplice() because we must check that pred still
* points to a matched node that can be unspliced -- if not,
* we can (must) clear cleanMe without unsplicing. This can
* loop only due to contention.
*/
Node pred;
while ((pred = cleanMe) != null) {
Node s = pred.next;
Node n;
if (s == null || s == pred || !s.isMatched())
casCleanMe(pred, null); // already gone
else if ((n = s.next) != null) {
if (n != s)
pred.casNext(s, n);
casCleanMe(pred, null);
}
else
break; break;
} else if (s == n) // stale
return pred; // No need to also check for p == s, since that implies s == n
}
/**
* Main implementation of Iterator.remove(). Finds
* and unsplices the given data node.
*
* @param possiblePred possible predecessor of s
* @param s the node to remove
*/
final void findAndRemoveDataNode(Node possiblePred, Node s) {
// assert s.isData;
if (s.tryMatchData()) {
if (possiblePred != null && possiblePred.next == s)
unsplice(possiblePred, s); // was actual predecessor
else {
for (Node pred = null, p = head; p != null; ) {
if (p == s) {
unsplice(pred, p);
break;
}
if (p.isUnmatchedRequest())
break;
pred = p;
if ((p = p.next) == pred) { // stale
pred = null;
p = head; p = head;
} else
} p.casNext(s, n);
}
} }
} }
@ -1158,7 +1156,11 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* @return {@code true} if this queue contains no elements * @return {@code true} if this queue contains no elements
*/ */
public boolean isEmpty() { public boolean isEmpty() {
return firstOfMode(true) == null; for (Node p = head; p != null; p = succ(p)) {
if (!p.isMatched())
return !p.isData;
}
return true;
} }
public boolean hasWaitingConsumer() { public boolean hasWaitingConsumer() {
@ -1252,8 +1254,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
private static final long tailOffset = private static final long tailOffset =
objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
private static final long cleanMeOffset = private static final long sweepVotesOffset =
objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
static long objectFieldOffset(sun.misc.Unsafe UNSAFE, static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) { String field, Class<?> klazz) {
@ -1266,5 +1268,4 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
throw error; throw error;
} }
} }
} }

View File

@ -898,7 +898,7 @@ public class Phaser {
boolean doWait() { boolean doWait() {
if (thread != null) { if (thread != null) {
try { try {
ForkJoinPool.managedBlock(this, false); ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
} }
} }

View File

@ -206,7 +206,7 @@ public final class Integrate {
q.fork(); q.fork();
ar = recEval(c, r, fc, fr, ar); ar = recEval(c, r, fc, fr, ar);
if (!q.tryUnfork()) { if (!q.tryUnfork()) {
q.quietlyHelpJoin(); q.quietlyJoin();
return ar + q.area; return ar + q.area;
} }
return ar + recEval(l, c, fl, fc, al); return ar + recEval(l, c, fl, fc, al);
@ -254,7 +254,7 @@ public final class Integrate {
(q = new DQuad(l, c, al)).fork(); (q = new DQuad(l, c, al)).fork();
ar = recEval(c, r, fc, fr, ar); ar = recEval(c, r, fc, fr, ar);
if (q != null && !q.tryUnfork()) { if (q != null && !q.tryUnfork()) {
q.quietlyHelpJoin(); q.quietlyJoin();
return ar + q.area; return ar + q.area;
} }
return ar + recEval(l, c, fl, fc, al); return ar + recEval(l, c, fl, fc, al);