8166465: CompletableFuture.minimalCompletionStage().toCompletableFuture() should be non-minimal

Reviewed-by: martin, chegar, shade
This commit is contained in:
Doug Lea 2016-09-23 13:14:14 -07:00
parent 72a44a46fa
commit 9496149e05
2 changed files with 215 additions and 30 deletions

View File

@ -2559,6 +2559,13 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* exceptionally with a CompletionException with this exception as * exceptionally with a CompletionException with this exception as
* cause. * cause.
* *
* <p>Unless overridden by a subclass, a new non-minimal
* CompletableFuture with all methods available can be obtained from
* a minimal CompletionStage via {@link #toCompletableFuture()}.
* For example, completion of a minimal stage can be awaited by
*
* <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
*
* @return the new CompletionStage * @return the new CompletionStage
* @since 9 * @since 9
*/ */
@ -2853,6 +2860,16 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
@Override public CompletableFuture<T> completeOnTimeout @Override public CompletableFuture<T> completeOnTimeout
(T value, long timeout, TimeUnit unit) { (T value, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException(); } throw new UnsupportedOperationException(); }
@Override public CompletableFuture<T> toCompletableFuture() {
Object r;
if ((r = result) != null)
return new CompletableFuture<T>(encodeRelay(r));
else {
CompletableFuture<T> d = new CompletableFuture<>();
unipush(new UniRelay<T,T>(d, this));
return d;
}
}
} }
// VarHandle mechanics // VarHandle mechanics

View File

