目录
Vert.x 源码阅读 (3) - EventBus
这是 Vert.x 项目源码阅读笔记的第三篇,主要记录一下 EventBus
相关的代码。EventBus
是一个轻量级的分布式消息系统,让 Vert.x 的服务中各个组件之间可以以一种低耦合的方式交互。代码在这个目录下,主要包含了 EventBus
,MessageConsumer
,MessageProducer
,以及 DeliveryContext
。
消息总线 EventBus
EventBus
提供了两类接口:
- 通讯相关的接口,例如
send
,publish
等等。 - 创建更加高级的抽象的接口,例如
consumer
,producer
。
EventBus
只提供的 best-effort 的投递保证,它提供了三种通讯模式:
模式名称 | 特征 | 方法 |
---|---|---|
Pub-Sub | 异步单向,一对多 | publish |
P2P | 不期待响应,一对一 | send |
Request-Response (以下简称 RR) | 期待响应,一对一 | request |
其中 P2P 和 RR 模式非常类似,区别只在于 RR 会指定一个 replyHandler
(实现上会在投递的消息中指定一个 replyAddress
,之后详细介绍)。
Message
类
Message
是实际发送的消息,它实际上是对网络请求的封装。它包含了消息的发送地址,接收地址等等,这里不详细介绍了。
MessageConsumer
MessageConsumer
(逻辑上很自然地)扩展了 ReadStream<Message<T>>
接口,是对消息处理做的抽象。它是线程安全的,但是如果能只在单一线程(也就是之后会讲到的事件循环线程)上被使用的话,性能会更好一点。这是因为它的并发控制使用了 synchronize
关键字,而 JVM 在没有竞争的情况下会退化到使用偏向锁,减少同步开销。
实现
MessageConsumer
本身是一个接口,它的实现是 MessageConsumerImpl
。我们这里先看一下它的核心方法之一,doReceive(Message<T> message)
。它是新消息进入时的入口方法,核心代码如下:
1 | protected void doReceive(Message<T> message) { |
从这个方法我们看出来 MessageConsumer
的运作原理。
这里还有一个比较有意思的地方是 deliver
方法:
1 | private void deliver(Handler<Message<T>> theHandler, Message<T> message) { |
这个方法的前四行代码发送了 1
这条消息给 creditsAddress
,实际上是给 MessageProducer
监听的一个”队列”发送 credit
,MessageProducer
收到这个 credit
就会给自己”回一滴血”,表示自己能够多一个可以发消息的额度。
MessageProducer
MessageProducer
(在逻辑上很自然地)扩展了 WriteStream<T>
接口,是对消息发送做的抽象。
实现
MessageProducerImpl
是 MessageProducer
的实现。它有两个核心方法,分别是 write
和 doReceiveCredit
。
1 |
|
最开始我比较困惑的一点是,通过网络来恢复额度,总感觉不是很稳,要是网络挂了,就没法回血来允许接着发消息。但是仔细想想,要是网络挂了,发消息也没有意义了。
DeliveryContext
DeliveryContext
封装了一个要发送的消息,同时提供了一些控制方法。它最核心的方法就是 next()
。一个 DeliveryContext
会包含多个 interceptor,而 next
会遍历这些 interceptor,一个个调用,直到全部调用完之后,再实际发送。这个类似于责任链模式,在很多框架中都使用了,例如 Spring Boot,Netty。