【论文阅读】DBLog:A Watermark Based Change-Data-Capture Framework

DBLog: A Watermark Based Change-Data-Capture Framework

摘要(ABSTRACT)

本文讨论了应用程序在使用多种异构数据库时所面临的数据同步挑战,并探讨了现有解决方案的局限性。通常,应用会利用不同的数据库以满足特定需求,例如存储基本数据或提供高级搜索功能。因此,保持多个数据库之间同步显得尤为重要。尽管之前有双写和分布式事务等解决方案,但这些方法在可行性、健壮性和维护性方面存在不足。

最近出现的一种替代方法是使用变更数据捕捉(CDC),从数据库的事务日志中捕捉更改的行,并以低延迟将其传递到下游。然而,数据同步不仅需要捕捉更改,还需要复制数据库的全量状态,因为事务日志通常不包含完整的更改历史。同时,还有需要高可用性事务日志事件的场景,以便数据库能够尽可能保持同步。

为了解决上述挑战,本文介绍了一种新颖的CDC框架,名为DBLog。DBLog采用水印(watermark)驱动的方法,允许通过直接从表中选择的行与事务日志事件进行交错,从而捕捉数据库的全量状态。该解决方案使得日志事件在处理选择操作时可以继续前进,而不会出现停滞。选择操作可以在任何时间对所有表、特定表或特定主键的表触发。DBLog以块的形式执行选择操作并跟踪进度,允许暂停和恢复。水印方法不使用锁,对源的影响最低。目前,DBLog已在Netflix的多个微服务中投入生产使用。

1. 介绍(INTRODUCTION)

Netflix使用数百个微服务在数据层进行每天万亿次操作。由于没有单一的数据库设计能够满足所有需求,每个微服务可以利用多个异构数据库。例如,一个服务可以使用MySQL、PostgreSQL、Aurora或Cassandra作为操作数据的数据库,而使用Elasticsearch进行索引。为了保持多个数据库同步,Netflix开发了一个数据增强和同步平台,即Delta。其关键要求是从源到派生存储的低延迟和高度可用的事件流。实现这一目标的关键是使用变更数据捕获(CDC),这是一种能够近乎实时捕获数据库变更行并将其传播到下游消费者的技术。

在数据库系统中,事务日志通常具有有限的保留时间,并且不保证包含完整的变化历史。因此,必须同时捕获数据库的全量状态(full state)。在Netflix的生产环境中进行数据同步时,我们识别出了一些关于全量状态捕获的需求:

  • (a) 在任何时候触发全量状态捕获,因为全量状态可能不仅是在初始时需要,其他时间也可能需要。
  • (b) 随时暂停或恢复,以免在重新开始之后,需要重新捕获大表的全量状态。
  • (c) 事务日志事件和全量状态同时捕获,而不互相拖延,以确保事务日志事件的高度可用性。
  • (d) 防止“时间旅行(time-travel)”,传输事件时保持历史顺序,确保早期的行版本不会在后期版本之后传递。

    time-travel 是描述这样一种情况,时间顺序上 t1 > t2,用户在 t1 时间查询到数据比 t2 时间查询到的数据更新。这种现象通常是由于在做数据传输过程中先同步的 full state 是新的,后续同步的事务日志是老的数据导致。

  • (e) 将该功能作为平台提供,且尽量减少对源数据库的影响,以避免高流量场景下的应用写入阻塞。
  • (f) 适用于多种关系数据库管理系统(RDBMS),如MySQL、PostgreSQL、Aurora等,避免使用特定供应商的功能。