@ -388,7 +388,7 @@ public class CompletableFutureTest extends JSR166TestCase {
checkCompletedNormally(f, "test"); checkCompletedNormally(f, "test");
} }
abstract class CheckedAction { abstract static class CheckedAction {
int invocationCount = 0; int invocationCount = 0;
final ExecutionMode m; final ExecutionMode m;
CheckedAction(ExecutionMode m) { this.m = m; } CheckedAction(ExecutionMode m) { this.m = m; }
@ -400,7 +400,7 @@ public class CompletableFutureTest extends JSR166TestCase {
void assertInvoked() { assertEquals(1, invocationCount); } void assertInvoked() { assertEquals(1, invocationCount); }
} }
abstract class CheckedIntegerAction extends CheckedAction { abstract static class CheckedIntegerAction extends CheckedAction {
Integer value; Integer value;
CheckedIntegerAction(ExecutionMode m) { super(m); } CheckedIntegerAction(ExecutionMode m) { super(m); }
void assertValue(Integer expected) { void assertValue(Integer expected) {
@ -409,7 +409,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class IntegerSupplier extends CheckedAction static class IntegerSupplier extends CheckedAction
implements Supplier<Integer> implements Supplier<Integer>
{ {
final Integer value; final Integer value;
@ -428,7 +428,7 @@ public class CompletableFutureTest extends JSR166TestCase {
return (x == null) ? null : x + 1; return (x == null) ? null : x + 1;
} }
class NoopConsumer extends CheckedIntegerAction static class NoopConsumer extends CheckedIntegerAction
implements Consumer<Integer> implements Consumer<Integer>
{ {
NoopConsumer(ExecutionMode m) { super(m); } NoopConsumer(ExecutionMode m) { super(m); }
@ -438,7 +438,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class IncFunction extends CheckedIntegerAction static class IncFunction extends CheckedIntegerAction
implements Function<Integer,Integer> implements Function<Integer,Integer>
{ {
IncFunction(ExecutionMode m) { super(m); } IncFunction(ExecutionMode m) { super(m); }
@ -456,7 +456,7 @@ public class CompletableFutureTest extends JSR166TestCase {
- ((y == null) ? 99 : y.intValue()); - ((y == null) ? 99 : y.intValue());
} }
class SubtractAction extends CheckedIntegerAction static class SubtractAction extends CheckedIntegerAction
implements BiConsumer<Integer, Integer> implements BiConsumer<Integer, Integer>
{ {
SubtractAction(ExecutionMode m) { super(m); } SubtractAction(ExecutionMode m) { super(m); }
@ -466,7 +466,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class SubtractFunction extends CheckedIntegerAction static class SubtractFunction extends CheckedIntegerAction
implements BiFunction<Integer, Integer, Integer> implements BiFunction<Integer, Integer, Integer>
{ {
SubtractFunction(ExecutionMode m) { super(m); } SubtractFunction(ExecutionMode m) { super(m); }
@ -476,14 +476,14 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class Noop extends CheckedAction implements Runnable { static class Noop extends CheckedAction implements Runnable {
Noop(ExecutionMode m) { super(m); } Noop(ExecutionMode m) { super(m); }
public void run() { public void run() {
invoked(); invoked();
} }
} }
class FailingSupplier extends CheckedAction static class FailingSupplier extends CheckedAction
implements Supplier<Integer> implements Supplier<Integer>
{ {
final CFException ex; final CFException ex;
@ -494,7 +494,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingConsumer extends CheckedIntegerAction static class FailingConsumer extends CheckedIntegerAction
implements Consumer<Integer> implements Consumer<Integer>
{ {
final CFException ex; final CFException ex;
@ -506,7 +506,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingBiConsumer extends CheckedIntegerAction static class FailingBiConsumer extends CheckedIntegerAction
implements BiConsumer<Integer, Integer> implements BiConsumer<Integer, Integer>
{ {
final CFException ex; final CFException ex;
@ -518,7 +518,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingFunction extends CheckedIntegerAction static class FailingFunction extends CheckedIntegerAction
implements Function<Integer, Integer> implements Function<Integer, Integer>
{ {
final CFException ex; final CFException ex;
@ -530,7 +530,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingBiFunction extends CheckedIntegerAction static class FailingBiFunction extends CheckedIntegerAction
implements BiFunction<Integer, Integer, Integer> implements BiFunction<Integer, Integer, Integer>
{ {
final CFException ex; final CFException ex;
@ -542,7 +542,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingRunnable extends CheckedAction implements Runnable { static class FailingRunnable extends CheckedAction implements Runnable {
final CFException ex; final CFException ex;
FailingRunnable(ExecutionMode m) { super(m); ex = new CFException(); } FailingRunnable(ExecutionMode m) { super(m); ex = new CFException(); }
public void run() { public void run() {
@ -551,7 +551,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class CompletableFutureInc extends CheckedIntegerAction static class CompletableFutureInc extends CheckedIntegerAction
implements Function<Integer, CompletableFuture<Integer>> implements Function<Integer, CompletableFuture<Integer>>
{ {
CompletableFutureInc(ExecutionMode m) { super(m); } CompletableFutureInc(ExecutionMode m) { super(m); }
@ -564,7 +564,7 @@ public class CompletableFutureTest extends JSR166TestCase {
} }
} }
class FailingCompletableFutureFunction extends CheckedIntegerAction static class FailingCompletableFutureFunction extends CheckedIntegerAction
implements Function<Integer, CompletableFuture<Integer>> implements Function<Integer, CompletableFuture<Integer>>
{ {
final CFException ex; final CFException ex;
@ -3604,29 +3604,53 @@ public class CompletableFutureTest extends JSR166TestCase {
* copy returns a CompletableFuture that is completed normally, * copy returns a CompletableFuture that is completed normally,
* with the same value, when source is. * with the same value, when source is.
*/ */
public void testCopy() { public void testCopy_normalCompletion() {
for (boolean createIncomplete : new boolean[] { true, false })
for (Integer v1 : new Integer[] { 1, null })
{
CompletableFuture<Integer> f = new CompletableFuture<>(); CompletableFuture<Integer> f = new CompletableFuture<>();
if (!createIncomplete) assertTrue(f.complete(v1));
CompletableFuture<Integer> g = f.copy(); CompletableFuture<Integer> g = f.copy();
if (createIncomplete) {
checkIncomplete(f); checkIncomplete(f);
checkIncomplete(g); checkIncomplete(g);
f.complete(1); assertTrue(f.complete(v1));
checkCompletedNormally(f, 1);
checkCompletedNormally(g, 1);
} }
checkCompletedNormally(f, v1);
checkCompletedNormally(g, v1);
}}
/** /**
* copy returns a CompletableFuture that is completed exceptionally * copy returns a CompletableFuture that is completed exceptionally
* when source is. * when source is.
*/ */
public void testCopy2() { public void testCopy_exceptionalCompletion() {
for (boolean createIncomplete : new boolean[] { true, false })
{
CFException ex = new CFException();
CompletableFuture<Integer> f = new CompletableFuture<>(); CompletableFuture<Integer> f = new CompletableFuture<>();
if (!createIncomplete) f.completeExceptionally(ex);
CompletableFuture<Integer> g = f.copy(); CompletableFuture<Integer> g = f.copy();
if (createIncomplete) {
checkIncomplete(f); checkIncomplete(f);
checkIncomplete(g); checkIncomplete(g);
CFException ex = new CFException();
f.completeExceptionally(ex); f.completeExceptionally(ex);
}
checkCompletedExceptionally(f, ex); checkCompletedExceptionally(f, ex);
checkCompletedWithWrappedException(g, ex); checkCompletedWithWrappedException(g, ex);
}}
/**
* Completion of a copy does not complete its source.
*/
public void testCopy_oneWayPropagation() {
CompletableFuture<Integer> f = new CompletableFuture<>();
assertTrue(f.copy().complete(1));
assertTrue(f.copy().complete(null));
assertTrue(f.copy().cancel(true));
assertTrue(f.copy().cancel(false));
assertTrue(f.copy().completeExceptionally(new CFException()));
checkIncomplete(f);
} }
/** /**
@ -3991,7 +4015,10 @@ public class CompletableFutureTest extends JSR166TestCase {
.collect(Collectors.toList()); .collect(Collectors.toList());
List<CompletionStage<Integer>> stages = new ArrayList<>(); List<CompletionStage<Integer>> stages = new ArrayList<>();
stages.add(new CompletableFuture<Integer>().minimalCompletionStage()); CompletionStage<Integer> min =
new CompletableFuture<Integer>().minimalCompletionStage();
stages.add(min);
stages.add(min.thenApply(x -> x));
stages.add(CompletableFuture.completedStage(1)); stages.add(CompletableFuture.completedStage(1));
stages.add(CompletableFuture.failedStage(new CFException())); stages.add(CompletableFuture.failedStage(new CFException()));
@ -4027,6 +4054,131 @@ public class CompletableFutureTest extends JSR166TestCase {
throw new Error("Methods did not throw UOE: " + bugs); throw new Error("Methods did not throw UOE: " + bugs);
} }
/**
* minimalStage.toCompletableFuture() returns a CompletableFuture that
* is completed normally, with the same value, when source is.
*/
public void testMinimalCompletionStage_toCompletableFuture_normalCompletion() {
for (boolean createIncomplete : new boolean[] { true, false })
for (Integer v1 : new Integer[] { 1, null })
{
CompletableFuture<Integer> f = new CompletableFuture<>();
CompletionStage<Integer> minimal = f.minimalCompletionStage();
if (!createIncomplete) assertTrue(f.complete(v1));
CompletableFuture<Integer> g = minimal.toCompletableFuture();
if (createIncomplete) {
checkIncomplete(f);
checkIncomplete(g);
assertTrue(f.complete(v1));
}
checkCompletedNormally(f, v1);
checkCompletedNormally(g, v1);
}}
/**
* minimalStage.toCompletableFuture() returns a CompletableFuture that
* is completed exceptionally when source is.
*/
public void testMinimalCompletionStage_toCompletableFuture_exceptionalCompletion() {
for (boolean createIncomplete : new boolean[] { true, false })
{
CFException ex = new CFException();
CompletableFuture<Integer> f = new CompletableFuture<>();
CompletionStage<Integer> minimal = f.minimalCompletionStage();
if (!createIncomplete) f.completeExceptionally(ex);
CompletableFuture<Integer> g = minimal.toCompletableFuture();
if (createIncomplete) {
checkIncomplete(f);
checkIncomplete(g);
f.completeExceptionally(ex);
}
checkCompletedExceptionally(f, ex);
checkCompletedWithWrappedException(g, ex);
}}
/**
* minimalStage.toCompletableFuture() gives mutable CompletableFuture
*/
public void testMinimalCompletionStage_toCompletableFuture_mutable() {
for (Integer v1 : new Integer[] { 1, null })
{
CompletableFuture<Integer> f = new CompletableFuture<>();
CompletionStage minimal = f.minimalCompletionStage();
CompletableFuture<Integer> g = minimal.toCompletableFuture();
assertTrue(g.complete(v1));
checkCompletedNormally(g, v1);
checkIncomplete(f);
checkIncomplete(minimal.toCompletableFuture());
}}
/**
* minimalStage.toCompletableFuture().join() awaits completion
*/
public void testMinimalCompletionStage_toCompletableFuture_join() throws Exception {
for (boolean createIncomplete : new boolean[] { true, false })
for (Integer v1 : new Integer[] { 1, null })
{
CompletableFuture<Integer> f = new CompletableFuture<>();
if (!createIncomplete) assertTrue(f.complete(v1));
CompletionStage<Integer> minimal = f.minimalCompletionStage();
if (createIncomplete) assertTrue(f.complete(v1));
assertEquals(v1, minimal.toCompletableFuture().join());
assertEquals(v1, minimal.toCompletableFuture().get());
checkCompletedNormally(minimal.toCompletableFuture(), v1);
}}
/**
* Completion of a toCompletableFuture copy of a minimal stage
* does not complete its source.
*/
public void testMinimalCompletionStage_toCompletableFuture_oneWayPropagation() {
CompletableFuture<Integer> f = new CompletableFuture<>();
CompletionStage<Integer> g = f.minimalCompletionStage();
assertTrue(g.toCompletableFuture().complete(1));
assertTrue(g.toCompletableFuture().complete(null));
assertTrue(g.toCompletableFuture().cancel(true));
assertTrue(g.toCompletableFuture().cancel(false));
assertTrue(g.toCompletableFuture().completeExceptionally(new CFException()));
checkIncomplete(g.toCompletableFuture());
f.complete(1);
checkCompletedNormally(g.toCompletableFuture(), 1);
}
/** Demo utility method for external reliable toCompletableFuture */
static <T> CompletableFuture<T> toCompletableFuture(CompletionStage<T> stage) {
CompletableFuture<T> f = new CompletableFuture<>();
stage.handle((T t, Throwable ex) -> {
if (ex != null) f.completeExceptionally(ex);
else f.complete(t);
return null;
});
return f;
}
/** Demo utility method to join a CompletionStage */
static <T> T join(CompletionStage<T> stage) {
return toCompletableFuture(stage).join();
}
/**
* Joining a minimal stage "by hand" works
*/
public void testMinimalCompletionStage_join_by_hand() {
for (boolean createIncomplete : new boolean[] { true, false })
for (Integer v1 : new Integer[] { 1, null })
{
CompletableFuture<Integer> f = new CompletableFuture<>();
CompletionStage<Integer> minimal = f.minimalCompletionStage();
CompletableFuture<Integer> g = new CompletableFuture<>();
if (!createIncomplete) assertTrue(f.complete(v1));
minimal.thenAccept((x) -> g.complete(x));
if (createIncomplete) assertTrue(f.complete(v1));
g.join();
checkCompletedNormally(g, v1);
checkCompletedNormally(f, v1);
assertEquals(v1, join(minimal));
}}
static class Monad { static class Monad {
static class ZeroException extends RuntimeException { static class ZeroException extends RuntimeException {
public ZeroException() { super("monadic zero"); } public ZeroException() { super("monadic zero"); }
@ -4317,6 +4469,22 @@ public class CompletableFutureTest extends JSR166TestCase {
assertTrue(neverCompleted.thenRun(() -> {}).cancel(true)); assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
} }
/**
* Checks for garbage retention when MinimalStage.toCompletableFuture()
* is invoked many times.
* 8161600: Garbage retention when source CompletableFutures are never completed
*
* As of 2016-07, fails with OOME:
* ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testToCompletableFutureGarbageRetention tck
*/
public void testToCompletableFutureGarbageRetention() throws Throwable {
final int n = expensiveTests ? 900_000 : 10;
CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
CompletionStage minimal = neverCompleted.minimalCompletionStage();
for (int i = 0; i < n; i++)
assertTrue(minimal.toCompletableFuture().cancel(true));
}
// static <U> U join(CompletionStage<U> stage) { // static <U> U join(CompletionStage<U> stage) {
// CompletableFuture<U> f = new CompletableFuture<>(); // CompletableFuture<U> f = new CompletableFuture<>();
// stage.whenComplete((v, ex) -> { // stage.whenComplete((v, ex) -> {