8134851: Integrate CompletableFuture with API enhancements

8039378: CompletableFuture: Avoid StackOverflowError for long linear chains

Reviewed-by: martin, psandoz, chegar
This commit is contained in:
Doug Lea 2015-10-13 16:04:56 -07:00
parent 2beb7a2fd8
commit cf43156eed
5 changed files with 649 additions and 193 deletions

View File

@ -34,12 +34,11 @@
*/
package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* A stage of a possibly asynchronous computation, that performs an
@ -56,9 +55,9 @@ import java.util.concurrent.Executor;
* For example, {@code stage.thenApply(x -> square(x)).thenAccept(x ->
* System.out.print(x)).thenRun(() -> System.out.println())}. An
* additional form (<em>compose</em>) applies functions of stages
* themselves, rather than their results. </li>
* themselves, rather than their results.
*
* <li> One stage's execution may be triggered by completion of a
* <li>One stage's execution may be triggered by completion of a
* single stage, or both of two stages, or either of two stages.
* Dependencies on a single stage are arranged using methods with
* prefix <em>then</em>. Those triggered by completion of
@ -66,9 +65,9 @@ import java.util.concurrent.Executor;
* effects, using correspondingly named methods. Those triggered by
* <em>either</em> of two stages make no guarantees about which of the
* results or effects are used for the dependent stage's
* computation.</li>
* computation.
*
* <li> Dependencies among stages control the triggering of
* <li>Dependencies among stages control the triggering of
* computations, but do not otherwise guarantee any particular
* ordering. Additionally, execution of a new stage's computations may
* be arranged in any of three ways: default execution, default
@ -81,7 +80,7 @@ import java.util.concurrent.Executor;
* properties, and might not even support concurrent execution, but
* are arranged for processing in a way that accommodates asynchrony.
*
* <li> Two method forms support processing whether the triggering
* <li>Two method forms support processing whether the triggering
* stage completed normally or exceptionally: Method {@link
* #whenComplete whenComplete} allows injection of an action
* regardless of outcome, otherwise preserving the outcome in its
@ -100,7 +99,7 @@ import java.util.concurrent.Executor;
* stage completes normally or exceptionally. In the case of method
* {@code whenComplete}, when the supplied action itself encounters an
* exception, then the stage exceptionally completes with this
* exception if not already completed exceptionally.</li>
* exception if not already completed exceptionally.
*
* </ul>
*
@ -587,7 +586,7 @@ public interface CompletionStage<T> {
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage as the argument
* normally, is executed with this stage's result as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
@ -603,7 +602,7 @@ public interface CompletionStage<T> {
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage as the argument to the
* execution facility, with this stage's result as the argument to the
* supplied function.
*
* See the {@link CompletionStage} documentation for rules
@ -652,12 +651,14 @@ public interface CompletionStage<T> {
* Returns a new CompletionStage with the same result or exception as
* this stage, that executes the given action when this stage completes.
*
* <p>When this stage is complete, the given action is invoked with the
* result (or {@code null} if none) and the exception (or {@code null}
* if none) of this stage as arguments. The returned stage is completed
* when the action returns. If the supplied action itself encounters an
* exception, then the returned stage exceptionally completes with this
* exception unless this stage also completed exceptionally.
* <p>When this stage is complete, the given action is invoked
* with the result (or {@code null} if none) and the exception (or
* {@code null} if none) of this stage as arguments. The returned
* stage is completed when the action returns. If the supplied
* action itself encounters an exception, then the returned stage
* exceptionally completes with this exception unless this stage
* also completed exceptionally (in which case, the returned stage
* exceptionally completes with the original exception).
*
* @param action the action to perform
* @return the new CompletionStage

View File

@ -53,7 +53,6 @@ import java.util.concurrent.Executors;
import static java.util.concurrent.ForkJoinPool.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Basic {
static void checkCompletedNormally(CompletableFuture<?> cf, Object value) {
@ -66,6 +65,7 @@ public class Basic {
try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); }
try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); }
check(cf.isDone(), "Expected isDone to be true, got:" + cf);
check(!cf.isCompletedExceptionally(), "Expected isCompletedExceptionally to return false");
check(!cf.isCancelled(), "Expected isCancelled to be false");
check(!cf.cancel(true), "Expected cancel to return false");
check(cf.toString().contains("[Completed normally]"));
@ -97,6 +97,7 @@ public class Basic {
catch (CancellationException x) { if (cancelled) pass(); else fail(); }
catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); }
check(cf.isDone(), "Expected isDone to be true, got:" + cf);
check(cf.isCompletedExceptionally(), "Expected isCompletedExceptionally");
check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled());
check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:" + cf.cancel(true));
check(cf.toString().contains("[Completed exceptionally]")); // ## TODO: 'E'xceptionally
@ -805,6 +806,49 @@ public class Basic {
cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;});
checkCompletedExceptionally(cf1);
checkCompletedNormally(cf2, 2);
cf1 = supplyAsync(() -> 1);
cf2 = cf1.handleAsync((x,t) -> x+1);
checkCompletedNormally(cf1, 1);
checkCompletedNormally(cf2, 2);
cf1 = supplyAsync(() -> { throw ex; });
cf2 = cf1.handleAsync((x,t) -> { check(t.getCause() == ex); return 2;});
checkCompletedExceptionally(cf1);
checkCompletedNormally(cf2, 2);
} catch (Throwable t) { unexpected(t); }
//----------------------------------------------------------------
// whenComplete tests
//----------------------------------------------------------------
try {
AtomicInteger count = new AtomicInteger();
CompletableFuture<Integer> cf2;
CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement());
checkCompletedNormally(cf1, 1);
checkCompletedNormally(cf2, 1);
check(count.get() == 1, "action count should be incremented");
final RuntimeException ex = new RuntimeException();
cf1 = supplyAsync(() -> { throw ex; });
cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement());
checkCompletedExceptionally(cf1);
checkCompletedExceptionally(cf2);
check(count.get() == 2, "action count should be incremented");
cf1 = supplyAsync(() -> 1);
cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement());
checkCompletedNormally(cf1, 1);
checkCompletedNormally(cf2, 1);
check(count.get() == 3, "action count should be incremented");
cf1 = supplyAsync(() -> { throw ex; });
cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement());
checkCompletedExceptionally(cf1);
checkCompletedExceptionally(cf2);
check(count.get() == 4, "action count should be incremented");
} catch (Throwable t) { unexpected(t); }
}

View File

@ -27,7 +27,6 @@ import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
/**
* @test
* @bug 8029164

View File

@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* @test
* @bug 8068432 8072030