综述

什么是 Delta Lake

Delta Lake 是 databricks 开源的一个数据存储层. 它在你常用的云存储服务上提供了一层抽象, 使得 Spark (Batch 和 Streaming) 可以轻松又愉快地读写数据.

它声称提供了这些特性 (我就不一一翻译了):

  1. ACID Transaction
  2. Scalable Metdata Handling
  3. Time Travel
  4. Open Format
  5. Unified Batch and Streaming Source and Sink
  6. Schema Enforcement
  7. Schema Evolution
  8. 100% Compatible with Apache Spark API

Delta Lake 源码下载

戳我, 别怕

源码结构 (以及大概干了什么)

|——-com.databricks.spark.util 包含了 Databricks 的公用 Looging. 可以收集一些数据.
|——-org.apache.spark.sql.delta 包含了 Delta Lake 的核心代码.
|————— actions 定义了对 Delta Metadata 进行的操作
|————— commands 包含了将数据写入 Delta 的逻辑
|————— files 定义了对文件的核心操作, 比如 commit 逻辑
|————— metering 封装了记录操作的行为
|————— schema 定义了对 Schema 的行为
|————— sources 包含了和 Spark 读写整合的逻辑
|————— stats 状态数据结构
|————— storage 定义了存储系统的接口, 使用不同的存储服务需要实现这个接口.
|————— util 工具类
|————— *.java 定义了一系列数据机构和核心逻辑.

上面很多包都只是工具类, 所以不做过多解释, 如果有兴趣的话可以自己读一下源码.

源码分析

读完 Delta 项目的源码, 我感觉这个系统最核心的部分就是怎么管理一个表的元数据. 这里的元数据包含了对这个表的所有修改操作. 只要搞懂了这部分, 其他的相对都是比较容易理解的.

元数据

元数据被抽象成了 Action 类 (同时有个 SingleAction 是用在 Dataset 中的). 有几个比较重要的 Action:

  1. AddFile / RemoveFile: 增加和删除文件, 这里的文件是数据文件. Delta 中的表对应了一系列的文件, 增加删除文件就意味着增删改数据.
  2. Metadata: 表示更新一张表的 Metadata, 比如 Schema, PartitionColumns 等等.
  3. CommitInfo: 表示一次 Commit.
  4. SetTransaction: 表示一次 Spark 任务. 由于一个任务可能重试多次, 这个是为了保证重试不会导致数据出错.

一张表拥有一个 Action 序列, 按照时间重放这些 Action 就能获得这张表在每个时间点的状态, 即这张表在某个时间点:

  1. Schema 是什么
  2. 对应到了哪些数据文件
    等等.

Delta 中将 Actions 保存到 Log 中.

元数据文件

元数据文件分为两类:

  1. Delta
  2. Checkpoint

Delta

每一个 Delta 文件对应到了一次 Commit. Delta Lake 在命名文件的时候会将 commit 的 version 包含在文件名中. 所以要获得某个 version 之前的所有 delta 只需要 list 一下就可以了. 每一个 Delta 文件中包含了它对应的 Commit 中所有的 Action (一个 Commit 可以有多个 Action, 比如多个 AddFile, 一个 Metadata 等等).

Checkpoint

每一个 Checkpoint 文件对应到了一个 version. 这个文件包含了这个 version 及其之前所有的 Actions (会有一些压缩, 比如先 Add 文件, 再 Remove 就会抵消.)

元数据管理

元数据管理主要通过以下几个数据结构完成.

DeltaLog: 对于底层 log 文件的封装

DeltaLog 提供了查询 Log 和向 Log 中原子地增加 Actions 的能力. 它包含两个路径

  1. logPath: 存放元数据文件的路径.
  2. dataPath: 存放数据文件的路径.
    在 logPath 下有一个特殊的文件 _last_checkpoint, 保存了最近一次 checkpoint 的元数据 (这个 checkpoint 的版本和分成了多少份.)

DeltaLog 中有一个 currentSnapshot. 这个字段是通过从 logPath 中读取 _last_checkpoint 对应的文件构造的. 如果没有 _last_checkpoint
那就是一个空的表. snapshot 的更新操作需要获取一个可重入锁.

Transaction 相关方法

DeltaLog 本身不能单独提供事务性的操作能力, 需要利用 OptimisticTransaction 来进行操作.

获取 Log 的相关方法: getChanges / getSnapshotAt

DeltaLog 通过使用 list 方法获取 logPath 中某个 version 之前/之后的所有文件就可以实现. 如果有 Checkpoint 文件, 可以做一些优化.

获取数据的相关方法: CreateRelation / CreateDataFrame

DeltaLog 通过获取对应的 log 文件, 过滤出所有的 AddFile 和 RemoveFile 之后构造出一个 table / stream 对应的文件, 然后用 Spark 的接口构造对应的 DataFrame 或者 Relation.

Streaming 用 createDateaFrame, Relation 用 createRelation. 要注意的是 createDateaFrame 实际也是每次一个 micro batch. 同时 createRelation 支持用分区值做文件过滤.

