8177632: ScheduledThreadPoolExecutor delayed task shutdown policy affects non-scheduled tasks
8176254: ScheduledThreadPoolExecutor periodic tasks not cancelled if running at shutdown 8173113: Javadoc for ThreadPoolExecutor is unclear wrt corePoolSize and running threads Reviewed-by: martin, psandoz, dholmes
This commit is contained in:
parent
8e1db99fd6
commit
c887543d2d
@ -80,7 +80,7 @@ package java.util.concurrent;
|
||||
* Runnable beeper = () -> System.out.println("beep");
|
||||
* ScheduledFuture<?> beeperHandle =
|
||||
* scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
|
||||
* Runnable canceller = () -> beeperHandle.cancel(true);
|
||||
* Runnable canceller = () -> beeperHandle.cancel(false);
|
||||
* scheduler.schedule(canceller, 1, HOURS);
|
||||
* }
|
||||
* }}</pre>
|
||||
@ -91,8 +91,7 @@ package java.util.concurrent;
|
||||
public interface ScheduledExecutorService extends ExecutorService {
|
||||
|
||||
/**
|
||||
* Creates and executes a one-shot action that becomes enabled
|
||||
* after the given delay.
|
||||
* Submits a one-shot task that becomes enabled after the given delay.
|
||||
*
|
||||
* @param command the task to execute
|
||||
* @param delay the time from now to delay execution
|
||||
@ -102,14 +101,14 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* {@code null} upon completion
|
||||
* @throws RejectedExecutionException if the task cannot be
|
||||
* scheduled for execution
|
||||
* @throws NullPointerException if command is null
|
||||
* @throws NullPointerException if command or unit is null
|
||||
*/
|
||||
public ScheduledFuture<?> schedule(Runnable command,
|
||||
long delay, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Creates and executes a ScheduledFuture that becomes enabled after the
|
||||
* given delay.
|
||||
* Submits a value-returning one-shot task that becomes enabled
|
||||
* after the given delay.
|
||||
*
|
||||
* @param callable the function to execute
|
||||
* @param delay the time from now to delay execution
|
||||
@ -118,15 +117,15 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* @return a ScheduledFuture that can be used to extract result or cancel
|
||||
* @throws RejectedExecutionException if the task cannot be
|
||||
* scheduled for execution
|
||||
* @throws NullPointerException if callable is null
|
||||
* @throws NullPointerException if callable or unit is null
|
||||
*/
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
|
||||
long delay, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Creates and executes a periodic action that becomes enabled first
|
||||
* after the given initial delay, and subsequently with the given
|
||||
* period; that is, executions will commence after
|
||||
* Submits a periodic action that becomes enabled first after the
|
||||
* given initial delay, and subsequently with the given period;
|
||||
* that is, executions will commence after
|
||||
* {@code initialDelay}, then {@code initialDelay + period}, then
|
||||
* {@code initialDelay + 2 * period}, and so on.
|
||||
*
|
||||
@ -137,8 +136,8 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* via the returned future.
|
||||
* <li>The executor terminates, also resulting in task cancellation.
|
||||
* <li>An execution of the task throws an exception. In this case
|
||||
* calling {@link Future#get() get} on the returned future will
|
||||
* throw {@link ExecutionException}.
|
||||
* calling {@link Future#get() get} on the returned future will throw
|
||||
* {@link ExecutionException}, holding the exception as its cause.
|
||||
* </ul>
|
||||
* Subsequent executions are suppressed. Subsequent calls to
|
||||
* {@link Future#isDone isDone()} on the returned future will
|
||||
@ -159,7 +158,7 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* abnormal termination of a task execution.
|
||||
* @throws RejectedExecutionException if the task cannot be
|
||||
* scheduled for execution
|
||||
* @throws NullPointerException if command is null
|
||||
* @throws NullPointerException if command or unit is null
|
||||
* @throws IllegalArgumentException if period less than or equal to zero
|
||||
*/
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
|
||||
@ -168,10 +167,10 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Creates and executes a periodic action that becomes enabled first
|
||||
* after the given initial delay, and subsequently with the
|
||||
* given delay between the termination of one execution and the
|
||||
* commencement of the next.
|
||||
* Submits a periodic action that becomes enabled first after the
|
||||
* given initial delay, and subsequently with the given delay
|
||||
* between the termination of one execution and the commencement of
|
||||
* the next.
|
||||
*
|
||||
* <p>The sequence of task executions continues indefinitely until
|
||||
* one of the following exceptional completions occur:
|
||||
@ -180,8 +179,8 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* via the returned future.
|
||||
* <li>The executor terminates, also resulting in task cancellation.
|
||||
* <li>An execution of the task throws an exception. In this case
|
||||
* calling {@link Future#get() get} on the returned future will
|
||||
* throw {@link ExecutionException}.
|
||||
* calling {@link Future#get() get} on the returned future will throw
|
||||
* {@link ExecutionException}, holding the exception as its cause.
|
||||
* </ul>
|
||||
* Subsequent executions are suppressed. Subsequent calls to
|
||||
* {@link Future#isDone isDone()} on the returned future will
|
||||
@ -199,7 +198,7 @@ public interface ScheduledExecutorService extends ExecutorService {
|
||||
* abnormal termination of a task execution.
|
||||
* @throws RejectedExecutionException if the task cannot be
|
||||
* scheduled for execution
|
||||
* @throws NullPointerException if command is null
|
||||
* @throws NullPointerException if command or unit is null
|
||||
* @throws IllegalArgumentException if delay less than or equal to zero
|
||||
*/
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
|
||||
|
@ -44,6 +44,7 @@ import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@ -87,6 +88,11 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
* use {@code allowCoreThreadTimeOut} because this may leave the pool
|
||||
* without threads to handle tasks once they become eligible to run.
|
||||
*
|
||||
* <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
|
||||
* this class uses {@link Executors#defaultThreadFactory} as the
|
||||
* default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
|
||||
* as the default rejected execution handler.
|
||||
*
|
||||
* <p><b>Extension notes:</b> This class overrides the
|
||||
* {@link ThreadPoolExecutor#execute(Runnable) execute} and
|
||||
* {@link AbstractExecutorService#submit(Runnable) submit}
|
||||
@ -161,7 +167,7 @@ public class ScheduledThreadPoolExecutor
|
||||
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
|
||||
|
||||
/**
|
||||
* False if should cancel non-periodic tasks on shutdown.
|
||||
* False if should cancel non-periodic not-yet-expired tasks on shutdown.
|
||||
*/
|
||||
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
|
||||
|
||||
@ -292,10 +298,9 @@ public class ScheduledThreadPoolExecutor
|
||||
* Overrides FutureTask version so as to reset/requeue if periodic.
|
||||
*/
|
||||
public void run() {
|
||||
boolean periodic = isPeriodic();
|
||||
if (!canRunInCurrentRunState(periodic))
|
||||
if (!canRunInCurrentRunState(this))
|
||||
cancel(false);
|
||||
else if (!periodic)
|
||||
else if (!isPeriodic())
|
||||
super.run();
|
||||
else if (super.runAndReset()) {
|
||||
setNextRunTime();
|
||||
@ -305,15 +310,18 @@ public class ScheduledThreadPoolExecutor
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if can run a task given current run state
|
||||
* and run-after-shutdown parameters.
|
||||
*
|
||||
* @param periodic true if this task periodic, false if delayed
|
||||
* Returns true if can run a task given current run state and
|
||||
* run-after-shutdown parameters.
|
||||
*/
|
||||
boolean canRunInCurrentRunState(boolean periodic) {
|
||||
return isRunningOrShutdown(periodic ?
|
||||
continueExistingPeriodicTasksAfterShutdown :
|
||||
executeExistingDelayedTasksAfterShutdown);
|
||||
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
|
||||
if (!isShutdown())
|
||||
return true;
|
||||
if (isStopped())
|
||||
return false;
|
||||
return task.isPeriodic()
|
||||
? continueExistingPeriodicTasksAfterShutdown
|
||||
: (executeExistingDelayedTasksAfterShutdown
|
||||
|| task.getDelay(NANOSECONDS) <= 0);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -332,9 +340,7 @@ public class ScheduledThreadPoolExecutor
|
||||
reject(task);
|
||||
else {
|
||||
super.getQueue().add(task);
|
||||
if (isShutdown() &&
|
||||
!canRunInCurrentRunState(task.isPeriodic()) &&
|
||||
remove(task))
|
||||
if (!canRunInCurrentRunState(task) && remove(task))
|
||||
task.cancel(false);
|
||||
else
|
||||
ensurePrestart();
|
||||
@ -348,13 +354,14 @@ public class ScheduledThreadPoolExecutor
|
||||
* @param task the task
|
||||
*/
|
||||
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
|
||||
if (canRunInCurrentRunState(true)) {
|
||||
if (canRunInCurrentRunState(task)) {
|
||||
super.getQueue().add(task);
|
||||
if (!canRunInCurrentRunState(true) && remove(task))
|
||||
task.cancel(false);
|
||||
else
|
||||
if (canRunInCurrentRunState(task) || !remove(task)) {
|
||||
ensurePrestart();
|
||||
return;
|
||||
}
|
||||
}
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -367,23 +374,18 @@ public class ScheduledThreadPoolExecutor
|
||||
getExecuteExistingDelayedTasksAfterShutdownPolicy();
|
||||
boolean keepPeriodic =
|
||||
getContinueExistingPeriodicTasksAfterShutdownPolicy();
|
||||
if (!keepDelayed && !keepPeriodic) {
|
||||
for (Object e : q.toArray())
|
||||
if (e instanceof RunnableScheduledFuture<?>)
|
||||
((RunnableScheduledFuture<?>) e).cancel(false);
|
||||
q.clear();
|
||||
}
|
||||
else {
|
||||
// Traverse snapshot to avoid iterator exceptions
|
||||
for (Object e : q.toArray()) {
|
||||
if (e instanceof RunnableScheduledFuture) {
|
||||
RunnableScheduledFuture<?> t =
|
||||
(RunnableScheduledFuture<?>)e;
|
||||
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
|
||||
t.isCancelled()) { // also remove if already cancelled
|
||||
if (q.remove(t))
|
||||
t.cancel(false);
|
||||
}
|
||||
// Traverse snapshot to avoid iterator exceptions
|
||||
// TODO: implement and use efficient removeIf
|
||||
// super.getQueue().removeIf(...);
|
||||
for (Object e : q.toArray()) {
|
||||
if (e instanceof RunnableScheduledFuture) {
|
||||
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
|
||||
if ((t.isPeriodic()
|
||||
? !keepPeriodic
|
||||
: (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
|
||||
|| t.isCancelled()) { // also remove if already cancelled
|
||||
if (q.remove(t))
|
||||
t.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -579,6 +581,34 @@ public class ScheduledThreadPoolExecutor
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits a periodic action that becomes enabled first after the
|
||||
* given initial delay, and subsequently with the given period;
|
||||
* that is, executions will commence after
|
||||
* {@code initialDelay}, then {@code initialDelay + period}, then
|
||||
* {@code initialDelay + 2 * period}, and so on.
|
||||
*
|
||||
* <p>The sequence of task executions continues indefinitely until
|
||||
* one of the following exceptional completions occur:
|
||||
* <ul>
|
||||
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
|
||||
* via the returned future.
|
||||
* <li>Method {@link #shutdown} is called and the {@linkplain
|
||||
* #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
|
||||
* whether to continue after shutdown} is not set true, or method
|
||||
* {@link #shutdownNow} is called; also resulting in task
|
||||
* cancellation.
|
||||
* <li>An execution of the task throws an exception. In this case
|
||||
* calling {@link Future#get() get} on the returned future will throw
|
||||
* {@link ExecutionException}, holding the exception as its cause.
|
||||
* </ul>
|
||||
* Subsequent executions are suppressed. Subsequent calls to
|
||||
* {@link Future#isDone isDone()} on the returned future will
|
||||
* return {@code true}.
|
||||
*
|
||||
* <p>If any execution of this task takes longer than its period, then
|
||||
* subsequent executions may start late, but will not concurrently
|
||||
* execute.
|
||||
*
|
||||
* @throws RejectedExecutionException {@inheritDoc}
|
||||
* @throws NullPointerException {@inheritDoc}
|
||||
* @throws IllegalArgumentException {@inheritDoc}
|
||||
@ -604,6 +634,29 @@ public class ScheduledThreadPoolExecutor
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits a periodic action that becomes enabled first after the
|
||||
* given initial delay, and subsequently with the given delay
|
||||
* between the termination of one execution and the commencement of
|
||||
* the next.
|
||||
*
|
||||
* <p>The sequence of task executions continues indefinitely until
|
||||
* one of the following exceptional completions occur:
|
||||
* <ul>
|
||||
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
|
||||
* via the returned future.
|
||||
* <li>Method {@link #shutdown} is called and the {@linkplain
|
||||
* #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
|
||||
* whether to continue after shutdown} is not set true, or method
|
||||
* {@link #shutdownNow} is called; also resulting in task
|
||||
* cancellation.
|
||||
* <li>An execution of the task throws an exception. In this case
|
||||
* calling {@link Future#get() get} on the returned future will throw
|
||||
* {@link ExecutionException}, holding the exception as its cause.
|
||||
* </ul>
|
||||
* Subsequent executions are suppressed. Subsequent calls to
|
||||
* {@link Future#isDone isDone()} on the returned future will
|
||||
* return {@code true}.
|
||||
*
|
||||
* @throws RejectedExecutionException {@inheritDoc}
|
||||
* @throws NullPointerException {@inheritDoc}
|
||||
* @throws IllegalArgumentException {@inheritDoc}
|
||||
@ -681,9 +734,8 @@ public class ScheduledThreadPoolExecutor
|
||||
/**
|
||||
* Sets the policy on whether to continue executing existing
|
||||
* periodic tasks even when this executor has been {@code shutdown}.
|
||||
* In this case, these tasks will only terminate upon
|
||||
* {@code shutdownNow} or after setting the policy to
|
||||
* {@code false} when already shutdown.
|
||||
* In this case, executions will continue until {@code shutdownNow}
|
||||
* or the policy is set to {@code false} when already shutdown.
|
||||
* This value is by default {@code false}.
|
||||
*
|
||||
* @param value if {@code true}, continue after shutdown, else don't
|
||||
@ -698,9 +750,8 @@ public class ScheduledThreadPoolExecutor
|
||||
/**
|
||||
* Gets the policy on whether to continue executing existing
|
||||
* periodic tasks even when this executor has been {@code shutdown}.
|
||||
* In this case, these tasks will only terminate upon
|
||||
* {@code shutdownNow} or after setting the policy to
|
||||
* {@code false} when already shutdown.
|
||||
* In this case, executions will continue until {@code shutdownNow}
|
||||
* or the policy is set to {@code false} when already shutdown.
|
||||
* This value is by default {@code false}.
|
||||
*
|
||||
* @return {@code true} if will continue after shutdown
|
||||
@ -904,7 +955,7 @@ public class ScheduledThreadPoolExecutor
|
||||
/**
|
||||
* Sets f's heapIndex if it is a ScheduledFutureTask.
|
||||
*/
|
||||
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
|
||||
private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
|
||||
if (f instanceof ScheduledFutureTask)
|
||||
((ScheduledFutureTask)f).heapIndex = idx;
|
||||
}
|
||||
@ -1202,41 +1253,12 @@ public class ScheduledThreadPoolExecutor
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns first element only if it is expired.
|
||||
* Used only by drainTo. Call only when holding lock.
|
||||
*/
|
||||
private RunnableScheduledFuture<?> peekExpired() {
|
||||
// assert lock.isHeldByCurrentThread();
|
||||
RunnableScheduledFuture<?> first = queue[0];
|
||||
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
|
||||
null : first;
|
||||
}
|
||||
|
||||
public int drainTo(Collection<? super Runnable> c) {
|
||||
if (c == null)
|
||||
throw new NullPointerException();
|
||||
if (c == this)
|
||||
throw new IllegalArgumentException();
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
RunnableScheduledFuture<?> first;
|
||||
int n = 0;
|
||||
while ((first = peekExpired()) != null) {
|
||||
c.add(first); // In this order, in case add() throws.
|
||||
finishPoll(first);
|
||||
++n;
|
||||
}
|
||||
return n;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return drainTo(c, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public int drainTo(Collection<? super Runnable> c, int maxElements) {
|
||||
if (c == null)
|
||||
throw new NullPointerException();
|
||||
Objects.requireNonNull(c);
|
||||
if (c == this)
|
||||
throw new IllegalArgumentException();
|
||||
if (maxElements <= 0)
|
||||
@ -1244,9 +1266,11 @@ public class ScheduledThreadPoolExecutor
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
RunnableScheduledFuture<?> first;
|
||||
int n = 0;
|
||||
while (n < maxElements && (first = peekExpired()) != null) {
|
||||
for (RunnableScheduledFuture<?> first;
|
||||
n < maxElements
|
||||
&& (first = queue[0]) != null
|
||||
&& first.getDelay(NANOSECONDS) <= 0;) {
|
||||
c.add(first); // In this order, in case add() throws.
|
||||
finishPoll(first);
|
||||
++n;
|
||||
@ -1284,7 +1308,13 @@ public class ScheduledThreadPoolExecutor
|
||||
}
|
||||
|
||||
public Iterator<Runnable> iterator() {
|
||||
return new Itr(Arrays.copyOf(queue, size));
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
return new Itr(Arrays.copyOf(queue, size));
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,31 +74,28 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Core and maximum pool sizes</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* A {@code ThreadPoolExecutor} will automatically adjust the
|
||||
* <dd>A {@code ThreadPoolExecutor} will automatically adjust the
|
||||
* pool size (see {@link #getPoolSize})
|
||||
* according to the bounds set by
|
||||
* corePoolSize (see {@link #getCorePoolSize}) and
|
||||
* maximumPoolSize (see {@link #getMaximumPoolSize}).
|
||||
*
|
||||
* When a new task is submitted in method {@link #execute(Runnable)},
|
||||
* and fewer than corePoolSize threads are running, a new thread is
|
||||
* if fewer than corePoolSize threads are running, a new thread is
|
||||
* created to handle the request, even if other worker threads are
|
||||
* idle. If there are more than corePoolSize but less than
|
||||
* maximumPoolSize threads running, a new thread will be created only
|
||||
* if the queue is full. By setting corePoolSize and maximumPoolSize
|
||||
* the same, you create a fixed-size thread pool. By setting
|
||||
* maximumPoolSize to an essentially unbounded value such as {@code
|
||||
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
|
||||
* number of concurrent tasks. Most typically, core and maximum pool
|
||||
* sizes are set only upon construction, but they may also be changed
|
||||
* dynamically using {@link #setCorePoolSize} and {@link
|
||||
* #setMaximumPoolSize}. </dd>
|
||||
* idle. Else if fewer than maximumPoolSize threads are running, a
|
||||
* new thread will be created to handle the request only if the queue
|
||||
* is full. By setting corePoolSize and maximumPoolSize the same, you
|
||||
* create a fixed-size thread pool. By setting maximumPoolSize to an
|
||||
* essentially unbounded value such as {@code Integer.MAX_VALUE}, you
|
||||
* allow the pool to accommodate an arbitrary number of concurrent
|
||||
* tasks. Most typically, core and maximum pool sizes are set only
|
||||
* upon construction, but they may also be changed dynamically using
|
||||
* {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
|
||||
*
|
||||
* <dt>On-demand construction</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* By default, even core threads are initially created and
|
||||
* <dd>By default, even core threads are initially created and
|
||||
* started only when new tasks arrive, but this can be overridden
|
||||
* dynamically using method {@link #prestartCoreThread} or {@link
|
||||
* #prestartAllCoreThreads}. You probably want to prestart threads if
|
||||
@ -106,8 +103,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Creating new threads</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* New threads are created using a {@link ThreadFactory}. If not
|
||||
* <dd>New threads are created using a {@link ThreadFactory}. If not
|
||||
* otherwise specified, a {@link Executors#defaultThreadFactory} is
|
||||
* used, that creates threads to all be in the same {@link
|
||||
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
|
||||
@ -124,8 +120,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Keep-alive times</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* If the pool currently has more than corePoolSize threads,
|
||||
* <dd>If the pool currently has more than corePoolSize threads,
|
||||
* excess threads will be terminated if they have been idle for more
|
||||
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
|
||||
* This provides a means of reducing resource consumption when the
|
||||
@ -142,8 +137,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Queuing</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* Any {@link BlockingQueue} may be used to transfer and hold
|
||||
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
|
||||
* submitted tasks. The use of this queue interacts with pool sizing:
|
||||
*
|
||||
* <ul>
|
||||
@ -208,8 +202,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Rejected tasks</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* New tasks submitted in method {@link #execute(Runnable)} will be
|
||||
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
|
||||
* <em>rejected</em> when the Executor has been shut down, and also when
|
||||
* the Executor uses finite bounds for both maximum threads and work queue
|
||||
* capacity, and is saturated. In either case, the {@code execute} method
|
||||
@ -220,9 +213,8 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <ol>
|
||||
*
|
||||
* <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the
|
||||
* handler throws a runtime {@link RejectedExecutionException} upon
|
||||
* rejection.
|
||||
* <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler
|
||||
* throws a runtime {@link RejectedExecutionException} upon rejection.
|
||||
*
|
||||
* <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
|
||||
* that invokes {@code execute} itself runs the task. This provides a
|
||||
@ -246,8 +238,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Hook methods</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* This class provides {@code protected} overridable
|
||||
* <dd>This class provides {@code protected} overridable
|
||||
* {@link #beforeExecute(Thread, Runnable)} and
|
||||
* {@link #afterExecute(Runnable, Throwable)} methods that are called
|
||||
* before and after execution of each task. These can be used to
|
||||
@ -263,8 +254,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Queue maintenance</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* Method {@link #getQueue()} allows access to the work queue
|
||||
* <dd>Method {@link #getQueue()} allows access to the work queue
|
||||
* for purposes of monitoring and debugging. Use of this method for
|
||||
* any other purpose is strongly discouraged. Two supplied methods,
|
||||
* {@link #remove(Runnable)} and {@link #purge} are available to
|
||||
@ -273,8 +263,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
*
|
||||
* <dt>Finalization</dt>
|
||||
*
|
||||
* <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
|
||||
* A pool that is no longer referenced in a program <em>AND</em>
|
||||
* <dd>A pool that is no longer referenced in a program <em>AND</em>
|
||||
* has no remaining threads will be {@code shutdown} automatically. If
|
||||
* you would like to ensure that unreferenced pools are reclaimed even
|
||||
* if users forget to call {@link #shutdown}, then you must arrange
|
||||
@ -850,17 +839,6 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
void onShutdown() {
|
||||
}
|
||||
|
||||
/**
|
||||
* State check needed by ScheduledThreadPoolExecutor to
|
||||
* enable running tasks during shutdown.
|
||||
*
|
||||
* @param shutdownOK true if should return true if SHUTDOWN
|
||||
*/
|
||||
final boolean isRunningOrShutdown(boolean shutdownOK) {
|
||||
int rs = runStateOf(ctl.get());
|
||||
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
|
||||
}
|
||||
|
||||
/**
|
||||
* Drains the task queue into a new list, normally using
|
||||
* drainTo. But if the queue is a DelayQueue or any other kind of
|
||||
@ -1184,9 +1162,11 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
|
||||
/**
|
||||
* Creates a new {@code ThreadPoolExecutor} with the given initial
|
||||
* parameters and default thread factory and rejected execution handler.
|
||||
* It may be more convenient to use one of the {@link Executors} factory
|
||||
* methods instead of this general purpose constructor.
|
||||
* parameters, the default thread factory and the default rejected
|
||||
* execution handler.
|
||||
*
|
||||
* <p>It may be more convenient to use one of the {@link Executors}
|
||||
* factory methods instead of this general purpose constructor.
|
||||
*
|
||||
* @param corePoolSize the number of threads to keep in the pool, even
|
||||
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
|
||||
@ -1217,7 +1197,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
|
||||
/**
|
||||
* Creates a new {@code ThreadPoolExecutor} with the given initial
|
||||
* parameters and default rejected execution handler.
|
||||
* parameters and {@linkplain ThreadPoolExecutor.AbortPolicy
|
||||
* default rejected execution handler}.
|
||||
*
|
||||
* @param corePoolSize the number of threads to keep in the pool, even
|
||||
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
|
||||
@ -1252,7 +1233,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
|
||||
/**
|
||||
* Creates a new {@code ThreadPoolExecutor} with the given initial
|
||||
* parameters and default thread factory.
|
||||
* parameters and
|
||||
* {@linkplain Executors#defaultThreadFactory default thread factory}.
|
||||
*
|
||||
* @param corePoolSize the number of threads to keep in the pool, even
|
||||
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
|
||||
@ -1450,6 +1432,11 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
return ! isRunning(ctl.get());
|
||||
}
|
||||
|
||||
/** Used by ScheduledThreadPoolExecutor. */
|
||||
boolean isStopped() {
|
||||
return runStateAtLeast(ctl.get(), STOP);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this executor is in the process of terminating
|
||||
* after {@link #shutdown} or {@link #shutdownNow} but has not
|
||||
@ -2065,7 +2052,10 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
|
||||
|
||||
/**
|
||||
* A handler for rejected tasks that throws a
|
||||
* {@code RejectedExecutionException}.
|
||||
* {@link RejectedExecutionException}.
|
||||
*
|
||||
* This is the default handler for {@link ThreadPoolExecutor} and
|
||||
* {@link ScheduledThreadPoolExecutor}.
|
||||
*/
|
||||
public static class AbortPolicy implements RejectedExecutionHandler {
|
||||
/**
|
||||
|
@ -103,18 +103,24 @@ import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RecursiveAction;
|
||||
import java.util.concurrent.RecursiveTask;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -665,6 +671,33 @@ public class JSR166TestCase extends TestCase {
|
||||
public static long MEDIUM_DELAY_MS;
|
||||
public static long LONG_DELAY_MS;
|
||||
|
||||
private static final long RANDOM_TIMEOUT;
|
||||
private static final long RANDOM_EXPIRED_TIMEOUT;
|
||||
private static final TimeUnit RANDOM_TIMEUNIT;
|
||||
static {
|
||||
ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
long[] timeouts = { Long.MIN_VALUE, -1, 0, 1, Long.MAX_VALUE };
|
||||
RANDOM_TIMEOUT = timeouts[rnd.nextInt(timeouts.length)];
|
||||
RANDOM_EXPIRED_TIMEOUT = timeouts[rnd.nextInt(3)];
|
||||
TimeUnit[] timeUnits = TimeUnit.values();
|
||||
RANDOM_TIMEUNIT = timeUnits[rnd.nextInt(timeUnits.length)];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a timeout for use when any value at all will do.
|
||||
*/
|
||||
static long randomTimeout() { return RANDOM_TIMEOUT; }
|
||||
|
||||
/**
|
||||
* Returns a timeout that means "no waiting", i.e. not positive.
|
||||
*/
|
||||
static long randomExpiredTimeout() { return RANDOM_EXPIRED_TIMEOUT; }
|
||||
|
||||
/**
|
||||
* Returns a random non-null TimeUnit.
|
||||
*/
|
||||
static TimeUnit randomTimeUnit() { return RANDOM_TIMEUNIT; }
|
||||
|
||||
/**
|
||||
* Returns the shortest timed delay. This can be scaled up for
|
||||
* slow machines using the jsr166.delay.factor system property,
|
||||
@ -685,12 +718,17 @@ public class JSR166TestCase extends TestCase {
|
||||
LONG_DELAY_MS = SHORT_DELAY_MS * 200;
|
||||
}
|
||||
|
||||
private static final long TIMEOUT_DELAY_MS
|
||||
= (long) (12.0 * Math.cbrt(delayFactor));
|
||||
|
||||
/**
|
||||
* Returns a timeout in milliseconds to be used in tests that
|
||||
* verify that operations block or time out.
|
||||
* Returns a timeout in milliseconds to be used in tests that verify
|
||||
* that operations block or time out. We want this to be longer
|
||||
* than the OS scheduling quantum, but not too long, so don't scale
|
||||
* linearly with delayFactor; we use "crazy" cube root instead.
|
||||
*/
|
||||
long timeoutMillis() {
|
||||
return SHORT_DELAY_MS / 4;
|
||||
static long timeoutMillis() {
|
||||
return TIMEOUT_DELAY_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1084,9 +1122,30 @@ public class JSR166TestCase extends TestCase {
|
||||
if (sm != null) System.setSecurityManager(sm);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that thread eventually enters the expected blocked thread state.
|
||||
*/
|
||||
void assertThreadBlocks(Thread thread, Thread.State expected) {
|
||||
// always sleep at least 1 ms, with high probability avoiding
|
||||
// transitory states
|
||||
for (long retries = LONG_DELAY_MS * 3 / 4; retries-->0; ) {
|
||||
try { delay(1); }
|
||||
catch (InterruptedException fail) {
|
||||
fail("Unexpected InterruptedException");
|
||||
}
|
||||
Thread.State s = thread.getState();
|
||||
if (s == expected)
|
||||
return;
|
||||
else if (s == Thread.State.TERMINATED)
|
||||
fail("Unexpected thread termination");
|
||||
}
|
||||
fail("timed out waiting for thread to enter thread state " + expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that thread does not terminate within the default
|
||||
* millisecond delay of {@code timeoutMillis()}.
|
||||
* TODO: REMOVEME
|
||||
*/
|
||||
void assertThreadStaysAlive(Thread thread) {
|
||||
assertThreadStaysAlive(thread, timeoutMillis());
|
||||
@ -1094,6 +1153,7 @@ public class JSR166TestCase extends TestCase {
|
||||
|
||||
/**
|
||||
* Checks that thread does not terminate within the given millisecond delay.
|
||||
* TODO: REMOVEME
|
||||
*/
|
||||
void assertThreadStaysAlive(Thread thread, long millis) {
|
||||
try {
|
||||
@ -1108,6 +1168,7 @@ public class JSR166TestCase extends TestCase {
|
||||
/**
|
||||
* Checks that the threads do not terminate within the default
|
||||
* millisecond delay of {@code timeoutMillis()}.
|
||||
* TODO: REMOVEME
|
||||
*/
|
||||
void assertThreadsStayAlive(Thread... threads) {
|
||||
assertThreadsStayAlive(timeoutMillis(), threads);
|
||||
@ -1115,6 +1176,7 @@ public class JSR166TestCase extends TestCase {
|
||||
|
||||
/**
|
||||
* Checks that the threads do not terminate within the given millisecond delay.
|
||||
* TODO: REMOVEME
|
||||
*/
|
||||
void assertThreadsStayAlive(long millis, Thread... threads) {
|
||||
try {
|
||||
@ -1164,6 +1226,12 @@ public class JSR166TestCase extends TestCase {
|
||||
fail("Should throw " + exceptionName);
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum number of consecutive spurious wakeups we should
|
||||
* tolerate (from APIs like LockSupport.park) before failing a test.
|
||||
*/
|
||||
static final int MAX_SPURIOUS_WAKEUPS = 10;
|
||||
|
||||
/**
|
||||
* The number of elements to place in collections, arrays, etc.
|
||||
*/
|
||||
@ -1633,6 +1701,14 @@ public class JSR166TestCase extends TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void await(CyclicBarrier barrier) {
|
||||
try {
|
||||
barrier.await(LONG_DELAY_MS, MILLISECONDS);
|
||||
} catch (Throwable fail) {
|
||||
threadUnexpectedException(fail);
|
||||
}
|
||||
}
|
||||
|
||||
// /**
|
||||
// * Spin-waits up to LONG_DELAY_MS until flag becomes true.
|
||||
// */
|
||||
@ -1656,28 +1732,6 @@ public class JSR166TestCase extends TestCase {
|
||||
public String call() { throw new NullPointerException(); }
|
||||
}
|
||||
|
||||
public static class CallableOne implements Callable<Integer> {
|
||||
public Integer call() { return one; }
|
||||
}
|
||||
|
||||
public class ShortRunnable extends CheckedRunnable {
|
||||
protected void realRun() throws Throwable {
|
||||
delay(SHORT_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public class ShortInterruptedRunnable extends CheckedInterruptedRunnable {
|
||||
protected void realRun() throws InterruptedException {
|
||||
delay(SHORT_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public class SmallRunnable extends CheckedRunnable {
|
||||
protected void realRun() throws Throwable {
|
||||
delay(SMALL_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public class SmallPossiblyInterruptedRunnable extends CheckedRunnable {
|
||||
protected void realRun() {
|
||||
try {
|
||||
@ -1686,25 +1740,6 @@ public class JSR166TestCase extends TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public class SmallCallable extends CheckedCallable {
|
||||
protected Object realCall() throws InterruptedException {
|
||||
delay(SMALL_DELAY_MS);
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
|
||||
public class MediumRunnable extends CheckedRunnable {
|
||||
protected void realRun() throws Throwable {
|
||||
delay(MEDIUM_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public class MediumInterruptedRunnable extends CheckedInterruptedRunnable {
|
||||
protected void realRun() throws InterruptedException {
|
||||
delay(MEDIUM_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
public Runnable possiblyInterruptedRunnable(final long timeoutMillis) {
|
||||
return new CheckedRunnable() {
|
||||
protected void realRun() {
|
||||
@ -1714,22 +1749,6 @@ public class JSR166TestCase extends TestCase {
|
||||
}};
|
||||
}
|
||||
|
||||
public class MediumPossiblyInterruptedRunnable extends CheckedRunnable {
|
||||
protected void realRun() {
|
||||
try {
|
||||
delay(MEDIUM_DELAY_MS);
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
public class LongPossiblyInterruptedRunnable extends CheckedRunnable {
|
||||
protected void realRun() {
|
||||
try {
|
||||
delay(LONG_DELAY_MS);
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For use as ThreadFactory in constructors
|
||||
*/
|
||||
@ -1743,59 +1762,6 @@ public class JSR166TestCase extends TestCase {
|
||||
boolean isDone();
|
||||
}
|
||||
|
||||
public static TrackedRunnable trackedRunnable(final long timeoutMillis) {
|
||||
return new TrackedRunnable() {
|
||||
private volatile boolean done = false;
|
||||
public boolean isDone() { return done; }
|
||||
public void run() {
|
||||
try {
|
||||
delay(timeoutMillis);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static class TrackedShortRunnable implements Runnable {
|
||||
public volatile boolean done = false;
|
||||
public void run() {
|
||||
try {
|
||||
delay(SHORT_DELAY_MS);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TrackedSmallRunnable implements Runnable {
|
||||
public volatile boolean done = false;
|
||||
public void run() {
|
||||
try {
|
||||
delay(SMALL_DELAY_MS);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TrackedMediumRunnable implements Runnable {
|
||||
public volatile boolean done = false;
|
||||
public void run() {
|
||||
try {
|
||||
delay(MEDIUM_DELAY_MS);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TrackedLongRunnable implements Runnable {
|
||||
public volatile boolean done = false;
|
||||
public void run() {
|
||||
try {
|
||||
delay(LONG_DELAY_MS);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TrackedNoOpRunnable implements Runnable {
|
||||
public volatile boolean done = false;
|
||||
public void run() {
|
||||
@ -1803,17 +1769,6 @@ public class JSR166TestCase extends TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public static class TrackedCallable implements Callable {
|
||||
public volatile boolean done = false;
|
||||
public Object call() {
|
||||
try {
|
||||
delay(SMALL_DELAY_MS);
|
||||
done = true;
|
||||
} catch (InterruptedException ok) {}
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Analog of CheckedRunnable for RecursiveAction
|
||||
*/
|
||||
@ -1880,7 +1835,7 @@ public class JSR166TestCase extends TestCase {
|
||||
assertEquals(0, q.size());
|
||||
assertNull(q.peek());
|
||||
assertNull(q.poll());
|
||||
assertNull(q.poll(0, MILLISECONDS));
|
||||
assertNull(q.poll(randomExpiredTimeout(), randomTimeUnit()));
|
||||
assertEquals(q.toString(), "[]");
|
||||
assertTrue(Arrays.equals(q.toArray(), new Object[0]));
|
||||
assertFalse(q.iterator().hasNext());
|
||||
@ -2031,4 +1986,176 @@ public class JSR166TestCase extends TestCase {
|
||||
static <T> void shuffle(T[] array) {
|
||||
Collections.shuffle(Arrays.asList(array), ThreadLocalRandom.current());
|
||||
}
|
||||
|
||||
// --- Shared assertions for Executor tests ---
|
||||
|
||||
/**
|
||||
* Returns maximum number of tasks that can be submitted to given
|
||||
* pool (with bounded queue) before saturation (when submission
|
||||
* throws RejectedExecutionException).
|
||||
*/
|
||||
static final int saturatedSize(ThreadPoolExecutor pool) {
|
||||
BlockingQueue<Runnable> q = pool.getQueue();
|
||||
return pool.getMaximumPoolSize() + q.size() + q.remainingCapacity();
|
||||
}
|
||||
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
void assertNullTaskSubmissionThrowsNullPointerException(Executor e) {
|
||||
try {
|
||||
e.execute((Runnable) null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
|
||||
if (! (e instanceof ExecutorService)) return;
|
||||
ExecutorService es = (ExecutorService) e;
|
||||
try {
|
||||
es.submit((Runnable) null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
try {
|
||||
es.submit((Runnable) null, Boolean.TRUE);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
try {
|
||||
es.submit((Callable) null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
|
||||
if (! (e instanceof ScheduledExecutorService)) return;
|
||||
ScheduledExecutorService ses = (ScheduledExecutorService) e;
|
||||
try {
|
||||
ses.schedule((Runnable) null,
|
||||
randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
try {
|
||||
ses.schedule((Callable) null,
|
||||
randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
try {
|
||||
ses.scheduleAtFixedRate((Runnable) null,
|
||||
randomTimeout(), LONG_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
try {
|
||||
ses.scheduleWithFixedDelay((Runnable) null,
|
||||
randomTimeout(), LONG_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
|
||||
void setRejectedExecutionHandler(
|
||||
ThreadPoolExecutor p, RejectedExecutionHandler handler) {
|
||||
p.setRejectedExecutionHandler(handler);
|
||||
assertSame(handler, p.getRejectedExecutionHandler());
|
||||
}
|
||||
|
||||
void assertTaskSubmissionsAreRejected(ThreadPoolExecutor p) {
|
||||
final RejectedExecutionHandler savedHandler = p.getRejectedExecutionHandler();
|
||||
final long savedTaskCount = p.getTaskCount();
|
||||
final long savedCompletedTaskCount = p.getCompletedTaskCount();
|
||||
final int savedQueueSize = p.getQueue().size();
|
||||
final boolean stock = (p.getClass().getClassLoader() == null);
|
||||
|
||||
Runnable r = () -> {};
|
||||
Callable<Boolean> c = () -> Boolean.TRUE;
|
||||
|
||||
class Recorder implements RejectedExecutionHandler {
|
||||
public volatile Runnable r = null;
|
||||
public volatile ThreadPoolExecutor p = null;
|
||||
public void reset() { r = null; p = null; }
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
|
||||
assertNull(this.r);
|
||||
assertNull(this.p);
|
||||
this.r = r;
|
||||
this.p = p;
|
||||
}
|
||||
}
|
||||
|
||||
// check custom handler is invoked exactly once per task
|
||||
Recorder recorder = new Recorder();
|
||||
setRejectedExecutionHandler(p, recorder);
|
||||
for (int i = 2; i--> 0; ) {
|
||||
recorder.reset();
|
||||
p.execute(r);
|
||||
if (stock && p.getClass() == ThreadPoolExecutor.class)
|
||||
assertSame(r, recorder.r);
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
assertFalse(p.submit(r).isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
assertFalse(p.submit(r, Boolean.TRUE).isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
assertFalse(p.submit(c).isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
if (p instanceof ScheduledExecutorService) {
|
||||
ScheduledExecutorService s = (ScheduledExecutorService) p;
|
||||
ScheduledFuture<?> future;
|
||||
|
||||
recorder.reset();
|
||||
future = s.schedule(r, randomTimeout(), randomTimeUnit());
|
||||
assertFalse(future.isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
future = s.schedule(c, randomTimeout(), randomTimeUnit());
|
||||
assertFalse(future.isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
future = s.scheduleAtFixedRate(r, randomTimeout(), LONG_DELAY_MS, MILLISECONDS);
|
||||
assertFalse(future.isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
|
||||
recorder.reset();
|
||||
future = s.scheduleWithFixedDelay(r, randomTimeout(), LONG_DELAY_MS, MILLISECONDS);
|
||||
assertFalse(future.isDone());
|
||||
if (stock) assertTrue(!((FutureTask) recorder.r).isDone());
|
||||
assertSame(p, recorder.p);
|
||||
}
|
||||
}
|
||||
|
||||
// Checking our custom handler above should be sufficient, but
|
||||
// we add some integration tests of standard handlers.
|
||||
final AtomicReference<Thread> thread = new AtomicReference<>();
|
||||
final Runnable setThread = () -> thread.set(Thread.currentThread());
|
||||
|
||||
setRejectedExecutionHandler(p, new ThreadPoolExecutor.AbortPolicy());
|
||||
try {
|
||||
p.execute(setThread);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {}
|
||||
assertNull(thread.get());
|
||||
|
||||
setRejectedExecutionHandler(p, new ThreadPoolExecutor.DiscardPolicy());
|
||||
p.execute(setThread);
|
||||
assertNull(thread.get());
|
||||
|
||||
setRejectedExecutionHandler(p, new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
p.execute(setThread);
|
||||
if (p.isShutdown())
|
||||
assertNull(thread.get());
|
||||
else
|
||||
assertSame(Thread.currentThread(), thread.get());
|
||||
|
||||
setRejectedExecutionHandler(p, savedHandler);
|
||||
|
||||
// check that pool was not perturbed by handlers
|
||||
assertEquals(savedTaskCount, p.getTaskCount());
|
||||
assertEquals(savedCompletedTaskCount, p.getCompletedTaskCount());
|
||||
assertEquals(savedQueueSize, p.getQueue().size());
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
@ -52,12 +54,14 @@ import java.util.concurrent.RunnableScheduledFuture;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestSuite;
|
||||
@ -303,110 +307,67 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* execute(null) throws NPE
|
||||
* Submitting null tasks throws NullPointerException
|
||||
*/
|
||||
public void testExecuteNull() throws InterruptedException {
|
||||
public void testNullTaskSubmission() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.execute(null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
assertNullTaskSubmissionThrowsNullPointerException(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* schedule(null) throws NPE
|
||||
* Submitted tasks are rejected when shutdown
|
||||
*/
|
||||
public void testScheduleNull() throws InterruptedException {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
TrackedCallable callable = null;
|
||||
Future f = p.schedule(callable, SHORT_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException {
|
||||
final CustomExecutor p = new CustomExecutor(2);
|
||||
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize());
|
||||
final CountDownLatch done = new CountDownLatch(1);
|
||||
final Runnable r = () -> {
|
||||
threadsStarted.countDown();
|
||||
for (;;) {
|
||||
try {
|
||||
done.await();
|
||||
return;
|
||||
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
|
||||
}};
|
||||
final Callable<Boolean> c = () -> {
|
||||
threadsStarted.countDown();
|
||||
for (;;) {
|
||||
try {
|
||||
done.await();
|
||||
return Boolean.TRUE;
|
||||
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
|
||||
}};
|
||||
|
||||
/**
|
||||
* execute throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule1_RejectedExecutionException() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
try (PoolCleaner cleaner = cleaner(p, done)) {
|
||||
for (int i = p.getCorePoolSize(); i--> 0; ) {
|
||||
switch (rnd.nextInt(4)) {
|
||||
case 0: p.execute(r); break;
|
||||
case 1: assertFalse(p.submit(r).isDone()); break;
|
||||
case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
|
||||
case 3: assertFalse(p.submit(c).isDone()); break;
|
||||
}
|
||||
}
|
||||
|
||||
// ScheduledThreadPoolExecutor has an unbounded queue, so never saturated.
|
||||
await(threadsStarted);
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.shutdownNow();
|
||||
else
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
// Pool is shutdown, but not yet terminated
|
||||
assertTaskSubmissionsAreRejected(p);
|
||||
assertFalse(p.isTerminated());
|
||||
|
||||
/**
|
||||
* schedule throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule2_RejectedExecutionException() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpCallable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
done.countDown(); // release blocking tasks
|
||||
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
|
||||
|
||||
/**
|
||||
* schedule callable throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule3_RejectedExecutionException() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpCallable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* scheduleAtFixedRate throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testScheduleAtFixedRate1_RejectedExecutionException() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.scheduleAtFixedRate(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* scheduleWithFixedDelay throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testScheduleWithFixedDelay1_RejectedExecutionException() {
|
||||
final CustomExecutor p = new CustomExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.scheduleWithFixedDelay(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
assertTaskSubmissionsAreRejected(p);
|
||||
}
|
||||
assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -445,13 +406,13 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
public void realRun() throws InterruptedException {
|
||||
threadStarted.countDown();
|
||||
assertEquals(0, p.getCompletedTaskCount());
|
||||
threadProceed.await();
|
||||
await(threadProceed);
|
||||
threadDone.countDown();
|
||||
}});
|
||||
await(threadStarted);
|
||||
assertEquals(0, p.getCompletedTaskCount());
|
||||
threadProceed.countDown();
|
||||
threadDone.await();
|
||||
await(threadDone);
|
||||
long startTime = System.nanoTime();
|
||||
while (p.getCompletedTaskCount() != 1) {
|
||||
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
|
||||
@ -812,91 +773,187 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
* - setExecuteExistingDelayedTasksAfterShutdownPolicy
|
||||
* - setContinueExistingPeriodicTasksAfterShutdownPolicy
|
||||
*/
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
public void testShutdown_cancellation() throws Exception {
|
||||
Boolean[] allBooleans = { null, Boolean.FALSE, Boolean.TRUE };
|
||||
for (Boolean policy : allBooleans)
|
||||
{
|
||||
final int poolSize = 2;
|
||||
final int poolSize = 4;
|
||||
final CustomExecutor p = new CustomExecutor(poolSize);
|
||||
final boolean effectiveDelayedPolicy = (policy != Boolean.FALSE);
|
||||
final boolean effectivePeriodicPolicy = (policy == Boolean.TRUE);
|
||||
final boolean effectiveRemovePolicy = (policy == Boolean.TRUE);
|
||||
if (policy != null) {
|
||||
p.setExecuteExistingDelayedTasksAfterShutdownPolicy(policy);
|
||||
p.setContinueExistingPeriodicTasksAfterShutdownPolicy(policy);
|
||||
p.setRemoveOnCancelPolicy(policy);
|
||||
}
|
||||
final BlockingQueue<Runnable> q = p.getQueue();
|
||||
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
final long delay = rnd.nextInt(2);
|
||||
final int rounds = rnd.nextInt(1, 3);
|
||||
final boolean effectiveDelayedPolicy;
|
||||
final boolean effectivePeriodicPolicy;
|
||||
final boolean effectiveRemovePolicy;
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
|
||||
effectiveDelayedPolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectiveDelayedPolicy = true;
|
||||
assertEquals(effectiveDelayedPolicy,
|
||||
p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
|
||||
effectivePeriodicPolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectivePeriodicPolicy = false;
|
||||
assertEquals(effectivePeriodicPolicy,
|
||||
p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setRemoveOnCancelPolicy(
|
||||
effectiveRemovePolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectiveRemovePolicy = false;
|
||||
assertEquals(effectiveRemovePolicy,
|
||||
p.getRemoveOnCancelPolicy());
|
||||
// Strategy: Wedge the pool with poolSize "blocker" threads
|
||||
|
||||
final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
|
||||
|
||||
// Strategy: Wedge the pool with one wave of "blocker" tasks,
|
||||
// then add a second wave that waits in the queue until unblocked.
|
||||
final AtomicInteger ran = new AtomicInteger(0);
|
||||
final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
|
||||
final CountDownLatch unblock = new CountDownLatch(1);
|
||||
final CountDownLatch periodicLatch1 = new CountDownLatch(2);
|
||||
final CountDownLatch periodicLatch2 = new CountDownLatch(2);
|
||||
Runnable task = new CheckedRunnable() { public void realRun()
|
||||
throws InterruptedException {
|
||||
poolBlocked.countDown();
|
||||
assertTrue(unblock.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
ran.getAndIncrement();
|
||||
}};
|
||||
List<Future<?>> blockers = new ArrayList<>();
|
||||
List<Future<?>> periodics = new ArrayList<>();
|
||||
List<Future<?>> delayeds = new ArrayList<>();
|
||||
for (int i = 0; i < poolSize; i++)
|
||||
blockers.add(p.submit(task));
|
||||
assertTrue(poolBlocked.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
final RuntimeException exception = new RuntimeException();
|
||||
|
||||
periodics.add(p.scheduleAtFixedRate(countDowner(periodicLatch1),
|
||||
1, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(countDowner(periodicLatch2),
|
||||
1, 1, MILLISECONDS));
|
||||
delayeds.add(p.schedule(task, 1, MILLISECONDS));
|
||||
class Task implements Runnable {
|
||||
public void run() {
|
||||
try {
|
||||
ran.getAndIncrement();
|
||||
poolBlocked.countDown();
|
||||
await(unblock);
|
||||
} catch (Throwable fail) { threadUnexpectedException(fail); }
|
||||
}
|
||||
}
|
||||
|
||||
class PeriodicTask extends Task {
|
||||
PeriodicTask(int rounds) { this.rounds = rounds; }
|
||||
int rounds;
|
||||
public void run() {
|
||||
if (--rounds == 0) super.run();
|
||||
// throw exception to surely terminate this periodic task,
|
||||
// but in a separate execution and in a detectable way.
|
||||
if (rounds == -1) throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
Runnable task = new Task();
|
||||
|
||||
List<Future<?>> immediates = new ArrayList<>();
|
||||
List<Future<?>> delayeds = new ArrayList<>();
|
||||
List<Future<?>> periodics = new ArrayList<>();
|
||||
|
||||
immediates.add(p.submit(task));
|
||||
delayeds.add(p.schedule(task, delay, MILLISECONDS));
|
||||
periodics.add(p.scheduleAtFixedRate(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
|
||||
await(poolBlocked);
|
||||
|
||||
assertEquals(poolSize, ran.get());
|
||||
assertEquals(poolSize, p.getActiveCount());
|
||||
assertTrue(q.isEmpty());
|
||||
|
||||
// Add second wave of tasks.
|
||||
immediates.add(p.submit(task));
|
||||
delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
|
||||
periodics.add(p.scheduleAtFixedRate(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
|
||||
assertEquals(poolSize, q.size());
|
||||
assertEquals(poolSize, ran.get());
|
||||
|
||||
immediates.forEach(
|
||||
f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
|
||||
|
||||
Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
|
||||
.forEach(f -> assertFalse(f.isDone()));
|
||||
|
||||
assertTrue(p.getQueue().containsAll(periodics));
|
||||
assertTrue(p.getQueue().containsAll(delayeds));
|
||||
try { p.shutdown(); } catch (SecurityException ok) { return; }
|
||||
assertTrue(p.isShutdown());
|
||||
assertTrue(p.isTerminating());
|
||||
assertFalse(p.isTerminated());
|
||||
for (Future<?> periodic : periodics) {
|
||||
assertTrue(effectivePeriodicPolicy ^ periodic.isCancelled());
|
||||
assertTrue(effectivePeriodicPolicy ^ periodic.isDone());
|
||||
}
|
||||
for (Future<?> delayed : delayeds) {
|
||||
assertTrue(effectiveDelayedPolicy ^ delayed.isCancelled());
|
||||
assertTrue(effectiveDelayedPolicy ^ delayed.isDone());
|
||||
}
|
||||
if (testImplementationDetails) {
|
||||
assertEquals(effectivePeriodicPolicy,
|
||||
p.getQueue().containsAll(periodics));
|
||||
assertEquals(effectiveDelayedPolicy,
|
||||
p.getQueue().containsAll(delayeds));
|
||||
}
|
||||
// Release all pool threads
|
||||
unblock.countDown();
|
||||
|
||||
for (Future<?> delayed : delayeds) {
|
||||
if (effectiveDelayedPolicy) {
|
||||
assertNull(delayed.get());
|
||||
}
|
||||
}
|
||||
if (effectivePeriodicPolicy) {
|
||||
assertTrue(periodicLatch1.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
assertTrue(periodicLatch2.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
for (Future<?> periodic : periodics) {
|
||||
assertTrue(periodic.cancel(false));
|
||||
assertTrue(periodic.isCancelled());
|
||||
assertTrue(periodic.isDone());
|
||||
}
|
||||
if (rnd.nextBoolean())
|
||||
assertThrows(
|
||||
RejectedExecutionException.class,
|
||||
() -> p.submit(task),
|
||||
() -> p.schedule(task, 1, SECONDS),
|
||||
() -> p.scheduleAtFixedRate(
|
||||
new PeriodicTask(1), 1, 1, SECONDS),
|
||||
() -> p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(2), 1, 1, SECONDS));
|
||||
|
||||
assertTrue(q.contains(immediates.get(1)));
|
||||
assertTrue(!effectiveDelayedPolicy
|
||||
^ q.contains(delayeds.get(1)));
|
||||
assertTrue(!effectivePeriodicPolicy
|
||||
^ q.containsAll(periodics.subList(2, 4)));
|
||||
|
||||
immediates.forEach(f -> assertFalse(f.isDone()));
|
||||
|
||||
assertFalse(delayeds.get(0).isDone());
|
||||
if (effectiveDelayedPolicy)
|
||||
assertFalse(delayeds.get(1).isDone());
|
||||
else
|
||||
assertTrue(delayeds.get(1).isCancelled());
|
||||
|
||||
if (effectivePeriodicPolicy)
|
||||
periodics.forEach(
|
||||
f -> {
|
||||
assertFalse(f.isDone());
|
||||
if (!periodicTasksContinue) {
|
||||
assertTrue(f.cancel(false));
|
||||
assertTrue(f.isCancelled());
|
||||
}
|
||||
});
|
||||
else {
|
||||
periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
|
||||
periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
|
||||
}
|
||||
|
||||
unblock.countDown(); // Release all pool threads
|
||||
|
||||
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
|
||||
assertFalse(p.isTerminating());
|
||||
assertTrue(p.isTerminated());
|
||||
assertEquals(2 + (effectiveDelayedPolicy ? 1 : 0), ran.get());
|
||||
}}
|
||||
|
||||
assertTrue(q.isEmpty());
|
||||
|
||||
Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
|
||||
.forEach(f -> assertTrue(f.isDone()));
|
||||
|
||||
for (Future<?> f : immediates) assertNull(f.get());
|
||||
|
||||
assertNull(delayeds.get(0).get());
|
||||
if (effectiveDelayedPolicy)
|
||||
assertNull(delayeds.get(1).get());
|
||||
else
|
||||
assertTrue(delayeds.get(1).isCancelled());
|
||||
|
||||
if (periodicTasksContinue)
|
||||
periodics.forEach(
|
||||
f -> {
|
||||
try { f.get(); }
|
||||
catch (ExecutionException success) {
|
||||
assertSame(exception, success.getCause());
|
||||
}
|
||||
catch (Throwable fail) { threadUnexpectedException(fail); }
|
||||
});
|
||||
else
|
||||
periodics.forEach(f -> assertTrue(f.isCancelled()));
|
||||
|
||||
assertEquals(poolSize + 1
|
||||
+ (effectiveDelayedPolicy ? 1 : 0)
|
||||
+ (periodicTasksContinue ? 2 : 0),
|
||||
ran.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* completed submit of callable returns result
|
||||
@ -948,7 +1005,7 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAny(empty collection) throws IAE
|
||||
* invokeAny(empty collection) throws IllegalArgumentException
|
||||
*/
|
||||
public void testInvokeAny2() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
@ -1023,12 +1080,14 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAll(empty collection) returns empty collection
|
||||
* invokeAll(empty collection) returns empty list
|
||||
*/
|
||||
public void testInvokeAll2() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
|
||||
List<Future<String>> r = e.invokeAll(emptyCollection);
|
||||
assertTrue(r.isEmpty());
|
||||
}
|
||||
}
|
||||
@ -1091,7 +1150,7 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(null, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
@ -1106,20 +1165,22 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
List<Callable<String>> l = new ArrayList<>();
|
||||
l.add(new StringTask());
|
||||
try {
|
||||
e.invokeAny(l, MEDIUM_DELAY_MS, null);
|
||||
e.invokeAny(l, randomTimeout(), null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAny(empty collection) throws IAE
|
||||
* timed invokeAny(empty collection) throws IllegalArgumentException
|
||||
*/
|
||||
public void testTimedInvokeAny2() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (IllegalArgumentException success) {}
|
||||
}
|
||||
@ -1136,7 +1197,7 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
l.add(latchAwaitingStringTask(latch));
|
||||
l.add(null);
|
||||
try {
|
||||
e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(l, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
latch.countDown();
|
||||
@ -1179,20 +1240,20 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAll(null) throws NPE
|
||||
* timed invokeAll(null) throws NullPointerException
|
||||
*/
|
||||
public void testTimedInvokeAll1() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAll(null, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAll(,,null) throws NPE
|
||||
* timed invokeAll(,,null) throws NullPointerException
|
||||
*/
|
||||
public void testTimedInvokeAllNullTimeUnit() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
@ -1200,19 +1261,22 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
List<Callable<String>> l = new ArrayList<>();
|
||||
l.add(new StringTask());
|
||||
try {
|
||||
e.invokeAll(l, MEDIUM_DELAY_MS, null);
|
||||
e.invokeAll(l, randomTimeout(), null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAll(empty collection) returns empty collection
|
||||
* timed invokeAll(empty collection) returns empty list
|
||||
*/
|
||||
public void testTimedInvokeAll2() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
List<Future<String>> r =
|
||||
e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
|
||||
assertTrue(r.isEmpty());
|
||||
}
|
||||
}
|
||||
@ -1227,7 +1291,7 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
l.add(new StringTask());
|
||||
l.add(null);
|
||||
try {
|
||||
e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAll(l, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
@ -1238,11 +1302,11 @@ public class ScheduledExecutorSubclassTest extends JSR166TestCase {
|
||||
*/
|
||||
public void testTimedInvokeAll4() throws Exception {
|
||||
final ExecutorService e = new CustomExecutor(2);
|
||||
final Collection<Callable<String>> c = new ArrayList<>();
|
||||
c.add(new NPETask());
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
List<Callable<String>> l = new ArrayList<>();
|
||||
l.add(new NPETask());
|
||||
List<Future<String>> futures =
|
||||
e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAll(c, LONG_DELAY_MS, MILLISECONDS);
|
||||
assertEquals(1, futures.size());
|
||||
try {
|
||||
futures.get(0).get();
|
||||
|
@ -38,6 +38,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
@ -51,10 +53,12 @@ import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestSuite;
|
||||
@ -77,7 +81,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
final Runnable task = new CheckedRunnable() {
|
||||
public void realRun() { done.countDown(); }};
|
||||
p.execute(task);
|
||||
assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
await(done);
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,7 +102,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
|
||||
assertSame(Boolean.TRUE, f.get());
|
||||
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
|
||||
assertTrue(done.await(0L, MILLISECONDS));
|
||||
assertEquals(0L, done.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,110 +251,67 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* execute(null) throws NPE
|
||||
* Submitting null tasks throws NullPointerException
|
||||
*/
|
||||
public void testExecuteNull() throws InterruptedException {
|
||||
public void testNullTaskSubmission() {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.execute(null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
assertNullTaskSubmissionThrowsNullPointerException(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* schedule(null) throws NPE
|
||||
* Submitted tasks are rejected when shutdown
|
||||
*/
|
||||
public void testScheduleNull() throws InterruptedException {
|
||||
public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
TrackedCallable callable = null;
|
||||
Future f = p.schedule(callable, SHORT_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize());
|
||||
final CountDownLatch done = new CountDownLatch(1);
|
||||
final Runnable r = () -> {
|
||||
threadsStarted.countDown();
|
||||
for (;;) {
|
||||
try {
|
||||
done.await();
|
||||
return;
|
||||
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
|
||||
}};
|
||||
final Callable<Boolean> c = () -> {
|
||||
threadsStarted.countDown();
|
||||
for (;;) {
|
||||
try {
|
||||
done.await();
|
||||
return Boolean.TRUE;
|
||||
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
|
||||
}};
|
||||
|
||||
/**
|
||||
* execute throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule1_RejectedExecutionException() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
try (PoolCleaner cleaner = cleaner(p, done)) {
|
||||
for (int i = p.getCorePoolSize(); i--> 0; ) {
|
||||
switch (rnd.nextInt(4)) {
|
||||
case 0: p.execute(r); break;
|
||||
case 1: assertFalse(p.submit(r).isDone()); break;
|
||||
case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
|
||||
case 3: assertFalse(p.submit(c).isDone()); break;
|
||||
}
|
||||
}
|
||||
|
||||
// ScheduledThreadPoolExecutor has an unbounded queue, so never saturated.
|
||||
await(threadsStarted);
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.shutdownNow();
|
||||
else
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
// Pool is shutdown, but not yet terminated
|
||||
assertTaskSubmissionsAreRejected(p);
|
||||
assertFalse(p.isTerminated());
|
||||
|
||||
/**
|
||||
* schedule throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule2_RejectedExecutionException() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpCallable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
done.countDown(); // release blocking tasks
|
||||
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
|
||||
|
||||
/**
|
||||
* schedule callable throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testSchedule3_RejectedExecutionException() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.schedule(new NoOpCallable(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* scheduleAtFixedRate throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testScheduleAtFixedRate1_RejectedExecutionException() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.scheduleAtFixedRate(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* scheduleWithFixedDelay throws RejectedExecutionException if shutdown
|
||||
*/
|
||||
public void testScheduleWithFixedDelay1_RejectedExecutionException() throws InterruptedException {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
try {
|
||||
p.shutdown();
|
||||
p.scheduleWithFixedDelay(new NoOpRunnable(),
|
||||
MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
shouldThrow();
|
||||
} catch (RejectedExecutionException success) {
|
||||
} catch (SecurityException ok) {}
|
||||
assertTaskSubmissionsAreRejected(p);
|
||||
}
|
||||
assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -389,13 +350,13 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
public void realRun() throws InterruptedException {
|
||||
threadStarted.countDown();
|
||||
assertEquals(0, p.getCompletedTaskCount());
|
||||
threadProceed.await();
|
||||
await(threadProceed);
|
||||
threadDone.countDown();
|
||||
}});
|
||||
await(threadStarted);
|
||||
assertEquals(0, p.getCompletedTaskCount());
|
||||
threadProceed.countDown();
|
||||
threadDone.await();
|
||||
await(threadDone);
|
||||
long startTime = System.nanoTime();
|
||||
while (p.getCompletedTaskCount() != 1) {
|
||||
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
|
||||
@ -533,6 +494,17 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The default rejected execution handler is AbortPolicy.
|
||||
*/
|
||||
public void testDefaultRejectedExecutionHandler() {
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
assertTrue(p.getRejectedExecutionHandler()
|
||||
instanceof ThreadPoolExecutor.AbortPolicy);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* isShutdown is false before shutdown, true after
|
||||
*/
|
||||
@ -759,92 +731,188 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
* - setExecuteExistingDelayedTasksAfterShutdownPolicy
|
||||
* - setContinueExistingPeriodicTasksAfterShutdownPolicy
|
||||
*/
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
public void testShutdown_cancellation() throws Exception {
|
||||
Boolean[] allBooleans = { null, Boolean.FALSE, Boolean.TRUE };
|
||||
for (Boolean policy : allBooleans)
|
||||
{
|
||||
final int poolSize = 2;
|
||||
final int poolSize = 4;
|
||||
final ScheduledThreadPoolExecutor p
|
||||
= new ScheduledThreadPoolExecutor(poolSize);
|
||||
final boolean effectiveDelayedPolicy = (policy != Boolean.FALSE);
|
||||
final boolean effectivePeriodicPolicy = (policy == Boolean.TRUE);
|
||||
final boolean effectiveRemovePolicy = (policy == Boolean.TRUE);
|
||||
if (policy != null) {
|
||||
p.setExecuteExistingDelayedTasksAfterShutdownPolicy(policy);
|
||||
p.setContinueExistingPeriodicTasksAfterShutdownPolicy(policy);
|
||||
p.setRemoveOnCancelPolicy(policy);
|
||||
}
|
||||
final BlockingQueue<Runnable> q = p.getQueue();
|
||||
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
final long delay = rnd.nextInt(2);
|
||||
final int rounds = rnd.nextInt(1, 3);
|
||||
final boolean effectiveDelayedPolicy;
|
||||
final boolean effectivePeriodicPolicy;
|
||||
final boolean effectiveRemovePolicy;
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
|
||||
effectiveDelayedPolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectiveDelayedPolicy = true;
|
||||
assertEquals(effectiveDelayedPolicy,
|
||||
p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
|
||||
effectivePeriodicPolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectivePeriodicPolicy = false;
|
||||
assertEquals(effectivePeriodicPolicy,
|
||||
p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
|
||||
|
||||
if (rnd.nextBoolean())
|
||||
p.setRemoveOnCancelPolicy(
|
||||
effectiveRemovePolicy = rnd.nextBoolean());
|
||||
else
|
||||
effectiveRemovePolicy = false;
|
||||
assertEquals(effectiveRemovePolicy,
|
||||
p.getRemoveOnCancelPolicy());
|
||||
// Strategy: Wedge the pool with poolSize "blocker" threads
|
||||
|
||||
final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
|
||||
|
||||
// Strategy: Wedge the pool with one wave of "blocker" tasks,
|
||||
// then add a second wave that waits in the queue until unblocked.
|
||||
final AtomicInteger ran = new AtomicInteger(0);
|
||||
final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
|
||||
final CountDownLatch unblock = new CountDownLatch(1);
|
||||
final CountDownLatch periodicLatch1 = new CountDownLatch(2);
|
||||
final CountDownLatch periodicLatch2 = new CountDownLatch(2);
|
||||
Runnable task = new CheckedRunnable() { public void realRun()
|
||||
throws InterruptedException {
|
||||
poolBlocked.countDown();
|
||||
assertTrue(unblock.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
ran.getAndIncrement();
|
||||
}};
|
||||
List<Future<?>> blockers = new ArrayList<>();
|
||||
List<Future<?>> periodics = new ArrayList<>();
|
||||
List<Future<?>> delayeds = new ArrayList<>();
|
||||
for (int i = 0; i < poolSize; i++)
|
||||
blockers.add(p.submit(task));
|
||||
assertTrue(poolBlocked.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
final RuntimeException exception = new RuntimeException();
|
||||
|
||||
periodics.add(p.scheduleAtFixedRate(countDowner(periodicLatch1),
|
||||
1, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(countDowner(periodicLatch2),
|
||||
1, 1, MILLISECONDS));
|
||||
delayeds.add(p.schedule(task, 1, MILLISECONDS));
|
||||
class Task implements Runnable {
|
||||
public void run() {
|
||||
try {
|
||||
ran.getAndIncrement();
|
||||
poolBlocked.countDown();
|
||||
await(unblock);
|
||||
} catch (Throwable fail) { threadUnexpectedException(fail); }
|
||||
}
|
||||
}
|
||||
|
||||
class PeriodicTask extends Task {
|
||||
PeriodicTask(int rounds) { this.rounds = rounds; }
|
||||
int rounds;
|
||||
public void run() {
|
||||
if (--rounds == 0) super.run();
|
||||
// throw exception to surely terminate this periodic task,
|
||||
// but in a separate execution and in a detectable way.
|
||||
if (rounds == -1) throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
Runnable task = new Task();
|
||||
|
||||
List<Future<?>> immediates = new ArrayList<>();
|
||||
List<Future<?>> delayeds = new ArrayList<>();
|
||||
List<Future<?>> periodics = new ArrayList<>();
|
||||
|
||||
immediates.add(p.submit(task));
|
||||
delayeds.add(p.schedule(task, delay, MILLISECONDS));
|
||||
periodics.add(p.scheduleAtFixedRate(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
|
||||
await(poolBlocked);
|
||||
|
||||
assertEquals(poolSize, ran.get());
|
||||
assertEquals(poolSize, p.getActiveCount());
|
||||
assertTrue(q.isEmpty());
|
||||
|
||||
// Add second wave of tasks.
|
||||
immediates.add(p.submit(task));
|
||||
delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
|
||||
periodics.add(p.scheduleAtFixedRate(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
periodics.add(p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(rounds), delay, 1, MILLISECONDS));
|
||||
|
||||
assertEquals(poolSize, q.size());
|
||||
assertEquals(poolSize, ran.get());
|
||||
|
||||
immediates.forEach(
|
||||
f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
|
||||
|
||||
Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
|
||||
.forEach(f -> assertFalse(f.isDone()));
|
||||
|
||||
assertTrue(p.getQueue().containsAll(periodics));
|
||||
assertTrue(p.getQueue().containsAll(delayeds));
|
||||
try { p.shutdown(); } catch (SecurityException ok) { return; }
|
||||
assertTrue(p.isShutdown());
|
||||
assertTrue(p.isTerminating());
|
||||
assertFalse(p.isTerminated());
|
||||
for (Future<?> periodic : periodics) {
|
||||
assertTrue(effectivePeriodicPolicy ^ periodic.isCancelled());
|
||||
assertTrue(effectivePeriodicPolicy ^ periodic.isDone());
|
||||
}
|
||||
for (Future<?> delayed : delayeds) {
|
||||
assertTrue(effectiveDelayedPolicy ^ delayed.isCancelled());
|
||||
assertTrue(effectiveDelayedPolicy ^ delayed.isDone());
|
||||
}
|
||||
if (testImplementationDetails) {
|
||||
assertEquals(effectivePeriodicPolicy,
|
||||
p.getQueue().containsAll(periodics));
|
||||
assertEquals(effectiveDelayedPolicy,
|
||||
p.getQueue().containsAll(delayeds));
|
||||
}
|
||||
// Release all pool threads
|
||||
unblock.countDown();
|
||||
|
||||
for (Future<?> delayed : delayeds) {
|
||||
if (effectiveDelayedPolicy) {
|
||||
assertNull(delayed.get());
|
||||
}
|
||||
}
|
||||
if (effectivePeriodicPolicy) {
|
||||
assertTrue(periodicLatch1.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
assertTrue(periodicLatch2.await(LONG_DELAY_MS, MILLISECONDS));
|
||||
for (Future<?> periodic : periodics) {
|
||||
assertTrue(periodic.cancel(false));
|
||||
assertTrue(periodic.isCancelled());
|
||||
assertTrue(periodic.isDone());
|
||||
}
|
||||
if (rnd.nextBoolean())
|
||||
assertThrows(
|
||||
RejectedExecutionException.class,
|
||||
() -> p.submit(task),
|
||||
() -> p.schedule(task, 1, SECONDS),
|
||||
() -> p.scheduleAtFixedRate(
|
||||
new PeriodicTask(1), 1, 1, SECONDS),
|
||||
() -> p.scheduleWithFixedDelay(
|
||||
new PeriodicTask(2), 1, 1, SECONDS));
|
||||
|
||||
assertTrue(q.contains(immediates.get(1)));
|
||||
assertTrue(!effectiveDelayedPolicy
|
||||
^ q.contains(delayeds.get(1)));
|
||||
assertTrue(!effectivePeriodicPolicy
|
||||
^ q.containsAll(periodics.subList(2, 4)));
|
||||
|
||||
immediates.forEach(f -> assertFalse(f.isDone()));
|
||||
|
||||
assertFalse(delayeds.get(0).isDone());
|
||||
if (effectiveDelayedPolicy)
|
||||
assertFalse(delayeds.get(1).isDone());
|
||||
else
|
||||
assertTrue(delayeds.get(1).isCancelled());
|
||||
|
||||
if (effectivePeriodicPolicy)
|
||||
periodics.forEach(
|
||||
f -> {
|
||||
assertFalse(f.isDone());
|
||||
if (!periodicTasksContinue) {
|
||||
assertTrue(f.cancel(false));
|
||||
assertTrue(f.isCancelled());
|
||||
}
|
||||
});
|
||||
else {
|
||||
periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
|
||||
periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
|
||||
}
|
||||
|
||||
unblock.countDown(); // Release all pool threads
|
||||
|
||||
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
|
||||
assertFalse(p.isTerminating());
|
||||
assertTrue(p.isTerminated());
|
||||
assertEquals(2 + (effectiveDelayedPolicy ? 1 : 0), ran.get());
|
||||
}}
|
||||
|
||||
assertTrue(q.isEmpty());
|
||||
|
||||
Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
|
||||
.forEach(f -> assertTrue(f.isDone()));
|
||||
|
||||
for (Future<?> f : immediates) assertNull(f.get());
|
||||
|
||||
assertNull(delayeds.get(0).get());
|
||||
if (effectiveDelayedPolicy)
|
||||
assertNull(delayeds.get(1).get());
|
||||
else
|
||||
assertTrue(delayeds.get(1).isCancelled());
|
||||
|
||||
if (periodicTasksContinue)
|
||||
periodics.forEach(
|
||||
f -> {
|
||||
try { f.get(); }
|
||||
catch (ExecutionException success) {
|
||||
assertSame(exception, success.getCause());
|
||||
}
|
||||
catch (Throwable fail) { threadUnexpectedException(fail); }
|
||||
});
|
||||
else
|
||||
periodics.forEach(f -> assertTrue(f.isCancelled()));
|
||||
|
||||
assertEquals(poolSize + 1
|
||||
+ (effectiveDelayedPolicy ? 1 : 0)
|
||||
+ (periodicTasksContinue ? 2 : 0),
|
||||
ran.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* completed submit of callable returns result
|
||||
@ -883,7 +951,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAny(null) throws NPE
|
||||
* invokeAny(null) throws NullPointerException
|
||||
*/
|
||||
public void testInvokeAny1() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
@ -896,7 +964,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAny(empty collection) throws IAE
|
||||
* invokeAny(empty collection) throws IllegalArgumentException
|
||||
*/
|
||||
public void testInvokeAny2() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
@ -909,7 +977,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAny(c) throws NPE if c has null elements
|
||||
* invokeAny(c) throws NullPointerException if c has null elements
|
||||
*/
|
||||
public void testInvokeAny3() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
@ -971,12 +1039,14 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* invokeAll(empty collection) returns empty collection
|
||||
* invokeAll(empty collection) returns empty list
|
||||
*/
|
||||
public void testInvokeAll2() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
|
||||
List<Future<String>> r = e.invokeAll(emptyCollection);
|
||||
assertTrue(r.isEmpty());
|
||||
}
|
||||
}
|
||||
@ -1039,14 +1109,14 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(null, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAny(,,null) throws NPE
|
||||
* timed invokeAny(,,null) throws NullPointerException
|
||||
*/
|
||||
public void testTimedInvokeAnyNullTimeUnit() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
@ -1054,20 +1124,22 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
List<Callable<String>> l = new ArrayList<>();
|
||||
l.add(new StringTask());
|
||||
try {
|
||||
e.invokeAny(l, MEDIUM_DELAY_MS, null);
|
||||
e.invokeAny(l, randomTimeout(), null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAny(empty collection) throws IAE
|
||||
* timed invokeAny(empty collection) throws IllegalArgumentException
|
||||
*/
|
||||
public void testTimedInvokeAny2() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (IllegalArgumentException success) {}
|
||||
}
|
||||
@ -1084,7 +1156,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
l.add(latchAwaitingStringTask(latch));
|
||||
l.add(null);
|
||||
try {
|
||||
e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAny(l, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
latch.countDown();
|
||||
@ -1133,7 +1205,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
try {
|
||||
e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAll(null, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
@ -1148,20 +1220,22 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
List<Callable<String>> l = new ArrayList<>();
|
||||
l.add(new StringTask());
|
||||
try {
|
||||
e.invokeAll(l, MEDIUM_DELAY_MS, null);
|
||||
e.invokeAll(l, randomTimeout(), null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* timed invokeAll(empty collection) returns empty collection
|
||||
* timed invokeAll(empty collection) returns empty list
|
||||
*/
|
||||
public void testTimedInvokeAll2() throws Exception {
|
||||
final ExecutorService e = new ScheduledThreadPoolExecutor(2);
|
||||
final Collection<Callable<String>> emptyCollection
|
||||
= Collections.emptyList();
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(),
|
||||
MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
List<Future<String>> r =
|
||||
e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
|
||||
assertTrue(r.isEmpty());
|
||||
}
|
||||
}
|
||||
@ -1176,7 +1250,7 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
l.add(new StringTask());
|
||||
l.add(null);
|
||||
try {
|
||||
e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
|
||||
e.invokeAll(l, randomTimeout(), randomTimeUnit());
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
@ -1262,18 +1336,16 @@ public class ScheduledExecutorTest extends JSR166TestCase {
|
||||
* one-shot task from executing.
|
||||
* https://bugs.openjdk.java.net/browse/JDK-8051859
|
||||
*/
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
public void testScheduleWithFixedDelay_overflow() throws Exception {
|
||||
final CountDownLatch delayedDone = new CountDownLatch(1);
|
||||
final CountDownLatch immediateDone = new CountDownLatch(1);
|
||||
final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
|
||||
try (PoolCleaner cleaner = cleaner(p)) {
|
||||
final Runnable immediate = new Runnable() { public void run() {
|
||||
immediateDone.countDown();
|
||||
}};
|
||||
final Runnable delayed = new Runnable() { public void run() {
|
||||
final Runnable delayed = () -> {
|
||||
delayedDone.countDown();
|
||||
p.submit(immediate);
|
||||
}};
|
||||
p.submit(() -> immediateDone.countDown());
|
||||
};
|
||||
p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
|
||||
await(delayedDone);
|
||||
await(immediateDone);
|
||||
|
Loading…
x
Reference in New Issue
Block a user