1. DGraph 源码阅读 (1) - 架构简介
  2. DGraph 源码阅读 (2) - Raft
  3. DGraph 源码阅读 (3) - Mutation
  4. DGraph 源码阅读 (4) - Query

这篇文章是学习 DGraph 的第三篇,主要记录一下 DGraph 的写操作逻辑。

写操作逻辑断断续续花了我好几个小时才大概看懂了整个流程,流程比较复杂是一方面,大量的使用 channel 来做异步处理是主要原因。但是写操作是我个人最好奇的部分。想像一下,一个写操作会包含多种 Predicates,而这些 Predicates 又会被 Shard 到多个 Alpha 节点(不熟悉的可以看一看DGraph 源码阅读 (1) - 架构简介),这么一来分布式事务就出现了。

虽然 DGraph 的代码比较难读(对于我来说),但是整个核心思想是很自然的。某种意义上来说,是一个二阶段提交。

  1. 将所有的 mutations 按照 predicate 分组。将每一组发到对应的 group 的 Leader 上,交由它做对应 Raft Group 的 Commit。
  2. 等第一步全部返回之后,向 Zero 节点发送 Commit 请求。Zero 节点对应的 Raft Group 完成 commit 之后将 Proposal 发送个所有的 Alpha 节点。
  3. Alpha 节点收到 Delta 之后,会将对应的 transaction 真正写入硬盘。

注意前两步都是阻塞的,第三步是异步的。以上就是整个 Mutation 的简单流程。

看到这里,你可能会想(反正我这么想了),如果第三步是异步的,那会有问题吧。考虑这种情况,一个 Transaction 提交了,Zero 节点发送了 Delta。这个 Transaction 中包含的 Predicate 分布在三个 Alpha 节点上。要是有的 Alpha 节点还没收到 Delta,就会读到不完整的 Transaction。我目前看下来,猜测解决方案是,使用处理请求的节点的最新的 commit timestamp 当做 snapshot 的 version,但是这样有可能导致读到老的数据?难道客户端也需要维护一个时间戳吗?具体留待下一篇来解释吧。

写操作详细流程

写操作的入口函数是 Mutate,这个方法会被 Http 和 GRPC 服务调用。接下来是整个函数的详细流程。

  1. 获取一个 Timestamp。DGraph 使用时间戳来保证 Linearizability。这个时间戳通过 Zero 节点的 Raft 保证单调递增。
  2. Parse 请求。实际上就是把 json 转为 NQuad 的结构。
  3. 分配 UID。UID 是 DGraph 中的唯一 ID,也需要调用 AssignUidsOverNetwork 向 Zero 节点申请。
  4. 调用 ToInternal将 Mutation 转化为 DirectedEdge。在这一步会将 Blank Node 替换为申请到的 UID。
  5. 调用 ApplyMutations来执行在 Alpha 集群内提交写操作。对应到上面的第一步。
  6. 调用 CommitOverNetwork 来向 Zero 节点提交操作。对应到上面的第二步。

ApplyMutations 函数

ApplyMutations 对传入的边做了简单处理,然后调用 MutateOverNetwork 来让 Alpha 节点负责写入数据。具体流程如下。

  1. 调用 expandEdges。实际上就是在 attr 名为 STAR 的时候去查询这个 uid 对应的所有 predicate。和 SQL 中的 SELECT * 类似。
  2. 调用 MutateOverNetwork。流程如下:
  3. 调用 populateMutationMap。这个方法创建了从 group id 到 mutation 的 map。这是因为不同的 predicate 可能要发到不同的 Leader。Group 一下减少请求的数量。
  4. 调用 proposeOrSend 将每个 group 的 mutation 发到对应的 group 的 leader,如果发现自己就是 leader 就会本地处理。最终会把整个 mutation 序列化之后写入 etcd。

实际写数据

实际写数据分为两部分(见文章开头),一部分是将 Alpha 的 Raft Group 中提交的数据写入。另一部分是等 Zero 的 Raft Group 将整个 Txn 提交之后,将所有的数据写入。这两部分都是在 processApplyCh 中完成的。这个 Goroutine 监听 applyCh 这个 Channel,将 Proposal 注册到 pending transaction 中(对应到第一部分),同时将从 Zero 收到的 Delta 中包含的 Transaction 写入 Badger。

那么 processApplyCh中的数据是从哪来的呢?这就要看 DGraph 源码阅读 (2) - Raft。有一个事件循环会把提交了的 Proposals 放入这个 Channel。第一部分的数据就是有执行写操作的 Leader Propose 然后进入 Commited Entries 中的。而第二部分的数据,则是由 processOracleDeltaStream这个 Goroutine 来 Propose 的。

总结一下

代码可读性

和上一篇文章说的一样,这种到处使用 channel 的风格对我理解这个代码造成了很大的困扰,不知道是不是只是我个人的感受。临到写这篇文章的时候,我忘了 processApplyCh 中第二部分的数据源,结果又找了几分钟才找到。

Raft 的使用

DGraph 对于 Raft 的使用,感觉和 TiKV 以及 CockroachDB 有点相似,有点想知道这两个的实现是什么样的。希望有熟悉的朋友和我交流一下。