Vert.x 源码阅读 (1) - Future 和 Promise

Vert.x 源码阅读 (2) - Stream

Vert.x 源码阅读 (3) - EventBus

Vert.x 源码阅读 (4) - Context

这是 Vert.x 项目源码阅读笔记的第四篇,主要记录一下 Vertx 中的核心 ContextContext 贯穿了整个 Vertx 的代码,它主要用来表示一个任务执行时的上下文环境。

一个服务实例往往需要同时处理大量的请求,而这些请求相互独立,拥有不同的上下文。在使用 Servlet 的年代,由于每个线程同时只处理一个请求,我们使用一个简单的 ThreadLocal 变量就可以满足需求。然而,在使用 EventLoop 的时候,由于一个线程会同时处理多个请求,我们需要显式地管理和切换上下文。Context 是 Vertx 中上下文的抽象。

简介

在详细介绍 Context 之前,我们先介绍几个名词:

  • Handler 是一个可执行的对象,类似 Runnable
  • 线程,就不多说了。
  • Execution 是指 Handler 的一次调用。Handler可以被多次调用。

Context

The execution context of a Handler execution.

简单来说,就是 Handler 的一次调用从开始到结束时,它使用的上下文信息。例如,一个 REST 请求执行过程中的 HEADER 信息。

在 Vertx 中,Context 和线程的关系简单来说,可以总结成一下几点

  1. 一个线程在不同的时间,会执行不同的 Handler,因此和它相关联的 context (通过 VertxThread::context) 获取会发生变化。即**一个线程会对应到多个 context**。
  2. 一个 Context 往往只对应到一个线程,但是并不强制。

Context 的继承关系如下图:

Context

根据上图,我们知道在 Vertx 主要有 EventLoopContextWorkerContext 这两类 Context。这两类 Context,前一类对应到 EventLoop 线程,后一类对应到 Worker 线程。它们两个的区别主要在于,在执行/调度一个任务时(调用一个 Handler 时),到底使用哪一个线程。

Context

Context 是一个接口,它主要包含了一下几类方法:

  1. 获取当前执行线程的一些基本信息,例如 isOnWorkerThread
  2. 获取自身属性,例如 isEventLoopContext
  3. 读取或更新上下文信息,例如 getput
  4. 执行任务,例如 runOnContextexecuteBlocking

前三类方法都相对比较好理解,我们主要看看 Context 提供的执行和调度任务的方法。

执行和调度任务

Context 主要提供了四种类型的调度方式:

方式名称 执行线程 上下文
execute 自己对应的线程 自己
schedule 自己对应的线程 线程执行任务时关联的 context
emit 调用 emit 方法的线程 自己
dispatch 自己对应的线程 自己

上表描述了各个类型的执行方法使用的线程和上下文,注意 dispatchexecute 的语义是一样的,它们的区别在于,

  • execute 将任务加入自己对应的线程的执行队列
  • dispatch 会判断当前线程是不是自己对应的线程,如果是的话直接执行。否则和 execute 一样。

上面的描述可能比较抽象,下面用伪代码来简单实现这几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 通过保存和恢复现场,来实现在执行任务过程中,用自身作为上下文
public void emit(Runnable task) {
// 获取当前线程使用的 context,并将当前线程关联的 context 设置为自己
ContextInternal prev = emitBegin();
try {
handler.run();
} catch (Throwable t) {
reportException(t);
} finally {
// 恢复当前线程使用的 context
emitEnd(prev);
}
}

// 在自身关联的线程上,执行任务。这个任务被 emit 包裹,保证执行是会使用自身做上下文。
public void execute(Runnable task) {
getAssociatedThread().execute(() -> emit(task));
}

// 在自身关联的线程上,执行任务。但是任务执行时的上下文没有单独指定。
public void schedule(T argument, Handler<T> task) {
Thread thread = getAssociatedThread();
if (thread.isCurrentThread()) {
task.handle(argument);
} else {
thread.execute(() -> task.handle(argument));
}
}

public void dispatch(T argument, Handler<T> task) {
schedule(v -> emit(argument, task));
}

执行阻塞代码

在使用 EventLoop 时,我们不应该阻塞事件循环。这是因为事件循环的线程数很少,一旦阻塞了,新的请求都会被阻塞。当我们需要执行阻塞代码是,我们往往会使用另外的线程来执行。Context 提供了 executeBlocking 系列方法,来满足这个需求。它的实现伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCode, TaskQueue queue) {
// Worker Pool 就是我们所说的非事件循环线程
return executeBlocking(this, blockingCode, workerPool, queue);
}

static <T> Future<T> executeBlocking(ContextInternal context,
Handler<Promise<T>> blockingCode,
WorkerPool workerPool, TaskQueue queue) {
// 创建一对 Promise 和 Future
// Promise 会传个 blockingCode. BlockingCode 在完成后应该调用 promise 的 complete 方法
// 调用者使用 Future 来监听 blockingCode 是否完成
Promise<T> promise = context.promise();
Future<T> fut = promise.future();

// 构造一个任务
Runnable command = () -> {
// 使用 emit 方法,保证会使用 context 作为上下文
context.emit(promise, f -> {
blockingCode.handle(promise);
});
};

// 使用 workerPool 和 queue 执行任务
queue.execute(command, exec);
return fut;
}

看到上面的代码,我们可以发现,指定的阻塞任务

  1. 执行在 Worker 线程上
  2. 使用了当前的 (EventLoop) 的 Context

这样实现看似有点奇怪,实际上满足了等效于在事件循环上”执行“阻塞代码的要求。这也是为什么 Context 会提供单独的 isEventLoopContext/isWorkerContextisOnEventLoopThread/isOnWorkerThread方法。

阻塞时长监控

虽然我们允许使用 Context 执行阻塞代码,阻塞的时间仍然不能太久。Vertx 通过 BlockedThreadChecker 来监控线程的阻塞时间。

BlockedThreadChecker 的实现比较简单。它通过一个后台线程,定期的检查每个线程当前执行的任务的开始时间离现在有多久。如果超过了最大时间,就打印一条日志。