根据这些需求,我们开发了DBLog。DBLog作为一个进程运行,并利用基于水印的方法,允许在执行从表中直接选择的行时,与事务日志事件交错进行,从而捕获数据库的全量状态。我们的解决方案允许日志事件在执行选择时继续进展而不被阻塞。可以在任何时间对所有表、特定表或特定主键执行选择。DBLog分块处理选择,并在状态存储(目前使用Zookeeper)中跟踪进度,从而允许它们在上一个完成的块处暂停和恢复。水印方法不使用表锁,因此对源数据库的影响最小。无论原始数据来自事务日志还是表选择,DBLog都使用相同的格式将捕获的事件发送到输出中。输出可以是像Kafka这样的流,这在事件有多个消费者时是一个常见选择。然而,DBLog也可以直接写入数据存储或API。DBLog在设计时还考虑了高可用性(HA),采用主动-被动架构,其中任一时间只有一个DBLog进程处于活动状态,而多个被动进程处于待命状态,必要时可以接管以恢复工作。因此,下游消费者可以在源数据更改后迅速收到行数据。图1展示了DBLog的高层架构。

figure1.jpg

2. 相关调研(RELATED WORK)

我们评估了一系列现有的产品,如:Databus [8]、Debezium [10]、Maxwell [22]、MySQLStreamer [15]、SpinalTap [6] 和 Wormhole [16]。现有的解决方案在捕获事务日志中的事件方面类似,使用相同的底层协议和API,例如MySQL的binlog复制协议或PostgreSQL的复制槽。捕获的事件被序列化成专有的事件格式,并发送到到一个 output 中比较典型的就是 Kafka。一些解决方案如SpinalTap和Wormhole仅提供日志处理,而没有内置捕获数据库全量状态的能力,这种情况下需要通过其他方式处理全量状态的捕获。一些现有的解决方案则内置了捕获全量状态的能力。由于事务日志通常具有有限的保留期,因此无法用于重建完整的源数据集。现有产品以不同的方式解决此问题,各具优缺点:

Databus [8] 具有一个引导服务(bootstrap),该服务从源读取事务日志事件,并将其存储在一个单独的数据库中。下游消费者可以访问引导服务(bootstrap),以便进行初始化或备份。在 bootstarp 执行之后,消费者开始处理 bootstrap 之前的事务日志事件,从而实现重叠,不会漏掉事件。日志的追赶可能导致“时间旅行”,即引导时的行状态可能更为最近,而之后从日志捕获的则是较旧的状态。最终,从事务日志中将发现最新的状态。

Databus 的逻辑与当前全量 + 增量模式非常解决,先获取日志存储 + full state + 增量消费追赶

Debezium [10] 通过使用表锁并在一个事务中在所有表中运行选择,捕获MySQL和PostgreSQL的一致快照。然后,从所有现有行被选择后,捕获事务日志中的事件,时间为事务结束后的时间。根据实现和数据库的不同,这种锁定的持续时间可能是短暂的,也可能在整个选择过程中持续,比如MySQL的RDS [10]。在后一种情况下,写入流量会被阻塞,直到所有行被选择,这对于大型数据库来说可能是一个较长的时间段。

