DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Using CompletableFuture

原文链接

This post revisits Java 8’s CompletionStage API and specifically its implementation in the standard Java library CompletableFuture.

It represents a stage of a certain computation which can be done either synchronously or asynchronously.

1
2
3
4
5
6
7
8
9
static void runAsyncExample() {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
});
assertFalse(cf.isDone());
sleepEnough();
assertTrue(cf.isDone());
}

关于sleepEnough?

By default (when no Executor is specified), asynchronous execution uses the common ForkJoinPool implementation, which uses daemon threads to execute the Runnable task. Note that this is specific to CompletableFuture. Other CompletionStage implementations can override the default behavior.

use executors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
int count = 1;
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "custom-executor-" + count++);
}
});
static void thenApplyAsyncWithExecutorExample() {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
assertFalse(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
}, executor);
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
}

thenApply return value

thenAccept do not need to return a value

1
2
3
4
5
6
7
8
9
static void applyToEitherExample() {
String original = "Message";
CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s));
CompletableFuture<String> cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> s + " from applyToEither");
assertTrue(cf2.join().endsWith(" from applyToEither"));
}
1
2
3
4
5
6
7
8
static void thenAcceptBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s1, s2) -> result.append(s1 + s2));
assertEquals("MESSAGEmessage", result.toString());
}
1
2
3
4
5
6
7
8
static void thenCombineAsyncExample() {
String original = "Message";
CompletableFuture<String> cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.join());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// sync
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});

// async
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
allOf.join();