Vert.x 源码阅读 (1) - Future 和 Promise

Vert.x 源码阅读 (2) - Stream

Vert.x 源码阅读 (3) - EventBus

Vert.x 源码阅读 (4) - Context

这是 Vert.x 项目源码阅读笔记的第三篇,主要记录一下 EventBus 相关的代码。EventBus 是一个轻量级的分布式消息系统,让 Vert.x 的服务中各个组件之间可以以一种低耦合的方式交互。代码在这个目录下,主要包含了 EventBusMessageConsumerMessageProducer,以及 DeliveryContext

消息总线 EventBus

EventBus 提供了两类接口:

  1. 通讯相关的接口,例如 send, publish 等等。
  2. 创建更加高级的抽象的接口,例如 consumerproducer

EventBus 只提供的 best-effort 的投递保证,它提供了三种通讯模式:

模式名称 特征 方法
Pub-Sub 异步单向,一对多 publish
P2P 不期待响应,一对一 send
Request-Response (以下简称 RR) 期待响应,一对一 request

其中 P2P 和 RR 模式非常类似,区别只在于 RR 会指定一个 replyHandler(实现上会在投递的消息中指定一个 replyAddress,之后详细介绍)。

Message

Message 是实际发送的消息,它实际上是对网络请求的封装。它包含了消息的发送地址,接收地址等等,这里不详细介绍了。

MessageConsumer

MessageConsumer (逻辑上很自然地)扩展了 ReadStream<Message<T>>接口,是对消息处理做的抽象。它是线程安全的,但是如果能只在单一线程(也就是之后会讲到的事件循环线程)上被使用的话,性能会更好一点。这是因为它的并发控制使用了 synchronize 关键字,而 JVM 在没有竞争的情况下会退化到使用偏向锁,减少同步开销。

实现

MessageConsumer 本身是一个接口,它的实现是 MessageConsumerImpl。我们这里先看一下它的核心方法之一,doReceive(Message<T> message)。它是新消息进入时的入口方法,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected void doReceive(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (demand == 0L) {
// 如果没有需求
// 回忆一下之前提到的 Stream 的背压处理
if (pending.size() < MaxBuffer) {
pending.add(message);
} else {
// 超过最大缓冲大小的话就丢弃
discardHandler.handle(message);
}
} else {
if (pending.size() > 0) {
// 从之前 pending 中最早的消息开始处理
pending.add(message);
message = pending.poll();
}
// handler 是 mututable 的
// copy 一下 handler 的引用,在同步块外实际调用 handler
// 很典型的并发编程模式
theHandler = handler;
}
}

// 将消息发送给 handler.
deliver(theHandler, message);
}

从这个方法我们看出来 MessageConsumer 的运作原理。

这里还有一个比较有意思的地方是 deliver 方法:

1
2
3
4
5
6
7
8
9
10
11
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
// 给 Producer 回血
String creditsAddress = message.headers().get(CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}

// 实际将消息发送给 handler.
dispatch(theHandler, message, context.duplicate());
checkNextTick();
}

这个方法的前四行代码发送了 1 这条消息给 creditsAddress,实际上是给 MessageProducer 监听的一个”队列”发送 creditMessageProducer 收到这个 credit 就会给自己”回一滴血”,表示自己能够多一个可以发消息的额度。

MessageProducer

MessageProducer (在逻辑上很自然地)扩展了 WriteStream<T> 接口,是对消息发送做的抽象。

实现

MessageProducerImplMessageProducer 的实现。它有两个核心方法,分别是 writedoReceiveCredit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public void write(T data, Handler<AsyncResult<Void>> handler) {
Promise<Void> promise = createPromise();
promise.future().setHandler(handler);
write(data, promise);
}

private void write(T data, Promise<Void> handler) {
MessageImpl msg = createMessage();
OutboundDeliveryContext<T> sendCtx = createContext();

// 只有在 P2P 和 RR 模式下,send 才是 true。
// 这也很合理。在 Pub-Sub 模式下,Producer 不会管下游"死活",下游甚至都可能没有任何接受者。
// 而在 P2P 和 RR 模式下,我们要考虑下游,实现背压。
if (send) {
synchronized (this) {
// credits 有点类似令牌桶
if (credits > 0) {
credits--;
} else {
// 没有额度就先 pending。
// 回忆一下 WriteStream 和 ReadStream 的流量控制机制。
// WriteStream::writeQueueFull 的实现就是判断 credits == 0
pending.add(sendCtx);
return;
}
}
}
bus.sendOrPubInternal(msg, options, null, handler);
}

//============================================

private synchronized void doReceiveCredit(int credit) {
// 回复发送额度
credits += credit;
while (credits > 0) {
// 看看有没有 pending 的内容可以发的。
}
// 看看是不是从超负荷恢复过来了
// 是的话,要调用 drainHandler
checkDrained();
}

最开始我比较困惑的一点是,通过网络来恢复额度,总感觉不是很稳,要是网络挂了,就没法回血来允许接着发消息。但是仔细想想,要是网络挂了,发消息也没有意义了。

DeliveryContext

DeliveryContext 封装了一个要发送的消息,同时提供了一些控制方法。它最核心的方法就是 next()。一个 DeliveryContext 会包含多个 interceptor,而 next 会遍历这些 interceptor,一个个调用,直到全部调用完之后,再实际发送。这个类似于责任链模式,在很多框架中都使用了,例如 Spring Boot,Netty。