Debezium 的(官网描述)[https://debezium.io/documentation/reference/stable/connectors/mysql.html] snapshot.mode 模式决定了 Debezium 的行为,该默认值为 initial。使用了 global read lock or table-level locks
MySQL 中 global read lock 是通过 FLUSH TABLES WITH READ LOCK 实现,阻塞所有写入直到表被释放
MySQL 中 table-level locks 是通过 LOCK TABLES table_name READ 实现,会阻塞该表所有写入操作
Debezium 的操作步骤

  1. 建连
  2. 根据配置项等信息确认需要捕获的表
  3. 获取要捕获的表的全局读取锁,阻塞写入。
  4. 以可重复读语义开启一个事务,以确保在该事务内的所有后续读取都基于一致的快照
  5. 读当前 binlog 的 position
  6. 捕获数据库中所有表的结构,或所有指定进行捕获的表。Connector 将架构信息保存在其内部数据库架构历史主题中,包括所有必要的 DROP…​ 和 CREATE…​ DDL 语句。
  7. 释放步骤 3 的全局读取锁,写入可以进行下去
  8. 在第 5 步中,Connector 读取 binlog 位置处,Connector 开始扫描指定用于捕获的表,在扫描过程中,Connector 完成以下任务:
  9. 确定表是在快照开始之前创建的,如果表是在快照开始后创建的,则将跳过该表。快照完成后,Connector 转为流式处理,它会为任何快照开始后创建的表发出变更事件
  10. 捕获表中的每一行数据并生成 read event,所有 read event 都包含相同的 binlog 位置(第 5 步中获取的)
  11. 将每个读取事件发送给源表的 Kafka topic
  12. 释放数据表锁(如果使用)
  13. 提交事务
  14. 在 connector 中的 offset 中记录快照完成的情况
    Debezium 使用的读取数据逻辑类似 Select * from Table。这是需要数据库支持流式读?

在Maxwell [22]中,通过暂停事务日志处理来执行转储,然后选择所需表中的行。之后,日志事件处理恢复。这种方法容易导致“时间旅行”,即选择可能返回更近期的行值,而较旧的值则随后从日志中捕获。最终,最新状态将从日志中被消费。

MySQLStreamer [15] 在源上为每个表创建一个副本,即副本表。然后,将原始表中的行分块插入副本表,从而在插入时生成事务日志条目。副本表是使用MySQL的黑洞引擎创建的,因此插入不会占用表空间,但仍然会生成事务日志事件。使用锁来确保历史的顺序不被违反。MySQLStreamer服务随后从事务日志中消费事件,并能够检测到源自副本表的事件,将其标记为原始表的事件。通过这种方式,下游消费者接收到的事件按照表分组,既可以是来自实际应用更改的事件,也可以是来自副本表的事件。

表1总结了我们在第1节中列出的捕获全量状态的要求,并在现有产品之间进行了比较。我们发现没有现有的方法能满足所有要求。一些限制是由于设计原因而产生的,例如首先尝试选择一致的快照然后再捕获日志事件。选择特定于供应商的功能(如MySQL黑洞引擎)是另一个观察到的问题,妨碍了跨数据库的代码重用。一些解决方案还利用表锁,这可能会在短时间或较长时间内阻塞应用程序的写入流量。鉴于这些观察结果,我们决定实施一种新的处理转储的方法,以满足我们的所有要求。

table1.jpg

3. DBLOG

DBLog是一个基于Java的框架,能够从数据库的事务日志中捕获更改的行,并通过在表上执行查询来捕获数据库的全量状态。查询是分块执行的,并与日志事件交错进行,从而使日志事件处理不会长时间停滞。这是通过利用水印(watermark)的方法实现的。查询可以通过API在运行时执行。这使得DBLog的输出能够在最初或稍后的时间通过全量状态引导,便于进行修复。如果输出是启用了日志压缩的Kafka,那么下游消费者可以通过读取Kafka中的事件来引导,它们将包含完整的数据集,并通过附加从源捕获的更改行持续更新。在仅有一个消费者的用例中,DBLog也可以将事件直接发送到数据存储或API。

我们设计这个框架时,尽量减少对数据库的影响。如果需要,查询可以暂停和恢复。这与故障恢复相关,也有助于在数据库达到瓶颈时停止处理。我们还避免对表施加锁,从而不阻塞应用程序写入。我们使用Zookeeper来存储与日志事件处理和块选择相关的进度。我们还利用Zookeeper进行领导者选举,以确定活跃的进程,而其他进程则保持闲置,作为被动备用。我们选择Zookeeper是因为它成熟、读写延迟低、在需要时支持线性可读(linearizable reads),以及在大多数节点可达时可用于写入。我们在构建DBLog时考虑了插件的可替换性,允许根据需要更换实现,便于用其他数据存储替代Zookeeper。

以下小节将更详细地解释事务日志捕获和全量状态捕获。

3.1 捕获事务日志(Transaction log capture)

这个框架要求数据库在提交顺序中为每个更改的行发出事件。在 MySQL 和 PostgreSQL 中存在一种复制协议,数据库在提交后不久通过 TCP 套接字将事件交付给 DBLog。事件可以是以下类型之一:创建、更新或删除。对于我们的应用场景,我们假设事件包含操作发生时的所有列值。尽管 DBLog 如果捕获了部分列也可以使用。对于每个事件,我们假设有一个日志序列号(Log-Sequence-Number,LSN),它是事务日志中事件的偏移量,以一个 8 字节的单调递增数字进行编码。

每个事件被序列化为 DBLog 事件格式,并追加到一个输出缓冲区,该缓冲区位于内存中,是 DBLog 进程的一部分。另一个线程随后从输出缓冲区消费事件,并按照顺序将它们发送到实际输出。输出是一个简单的接口,允许接入任何目的地,例如流、数据存储或任何具有 API 的服务。

我们还捕获模式(schema)更改。模式更改捕获的性质在不同数据库中有所不同,因此日志中可能会包含模式更改增量,或者数据库可能会在每个发出的事件中包含模式信息。由于空间限制,本文未涉及我们在 DBLog 中捕获模式的方式。

3.2 捕获全量状态(Full state capture)

由于事务日志通常具有有限的保留时间,因此无法用来重构完整的源数据集。在尝试解决这个问题时,两个主要挑战是确保日志处理不会停滞,并且历史事件的顺序得以保留。解决这个问题的一个现有方案是,在源数据库中为每个表创建一个副本,并分块填充数据,以便复制的行能够按照正确的顺序出现在事务日志中。然后,可以消费事务日志事件,并接收所有行的最新状态以及更改行。然而,这种方案会在源数据库上消耗写入I/O,并且需要额外的磁盘空间。可以通过使用特定供应商的功能,例如MySQL的黑洞引擎,来避免占用额外的表空间。

我们开发了一种只使用常见数据库特性、对源数据库影响尽可能小的解决方案。我们并不是将行的最新状态实际写入事务日志,而是从表中选择行,以块的形式将其存储在内存中,放置在我们从事务日志捕获的事件旁边。这种方式可以保留日志事件的历史。

我们的解决方案允许通过API在任何时间提取所有表、特定表或特定主键的全量状态。选择操作是按表执行的,并按配置好的大小进行分块。通过按升序主键顺序对表进行排序来选择块,包含主键大于前一个块最后一个主键的行。该查询必须高效执行,以最小化对源数据库的影响。出于这些原因,DBLog要求数据库能够提供对主键的高效范围扫描,并且仅允许在具有主键的表上进行选择。图2使用一个简单的示例说明了块选择的过程。

figure2.jpg

我们在Zookeeper中存储已完成块的最后一行,以便在最新完成的块后暂停和恢复处理。块的处理需要以一种保留日志变更历史的方式进行,以确保返回旧值的块选择不会覆盖从事务日志捕获的新状态,反之亦然。为了实现这一点,我们在事务日志中创建可识别的水印事件,以便我们能够对块选择进行排序。水印是通过在源数据库中创建的一个表实现的。该表存储在一个专用的命名空间中,以避免与应用表发生冲突。该表中只插入一行,存储一个全球唯一标识符(UUID)值。然后,通过更新该行的UUID值来生成水印。行的更新会导致一个变更事件,最终被DBLog捕获。

算法1描述了一种基于水印的方法,用于选择特定表的下一个数据块。该算法在表中仍有剩余数据块时重复执行。日志事件处理在第一步时短暂暂停。通过更新水印表生成水印(第二步和第四步)。数据块选择发生在两个水印之间,并且数据块在内存中存储(第三步)。一旦高水印被写入,我们便恢复日志事件处理,将接收到的日志事件发送到输出,并在日志中监视低水印事件。一旦收到低水印事件,我们开始移除内存中数据块中所有在水印之间发生变化的主键的行(第六步)。一旦收到高水印事件,我们最终将所有剩余的数据块条目附加到输出缓冲区,然后以顺序方式再次处理日志事件(第七步)。

第三步的数据块选择是为了返回代表截至历史某一点已提交更改的状态。换句话说,选择是在事务日志的特定位置上执行的,考虑到截至该点的已提交事务。数据库通常不公开对事务日志上选择的执行位置(MariaDB 是一个例外)。我们方法的核心思想是确定一个保证包含数据块选择的事务日志窗口。窗口通过写入低水印来打开,然后进行选择,最后通过写入高水印来关闭。由于选择的确切位置未知,因此所有与该窗口内的日志事件发生冲突的选择数据块行都被移除。这确保了数据块选择不会覆盖日志变化的历史。为了实现这一点,我们必须读取低水印写入时或其之后的表状态(可以包含在低水印写入后和读取之前提交的更改)。更一般而言,要求数据块选择必须能够看到在其执行之前已提交的更改。我们将这种能力定义为“非过期读取”。此外,由于高水印随后写入,我们要求选择的执行时间在高水印之前。

figure3.jpg

图3a和3b展示了数据块选择的水印算法。我们提供了一个示例,其中表具有主键 k1 到 k6。每个更改日志条目代表一个创建、更新或删除事件。图中的步骤与算法1的标签相对应。在图3a中,我们展示了水印生成和数据块选择(步骤1到4)。在第二步和第四步更新水印表创建了两个更改事件(用粗体突出显示),这些事件最终通过更改日志接收。在图3b中,我们关注从结果集中移除的选择数据块行,这些行的主键出现在水印之间(步骤5到7)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Algorithm 1: Watermark-based Chunk Selection

Input: table
(1) pause log event processing
lw := uuid(), hw := uuid()
(2) update watermark table set value = lw
(3) chunk := select next chunk from table
(4) update watermark table set value = hw
(5) resume log event processing
inwindow := 𝑓 𝑎𝑙𝑠𝑒
// other steps of event processing loop
while true do
e := next event from changelog
if not inwindow then
if e is not watermark then
append e to outputbuffer
else if e is watermark with value lw then
inwindow := 𝑡𝑟𝑢𝑒
else
if e is not watermark then
(6) if chunk contains e.key then
remove e.key from chunk
append e to outputbuffer
else if e is watermark with value hw then
(7) for each row in chunk do
append row to outputbuffer
// other steps of event processing loop

请注意,如果一个或多个事务在低水位标和高水位标之间提交了一大批行更改,可能会出现大量的日志事件。日志事件处理在第4步后逐个事件恢复,最终发现水位标,而无需缓存日志事件条目。由于第2到第4步预计较快,日志处理仅会短暂暂停:水位标更新是单次写操作,而块选择是在带有限制的主键索引上进行的。一旦在第7步接收到高水位标,非冲突块行将按顺序附加到输出缓冲区,最终交付到输出。附加到输出缓冲区是非阻塞操作,因为输出交付在单独的线程中运行,允许常规的日志处理在第7步后恢复。

figure4.jpg

在图4中,我们使用与图3a和3b相同的示例,描述事件写入输出的顺序。首先附加高水位标之前出现的日志事件。然后是块选择中剩余的行(下划线条目)。最后是高水位标之后发生的日志事件。这说明了日志事件和完整数据提取事件之间的交错关系。

3.3 数据库支持(Database Support)

为了使用DBLog,数据库需要以提交顺序发出变更行,并支持非过期读取。这些条件由MySQL、PostgreSQL、MariaDB等系统满足,因此该框架可以在这些类型的数据库中以统一的方式使用。

到目前为止,DBLog支持MySQL、PostgreSQL和Aurora。在所有情况下,数据库以提交顺序提供日志事件[2][4],并且通过读取已提交隔离级别,可以实现对单个选择事务的非过期读取[3][5]。为了集成日志事件,对于MySQL,我们使用MySQL二进制日志连接器[17],该连接器实现了binlog复制协议。对于PostgreSQL,我们使用带有wal2json插件的复制槽[18]。通过由PostgreSQL Java数据库连接(JDBC)驱动程序实现的流复制协议接收更改。每个捕获更改的模式确定在MySQL和PostgreSQL之间有所不同。在PostgreSQL中,wal2json包含列名和类型以及列值。在MySQL中,模式更改增量以binlog事件的形式接收。

通过使用SQL和JDBC集成了全状态捕获,只需实现块选择和水印更新即可。相同的代码可用于MySQL和PostgreSQL,也可以用于其他支持JDBC的数据库。转储处理本身不依赖于SQL或JDBC,并允许集成满足DBLog框架要求的数据库,即使它们不是关系数据库管理系统(RDBMS)。

4. DBLOG 产品化(DBLOG IN PRODUCTION)

DBLog是Netflix MySQL和PostgreSQL连接器的基础。它们都被用于我们名为Delta的数据同步和增强平台[7]。自2018年以来,DBLog已在生产环境中运行,截至本文撰写时,已在Netflix约30个生产使用服务中部署。这些用例涵盖了异构数据复制、数据库活动日志记录和架构迁移。

异构数据复制:为了跟踪制作,对与电影相关的所有数据进行搜索是至关重要的。这涉及由不同团队管理的数据,每个团队负责不同的业务实体,如剧集、演员和交易。这些服务在AWS RDS中使用MySQL或PostgreSQL存储其数据。DBLog被部署在每个相关的数据存储中,以捕获完整的数据集和实时更改到输出流中。然后,这些流被合并并输入到ElasticSearch中的一个公共搜索索引中,从而提供对所有相关实体的搜索。

数据库活动日志记录:DBLog还用于记录数据库活动,以便检查数据库上发生了什么样的更改。在这种情况下,更改的行被捕获并交付到一个流中。流处理器随后将事件传播到ElasticSearch(用于短期存储)和Hive(用于长期存储)。在ElasticSearch中使用Kibana构建活动仪表板,以便团队可以检查每个表上发生的操作数量。这用于检查数据变更模式,并且可能对检测意外模式至关重要,例如新服务代码发布后插入表的操作骤减。

架构迁移:当一个团队将一个MySQL数据库迁移到另一个数据库,并在第二个数据库中使用新的表结构时,DBLog部署在旧数据库上,以捕获全量状态以及发生的新更改,并将其写入流中。然后,一个Flink作业消费这些数据,将其转换为新的表架构格式并写入新数据库。通过这种方式,可以通过在填充后的新架构上运行来提前验证新数据库的读取,而写入仍发生在旧架构上。在后续步骤中,写入流量也可以转到新架构,同时可以停止对旧数据库的流量。

5. 结论(CONCLUSIONS)

在本文中,我们提出了一种新颖的基于水印的变更数据捕获(CDC)框架。DBLog 的功能扩展了能够实时捕获数据库事务日志中更改行的能力,并将整个数据库的状态提取作为集成解决方案的一部分。此外,DBLog 提供了端点,用户可以在任何时候请求全量状态并执行,而不会阻塞日志事件处理。这是通过对表进行分块选择并将获取的行与日志事件交错进行,以便两者都可以进展。同时,由于基于水印的方法,历史的原始顺序始终得以保留,并且不会在源数据库上使用锁。此外,还设置了控制机制,允许对块选择进行限制,或在需要时暂停和恢复。当在非常大的表上捕获全量状态时,如果处理过程崩溃,这一点尤其重要,这样程序不需要从头开始重复。DBLog 设计用于将事件传递到任何输出,无论是数据库、流还是API。这些功能为多个数据系统的同步开辟了新的途径。

由于 Netflix 运营着数百个具有独立数据需求的微服务,DBLog 已成为 Netflix 数据同步和增强平台的基础。它消除了应用程序开发人员在维护多个数据存储时的复杂性。DBLog 及其基于水印的方法旨在为关系数据库管理系统(RDBMS)类型的数据库工作。作为下一步,我们正在研究其他 CDC 框架,以支持不符合 DBLog 框架的数据库,例如像 Apache Cassandra 这样的多主 NoSQL 数据库。我们的目标是支持与 DBLog 相似的功能,即能够在任何时候捕获全量状态,与日志事件交错,并对源的影响最小化。