DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

CompletionStage and CompltetableFuture

原文链接

CompletionStage pipelines

CompletableFuture implements the CompletionStage interface.

A convenient example of this is the thenApply method, which is analogous to map on the streaming API (or on the Optional class, for example).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// We'll allow a 2-sized thread pool for async execution, rather than using the fork/join pool for asynchronous
// value generation
final Executor executor = Executors.newFixedThreadPool(2);


@Test
public void demoThenApply() throws Exception {
final CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(delayedValueSupplier(1), executor)
.thenApply(i -> i + 3);
assertThat(future.get(), is(4));
}

private Supplier<Integer> delayedValueSupplier(final int value) {
return delayedValueSupplier(value, 1000);
}

private Supplier<Integer> delayedValueSupplier(final int value, final int delayMs) {
return () -> {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
throw new RuntimeException("Problem while waiting to return value");
}
return value;
};
}

pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void demoMultiStagePipeline() throws Exception {
// We'll use one future to specify how long a subsequent future will take
final CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(delayedValueSupplier(3), executor)
.thenApply(i -> {
System.out.println("First future completed, " + i);
return i + 1;
})
.thenCompose(i -> CompletableFuture
.supplyAsync(delayedValueSupplier(i + 2, i * 100), executor));
assertThat(future.get(), is(6));
}