继承 checkpoint

将 DeltaLog 的所有 Action 保存起来. (每个 Action 的 version 不再保存).

DeltaHistoryManager

一个工具类, 帮助访问历史数据.

Snapshot: log 文件聚合之后在某个 version 的状态

Snapshot 是一个 immutable 的数据结构, 它的核心逻辑是从 log 中重建元数据. 然后从重建的数据中向外暴露有用的信息.

重建状态

Snapshot 中包含一个成员变量 stateReconstruction. 状态重建的逻辑就包含在这个变量的初始化过程中. 重建状态的逻辑很简单, 就是遍历指定的 version 之前的 deltas 和 checkpoints 文件, 构造出相应的 actions, 然后使用 InMemoryLogReplay 来获取最后的状态. 由于数据量会比较大, 这个处理不是在 driver 中做的, 而是使用 Spark 分布式的执行.
这个 stateReconstruction 的类型是 Dataset[SingleAction].

InMemoryLogReplay 的逻辑很简单, 就是遍历一遍 Action, 对于不同类型的 Action 做处理.

  1. Protocol, Metadata: 留下最晚出现的.
  2. AddFile, RemoveFile: 按照顺序构造出表现在对应的文件集合.
  3. SetTransaction: 构造 appId 到 SetTransaction 的映射.

事务支持

事务的支持主要通过 OptimisticTransaction 实现. Delta 支持 Snapshot 级别的隔离等级, 之前页介绍了 Snapshot 的实现. OptimisticTransaction 继承了 TransactionalWrite. 概括来说, OptimisticTransaction 为了保证一次写的事务性, 将写操作分为两部分

  1. 通过 TransactionalWrite 将数据写入文件. 这一步是在 worker 上做的.
  2. 调用 commit 将新的文件以 AddFile 的形式写入 Delta Logs. 这一步是在 driver 上做的.

TransactionalWrite

TransactionalWrite 怎么支持事务地写文件呢? 它利用 DelayedCommitProtocol 来做数据提交, 而 DelayedCommitProtocol 实现了 Spark 提供的 FileCommitProtocol 接口来保证事务性.

TransactionalWrite 的主要逻辑在 writeFiles 这个方法中, 具体就是接受一个 Dataset 然后写入文件, 只不过使用了 DelayedCommitProtocol 做 committer, 并且最后返回所有新建的文件对应的 AddFile.

commit 方法

commit 包含这些调用

  1. prepareCommit
  2. doCommit
  3. postCommit

看着这些方法, 你想到了什么, 是不是 2-Step Commit?

但是并不是. prepareCommit 实际上就是添加一些 action. doCommit 将所有的 Action 写入 log. 而 postCommit 就是看一下如果很久没 checkpoint 就 checkpoint 一下.

这个方法对于事务性的保证是通过 DeltaLog 的可重入锁, 以及拿到锁之后看看自己以为的 version 和最新的 version 一不一致实现的.

文件搜索

这个部分对于理解 Delta 的逻辑不是很重要, 就简单介绍一下. 为了支持 partition filter 的 push down, DeltaTable 这个类支持了通过 query 过滤文件的一些方法.

Source 和 Sink

这个主要是实现了和 Spark 的整合, 能够在 Spark 中读取 Delta 管理的数据.

  • DeltaSource & DeltaSink: 实现了 Streaming 的 source 和 sink.
  • DeltaDataSource: 注册 source 和 sink, 以及 batch 操作的 Spark relation.

DeltaSource

这个类实现了 Spark 的 Source 接口, 主要的方法有 getBatchgetOffset. 接下来介绍一下这个类的大概逻辑.

初始化

当一个新的 Stream 开始时, Delta 会创建一个当前版本的 Snapshot. 这个 Snapshot 会被分成若干个 batch 来处理知道所有已有的数据被处理完. 之后的处理就是通过 tail 所有的 log 来查找并处理新的数据. 这样保证了 batch 和 streaming query 能返回一样的结果.

getBatch

通过指定 startOffset 和 endOffset 来获取一个 batch 的数据. 主要是通过 getChanges 来获取所有的 AddFile, 在调用 DeltaLog 的 createDataFrame 来从 AddFile 对应的文件中读取数据.

这个方法看上去像是给 batch 使用, 但是注意 Spark 的 Streaming 是通过 micro batch 来实现的.

DeltaSink

这个类实现了 Spark 的 Sink 接口, 主要方法是 addBatch. 这方法的实现逻辑比较简单, 就是先 start 一个 transaction, 然后调用 transaction 的 writeFiles 和 commit 方法来提交改动. 具体细节请看上面的事务支持.

写数据

写数据有两个代码路径.

流式

DeltaSinkaddBatch 方法中调用 OptimisticTransactionwriteFilescommit 方法来提交改动.

批处理式

在创建 relation 的时候指定写数据的 callback. 这个 callback 通过 WriteIntoDelta 这个类实现. 这个类也是调用了 OptimisticTransactionwriteFilescommit 方法.