Vert.x 源码阅读 (1) - Future 和 Promise

Vert.x 源码阅读 (2) - Stream

Vert.x 源码阅读 (3) - EventBus

Vert.x 源码阅读 (4) - Context

这是 Vert.x 项目源码阅读笔记的第一篇, 主要讲一下最基础的 FuturePromise 的概念。FuturePromise 实际上是异步编程中非常基础的两个抽象,大部分程序员想必都非常熟悉,我这里主要是整理为主。

Vert.x 中的 FuturePromise

AsyncResult 接口

Vert.x 中的 Future 扩展了 AsyncResultAsyncResult 有点像一个 Monad,它包装了一个结果,或者是一个错误。注意 AsyncResult 这个接口并不包含结果还没有准备就绪的状态,这个状态是由 Future 暴露出来的。

Future 接口

Future 逻辑上表达了一个异步计算的结果。它扩展了 AsyncResult 接口,主要提供了两类方法:

  1. 判断是否已经完成:isComplete
  2. 注册回调方法,例如 onComplete

等待结果的一方直接面对的,应该是一个 Future 对象。这个对象对于结果使用方来说,是只读的。使用方可以不断的 poll 它直到计算完成,结果 ready。同时使用也可用基于这个 Future 创建一个新的 Future 对象。

Promise 接口

Promise 有点难解释,我觉得我们可以把它理解成 Future 的可写的另一端。实际完成异步计算的结果提供方,在干完活之后,将结果写入这个 PromisePromise 会负责通知自己对应的 Future 结果已经准备就绪了。

Golang 中的 Channel

我感觉 Golang 中的 channel 是一个有点相似的概念。FuturePromisechannel 的两端。这也意味着,我们可以用 FuturePromise 来做线程间同步,虽然我们不应该怎么做,而应该根据你的情况使用更加适合的,专门用于同步的抽象。

一般的使用场景

在一般情况下,PromiseFuture 的使用往往符合这样的模式。我们假设存在 A 和 B 两个实体,A 需要 B 执行一个任务,然后将结果告诉自己。为了达到这个目的,一般会经过一下几个步骤:

  1. A 触发整个流程,往往是通过调用 B 提供的某个接口,这个接口一般都返回一个 Future 对象。
  2. B 的接口中创建了一对 PromiseFutureFuture 会被返回给 A。
  3. A 获得返回的 Future 之后一般会在这个 Future 上注册回调函数,表示收到结果之后要做什么。
  4. B 在结果计算出来之后调用 Promise::complete 方法,而这个 complete 方法会调用 A 在 Future 上注册的函数。

样例

为了加深理解,这里给出几段代码,用来品品这两个抽象的使用方式。为了方便理解,这里去掉了和 context 相关的代码。

Futuremap 方法

1
2
3
4
5
6
7
8
9
10
11
public Future<O> map(Function<I, O> transformer) {
Promise<O> ret = Promise.promise();
this.setHandler(asyncResult -> {
if (asyncResult.succeed()) {
ret.complete(transformer.apply(asyncResult.result()));
} else {
ret.fail(asyncResult.cause())
}
});
return ret.future();
}

通过 map 的代码,我们可以清楚地发现,PromiseFuture 非常类似于一个 channel。我们总是留住一根管子的一端,把另一端交给下游。自己干完活就对着管子吼一声。

CompositeFuture 的实现

这里为了方便理解做了一点简化,实际上 CompositeFuture 是一个接口,真正的实现在 CompositeFutureImpl 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CompositeFuture implements Future<CompositeFuture> {
// 自己包含的子 Future 的数量
private int count;
// 用来表示自己所有子 Future 完成的 Promise
private Promise promise = Promise.promise();

public static CompositFuture all(List<Future<?>> futures) {
CompositFutureImpl composit = new CompositFutureImpl(futures);

for (Future<?> future : futures) {
future.setHandler(result -> {
if (result.succeed()) {
composit.count ++;
}
if (count == composit.len) {
composit.succeed();
}
})
}
}
}

CompositFuture 的实现思路也很简单,就是在每一个子 Future 上注册一个回调来通知自己一个子 Future 已经完成。如果所有的子 Future 都完成了,就可以把自己标记成完成。