Comment on page
第十一章:流处理

有效的复杂系统总是从简单的系统演化而来。反之亦然:从零设计的复杂系统没一个能有效工作的。—— 约翰・加尔,Systemantics(1975)
[TOC]
在 第十章 中,我们讨论了批处理技术,它读取一组文件作为输入,并生成一组新的文件作为输出。输出是 衍生数据(derived data) 的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来建立搜索索引、推荐系统、做分析等等。
然而,在 第十章 中仍然有一个很大的假设:即输入是有界的,即已知和有限的大小,所以批处理知道它何时完成输入的读取。例如,MapReduce 核心的排序操作必须读取其全部输入,然后才能开始生成输出:可能发生这种情况:最后一条输入记录具有最小的键,因此需要第一个被输出,所以提早开始输出是不可行的。
实际上,很多数据是 无界限 的,因为它随着时间的推移而逐渐到达:你的用户在昨天和今天产生了数据,明天他们将继续产生更多的数据。除非你停业,否则这个过程永远都不会结束,所以数据集从来就不会以任何有意义的方式 “完成”【1】。因此,批处理程序必须将数据人为地分成固定时间段的数据块,例如,在每天结束时处理一天的数据,或者在每小时结束时处理一小时的数据。
日常批处理中的问题是,输入的变更只会在一天之后的输出中反映出来,这对于许多急躁的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理 —— 比如说,在每秒钟的末尾 —— 或者甚至更连续一些,完全抛开固定的时间切片,当事件发生时就立即进行处理,这就是 流处理(stream processing) 背后的想法。
一般来说,“流” 是指随着时间的推移逐渐可用的数据。这个概念出现在很多地方:Unix 的 stdin 和 stdout、编程语言(惰性列表)【2】、文件系统 API(如 Java 的
FileInputStream
)、TCP 连接、通过互联网传送音频和视频等等。在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。流处理领域中的等价物看上去是什么样子的?
当输入是一个文件(一个字节序列),第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被叫做 事件(event) ,但它本质上是一样的:一个小的、自包含的、不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自日历时钟的时间戳,以指明事件发生的时间(请参阅 “单调钟与日历时钟”)。
例如,发生的事件可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如对温度传感器或 CPU 利用率的周期性测量。在 “使用 Unix 工具的批处理” 的示例中,Web 服务器日志的每一行都是一个事件。
事件可能被编码为文本字符串或 JSON,或者某种二进制编码,如 第四章 所述。这种编码允许你存储一个事件,例如将其追加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许你通过网络将事件发送到另一个节点以进行处理。
在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流处理术语中,一个事件由 生产者(producer) (也称为 发布者(publisher) 或 发送者(sender) )生成一次,然后可能由多个 消费者(consumer) ( 订阅者(subscribers) 或 接收者(recipients) )进行处理【3】。在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个 主题(topic) 或 流(stream) 。
原则上讲,文件或数据库就足以连接生产者和消费者:生产者将其生成的每个事件写入数据存储,且每个消费者定期轮询数据存储,检查自上次运行以来新出现的事件。这实际上正是批 处理在每天结束时处理当天数据时所做的事情。
但当我们想要进行低延迟的连续处理时,如果数据存储不是为这种用途专门设计的,那么轮询开销就会很大。轮询的越频繁,能返回新事件的请求比例就越低,而额外开销也就越高。相比之下,最好能在新事件出现时直接通知消费者。
数据库在传统上对这种通知机制支持的并不好,关系型数据库通常有 触发器(trigger) ,它们可以对变化(如,插入表中的一行)作出反应,但是它们的功能非常有限,并且在数据库设计中有些后顾之忧【4,5】。相应的是,已经开发了专门的工具来提供事件通知。
向消费者通知新事件的常用方式是使用 消息传递系统(messaging system):生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在 “消息传递中的数据流” 中谈到了这些系统,但现在我们将详细介绍这些系统。
像生产者和消费者之间的 Unix 管道或 TCP 连接这样的直接信道,是实现消息传递系统的简单方法。但是,大多数消息传递系统都在这一基本模型上进行了扩展。特别的是,Unix 管道和 TCP 将恰好一个发送者与恰好一个接收者连接,而一个消息传递系统允许多个生产者节点将消息发送到同一个主题,并允许多个消费者节点接收主题中的消息。
在这个 发布 / 订阅 模式中,不同的系统采取各种各样的方法,并没有针对所有 目的的通用答案。为了区分这些系统,问一下这两个问题会特别有帮助:
- 1.如果生产者发送消息的速度比消费者能够处理的速度快会发生什么? 一般来说,有三种选择:系统可以丢掉消息,将消息放入缓冲队列,或使用 背压(backpressure,也称为 流量控制,即 flow control:阻塞生产者,以免其发送更多的消息)。例如 Unix 管道和 TCP 就使用了背压:它们有一个固定大小的小缓冲区,如果填满,发送者会被阻塞,直到接收者从缓冲区中取出数据(请参阅 “网络拥塞和排队”)。如果消息被缓存在队列中,那么理解队列增长会发生什么是很重要的。当队列装不进内存时系统会崩溃吗?还是将消息写入磁盘?如果是这样,磁盘访问又会如何影响消息传递系统的性能【6】?
- 2.如果节点崩溃或暂时脱机,会发生什么情况? —— 是否会有消息丢失? 与数据库一样,持久性可能需要写入磁盘和 / 或复制的某种组合(请参阅 “复制与持久性”),这是有代价的。如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。
是否可以接受消息丢失取决于应用。例如,对于周期传输的传感器读数和指标,偶尔丢失的数据点可能并不重要,因为更新的值会在短时间内发出。但要注意,如果大量的消息被丢弃,可能无法立刻意识到指标已经不正确了【7】。如果你正在对事件计数,那么它们能够可靠送达是更重要的,因为每个丢失的消息都意味着使计数器的错误扩大。
我们在 第十章 中探讨的批处理系统的一个很好的特性是,它们提供了强大的可靠性保证:失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障一样,这有助于简化编程模型。在本章的后面,我们将研究如何在流处理的上下文中提供类似的保证。
许多消息传递系统使用生产者和消费者之间的直接网络通信,而不通过中间节点:
- UDP 组播广泛应用于金融行业,例如股票市场,其中低时延非常重要【8】。虽然 UDP 本身是不可靠的,但应用层的协议可以恢复丢失的数据包(生产者必须记住它发送的数据包,以便能按需重新发送数据包)。
- 无代理的消息库,如 ZeroMQ 【9】和 nanomsg 采取类似的方法,通过 TCP 或 IP 多播实现发布 / 订阅消息传递。
- StatsD 【10】和 Brubeck 【7】使用不可靠的 UDP 消息传递来收集网络中所有机器的指标并对其进行监控。(在 StatsD 协议中,只有接收到所有消息,才认为计数器指标是正确的;使用 UDP 将使得指标处在一种最佳近似状态【11】。另请参阅 “TCP 与 UDP”
- 如果消费者在网络上公开了服务,生产者可以直接发送 HTTP 或 RPC 请求(请参阅 “服务中的数据流:REST 与 RPC”)将消息推送给使用者。这就是 webhooks 背后的想法【12】,一种服务的回调 URL 被注册到另一个服务中,并且每当事件发生时都会向该 URL 发出请求。
尽管这些直接消息传递系统在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。它们的容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。
如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。
一种广泛使用的替代方法是通过 消息代理(message broker,也称为 消息队列,即 message queue)发送消息,消息代理实质上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产者和消费者作为客户端连接到服务器。生产者将消息写入代理,消费者通过从代理那里读取来接收消息。
通过将数据集中在代理上,这些系统可以更容易地容忍来来去去的客户端(连接,断开连接和崩溃),而持久性问题则转移到代理的身上。一些消息代理只将消息保存在内存中,而另一些消息代理(取决于配置)将其写入磁盘,以便在代理崩溃的情况下不会丢失。针对缓慢的消费者,它们通常会允许无上限的排队(而不是丢弃消息或背压),尽管这种选择也可能取决于配置。
排队的结果是,消费者通常是 异步(asynchronous) 的:当生产者发送消息时,通常只会等待代理确认消息已经被缓存,而不等待消息被消费者处理。向消费者递送消息将发生在未来某个未定的时间点 —— 通常在几分之一秒之内,但有 时当消息堆积时会显著延迟。
- 数据库通常保留数据直至显式删除,而大多数消息代理在消息成功递送给消费者时会自动删除消息。这样的消息代理不适合长期的数据存储。
- 由于它们很快就能删除消息,大多数消息代理都认为它们的工作集相当小 —— 即队列很短。如果代理需要缓冲很多消息,比如因为消费者速度较慢(如果内存装不下消息,可能会溢出到磁盘),每个消息需要更长的处理时间,整体吞吐量可能会恶化【6】。
- 数据库通常支持次级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。虽然机制并不一样,但对于客户端选择想要了解的数据的一部分,都是基本的方式。
- 查询数据库时,结果通常基于某个时间点的数据快照;如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。
这是关于消息代理的传统观点,它被封装在诸如 JMS 【14】和 AMQP 【15】的标准中,并且被诸如 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO 企业消息服务、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub 所实现 【16】。
- 负载均衡(load balancing)每条消息都被传递给消费者 之一,所以处理该主题下消息的工作能被多个消费者共享。代理可以为消费者任意分配消息。当处理消息的代价高昂,希望能并行处理消息时,此模式非常有用(在 AMQP 中,可以通过让多个客户端从同一个队列中消费来实现负载均衡,而在 JMS 中则称之为 共享订阅,即 shared subscription)。
- 扇出(fan-out)每条消息都被传递给 所有 消费者。扇出允许几个独立的消费者各自 “收听” 相同的消息广播,而不会相互影响 —— 这个流处理中的概念对应批处理中多个不同批处理作业读取同一份输入文件 (JMS 中的主题订阅与 AMQP 中的交叉绑定提供了这一功能)。

图 11-1 (a)负载平衡:在消费者间共享消费主题;(b)扇出:将每条消息传递给多个消费者。
两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅同一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。
消费者随时可能会崩溃,所以有一种可能的情况是:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理使用 确认(acknowledgments):客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。
如果与客户端的连接关闭,或者代 理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。(请注意可能发生这样的情况,消息 实际上是 处理完毕的,但 确认 在网络中丢失了。需要一种原子提交协议才能处理这种情况,正如在 “实践中的分布式事务” 中所讨论的那样)
当与负载均衡相结合时,这种重传行为对消息的顺序有种有趣的影响。在 图 11-2 中,消费者通常按照生产者发送的顺序处理消息。然而消费者 2 在处理消息 m3 时崩溃,与此同时消费者 1 正在处理消息 m4。未确认的消息 m3 随后被重新发送给消费者 1,结果消费者 1 按照 m4,m3,m5 的顺序处理消息。因此 m3 和 m4 的交付顺序与生产者 1 的发送顺序不同。

图 11-2 在处理 m3 时消费者 2 崩溃,因此稍后重传至消费者 1
即使消息代理试图保留消息的顺序(如 JMS 和 AMQP 标准所要求的),负载均衡与重传的组合也不可避免地导致消息被重新排序。为避免此问题,你可以让每个消费者使用单独的队列(即不使用负载均衡功能)。如果消息是完全独立的,则消息顺序重排并不是一个问题。但正如我们将在本章后续部分所述,如果消息之间存在因果依赖关系,这就是一个很重要的问题。
通过网络发送数据包或向网络服务发送请求通常是短暂的操作,不会留下永久的痕迹。尽管可以永久记录(通过抓包与日志),但我们通常不这么做。即使是将消息持久地写入磁盘的消息代理,在送达给消费者之后也会很快删除消息,因为它们建立在短暂消息传递的思维方式上。
数据库和文件系统采用截然相反的方法论:至少在某人显式删除前,通常写入数据库或文件的所有内容都要被永久记录下来。
这种思维方式上的差异对创建衍生数据的方式有巨大影响。如 第十章 所述,批处理过程的一个关键特性是,你可以反复运行它们,试验处理步骤,不用担心损坏输入(因为输入是只读的)。而 AMQP/JMS 风格的消息传递并非如此:收到消息是具有破坏性的,因为确认可能导致消息从代理中被删除,因此你不能期望再次运行同一个消费者能得到相同的结果。
如果你将新的消费者添加到消息传递系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。
为什么我们不能把它俩杂交一下,既有数据库的持久存储方式,又有消息传递的低延迟通知?这就是 基于日志的消息代理(log-based message brokers) 背后的想法。
同样的结构可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。Unix 工具
tail -f
能监视文件被追加写入的数据,基本上就是这样工作的。在每个分区内,代理为每个消息分配一个单调递增的序列号或 偏移量(offset,在 图 11-3 中,框中的数字是消息偏移量)。这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。

图 11-3 生产者通过将消息追加写入主题分区文件来发送消息,消费者依次读取这些文件
Apache Kafka 【17,18】、Amazon Kinesis Streams 【19】和 Twitter 的 DistributedLog 【20,21】都是基于日志的消息代理。Google Cloud Pub/Sub 在架构上类似,但对外暴露的是 JMS 风格的 API,而不是日志抽象【16】。尽管这些消息代理将所有消息写入磁盘,但通过跨多台机器分区,每秒能够实现数百万条消息的吞吐量,并通过复制消息来实现容错性【22,23】。
基于日志的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响 —— 读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端。
然后每个客户端将消费被指派分区中的 所有 消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗 粒度的负载均衡方法有一些缺点:
- 共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点 。
因此在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,JMS/AMQP 风格的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况 下,基于日志的方法表现得非常好。
顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。
实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似,我们在 “设置新从库” 中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理表现得像一个主库,而消费者就像一个从库。
如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次。我们将在本章后面讨论这个问题的处理方法。
如果只追加写入日志,则磁盘空间终究会耗尽。为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。(我们将在后面讨论一种更为复杂的磁盘空间释放方式)
这就意味着如果一个慢消费者跟不上消息产生的速率而落后得太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为 循环缓冲区(circular buffer) 或 环形缓冲区(ring buffer)。不过由于缓冲区在磁盘上,因此缓冲区可能相当的大。
让我们做个简单计算。在撰写本文时,典型的大型硬盘容量为 6TB,顺序写入吞吐量为 150MB/s。如果以最快的速度写消息,则需要大约 11 个小时才能填满磁盘。因而磁盘可以缓冲 11 个小时的消息,之后它将开始覆盖旧的消息。即使使用多个磁盘和机器,这个比率也是一样的。实践中的部署很少能用满磁盘的写入带宽,所以通常可以保存一个几天甚至几周的日志缓冲区。
不管保留多长时间的消息,日志的吞吐量或多或少保持不变,因为无论如何,每个消息都会被写入磁盘【18】。这种行为与默认将消息保存在内存中,仅当队列太长时才写入磁盘的消息传递系统形成鲜明对比。当队列很短时,这些系统非常快;而当这些系统开始写入磁盘时,就要慢的多,所以吞吐量取决于保留的历史数量。
在 “消息传递系统” 中,如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大但大小固定的缓冲区(受可用磁盘空间的限制)。
如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让运维人员来修复慢消费者,并在消息开始丢失之前让其赶上。
即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。
这种行为也与传统的消息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列 —— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。
我们之前提到,使用 AMQP 和 JMS 风格的消息代理,处理和确认消息是一个破坏性的操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。
除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。
这一方面使得基于日志的消息传递更像上一章的批处理,其中衍生数据通过可重复的转换过程与输入数据显式分离。它允许进行更多的实验,更容易从错误和漏洞中恢复,使其成为在组织内集成数据流的良好工具【24】。
我们已经在消息代理和数据库之间进行了一些比较。尽管传统上它们被视为单独的工具类别,但是我们看到基于日志的消息代理已经成功地从数据库中获取灵感并将其应用于消息传递。我们也可以反过来:从消息传递和流中获取灵感,并将它们应用于数据库。
我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或读取传感器,但也可能是 写入数据库。某些东西被写入数据库的事实是可以被捕获、存储和处理的事件。这一观察结果表明,数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系。
事实上,复制日志(请参阅 “复制日志的实现”)是一个由数据库写入事件组成的流,由主库在处理事务时生成。从库将写入流应用到它们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。
我们还在 “全序广播” 中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将达到相同的最终状态 (假设事件处理是一个确定性的操作)。这是事件流的又一种场景!
在本节中,我们将首先看看异构数 据系统中出现的一个问题,然后探讨如何通过将事件流的想法带入数据库来解决这个问题。
正如我们在本书中所看到的,没有一个系统能够满足所有的数据存储、查询和处理需求。在实践中,大多数重要应用都需要组合使用几种不同的技术来满足所有的需求:例如,使用 OLTP 数据库来为用户请求提供服务,使用缓存来加速常见请求,使用全文索引来处理搜索查询,使用数据仓库用于分析。每一种技术都有自己的数据副本,并根据自己的目的进行存储方式的优化。
如果周期性的完整数据库转储过于缓慢,有时会使用的替代方法是 双写(dual write),其中应用代码在数据变更时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存项失效(甚至同时执行这些写入)。
但是,双写有一些严重的问题,其中一个是竞争条件,如 图 11-4 所示。在这个例子中,两个客户端同时想要更新一个项目 X:客户端 1 想要将值设置为 A,客户端 2 想要将其设置为 B。两个客户端首先将新值写入数据库,然后将其写入到搜索索引。因为运气不好,这些请求的时序是交错的:数据库首先看到来自客户端 1 的写入将值设置为 A,然后来自客户端 2 的写入将值设置为 B,因此数据库中的最终值为 B。搜索索引首先看到来自客户端 2 的写入,然后是客户端 1 的写入,所以搜索索引中的最终值是 A。即使没发生错误,这两个系统现在也永久地不一致了。

图 11-4 在数据库中 X 首先被设置为 A,然后被设置为 B,而在搜索索引处,写入以相反的顺序到达
双重写入的另一个问题是,其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相不一致的结果。确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的(请参阅 “原子提交与两阶段提交”)。
如果实际上只有一个领导者 —— 例如,数据库 —— 而且我们能让搜索索引成为数据库的追随者,情况要好得多。但这在实践中可能吗?
大多数数据库的复制日志的问题在于,它们一直被当做数据库的内部实现细节,而不是公开的 API。客户端应该通过其数据模型和查询语言来查询数据库,而不是解析复制日志并尝试从中提取数据。
数十年来,许多数据库根本没有记录在档的获取变更日志的方式。由于这个原因,捕获数据库中所有的变更,然后将其复制到其他存储技术(搜索索引、缓存或数据仓库)中是相当 困难的。
最近,人们对 变更数据捕获(change data capture, CDC) 越来越感兴趣,这是一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程。CDC 是非常有意思的,尤其是当变更能在被写入后立刻用于流时。
例如,你可以捕获数据库中的变更,并不断将相同的变更应用至搜索索引。如果变更日志以相同的顺序应用,则可以预期搜索索引中的数据与数据库中的数据是匹配的。搜索索引和任何其他衍生数据系统只是变更流的消费者,如 图 11-5 所示。

图 11-5 将数据按顺序写入一个数据库,然后按照相同的顺序将这些更改应用到其他系统
我们可以将日志消费者叫做 衍生数据系统,正如在 第三部分 的介绍中所讨论的:存储在搜索索引和数据仓库中的数据,只是 记录系统 数据的额外视图。变更数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在衍生数据系统中,以便衍生系统具有数据的准确副本。
从本质上说,变更数据捕获使得一个数据库成 为领导者(被捕获变化的数据库),并将其他组件变为追随者。基于日志的消息代理非常适合从源数据库传输变更事件,因为它保留了消息的顺序(避免了 图 11-2 的重新排序问题)。
数据库触发器可用来实现变更数据捕获(请参阅 “基于触发器的复制”),通过注册观察所有变更的触发器,并将相应的变更项写入变更日志表中。但是它们往往是脆弱的,而且有显著的性能开销。解析复制日志可能是一种更稳健的方法,但它也很有挑战,例如如何应对模式变更。
LinkedIn 的 Databus【25】,Facebook 的 Wormhole【26】和 Yahoo! 的 Sherpa【27】大规模地应用这个思路。Bottled Water 使用解码 WAL 的 API 实现了 PostgreSQL 的 CDC【28】,Maxwell 和 Debezium 通过解析 binlog 对 MySQL 做了类似的事情【29,30,31】,Mongoriver 读取 MongoDB oplog【32,33】,而 GoldenGate 为 Oracle 提供类似的功能【34,35】。
类似于消息代理,变更数据捕获通常是异步的:记录数据库系统在提交变更之前不会等待消费者应用变更。这种设计具有的运维优势是,添加缓慢的消费者不会过度影响记录系统。不过,所有复制延迟可能有的问题在这里都可能出现(请参阅 “复制延迟问题”)。
如果你拥有 所有 对数据库进行变更的日志,则可以通过重播该日志,来重建数据库的完整状态。但是在许多情况下,永远保留所有更改会耗费太多磁盘空间,且重播过于费时,因此日志需要被截断。
例如,构建新的全文索引需要整个数据库的完整副本 —— 仅仅应用最近变更的日志是不够的,因为这样会丢失最近未曾更新的项目。因此,如果你没有完整的历史日志,则需要从一个一致的快照开始,如先前的 “设置新从库” 中所述。
数据库的快照必须与变更日志中的已知位置或偏移量相对应,以便在处理完快照后知道从哪里开始应用变更。一些 CDC 工具集成了这种快照功能,而其他工具则把它留给你手动执行。
如果你只能保留有限的历史日志,则每次要添加新的衍生数据系统时,都需要做一次快照。但 日志压缩(log compaction) 提供了一个很好的备选方案。
在日志结构存储引擎中,具有特殊值 NULL(墓碑,即 tombstone)的更新表示该键被删除,并会在日志压缩过程中被移除。但只要键不被覆盖或删除,它就会永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中曾经发生的写入次数。如果相同的键经常被覆盖写入,则先前的值将最终将被垃圾回收,只有最新的值会保留下来。
在基于日志的消息代理与变更数据捕获的上下文中也适用相同的想法。如果 CDC 系统被配置为,每个变更都包含一个主键,且每个键的更新都替换了该键以前的值,那么只需要保留对键的最新写入就足够了。
现在,无论何时需要重建衍生数据系统(如搜索索引),你可以从压缩日志主题的零偏移量处启动新的消费者,然后依次扫描日志中的所有消息。日志能保证包含数据库中每个键的最新值(也可能是一些较旧的值)—— 换句话说,你可以使用它来获取数据库内容的完整副本,而无需从 CDC 源数据库取一个快照。
Apache Kafka 支持这种日志压缩功能。正如我们将在本章后面看到的,它允许消息代理被当成持久性存储使用,而不仅仅是用于临时消息。
越来越多的数据库开始将变更流作为第一等的接口,而不像传统上要去做加装改造,或者费工夫逆向工程一个 CDC。例如,RethinkDB 允许查询订阅通知,当查询结果变更时获得通知【36】,Firebase 【37】和 CouchDB 【38】基于变更流进行同步,该变更流同样可用于应用。而 Meteor 使用 MongoDB oplog 订阅数据变更,并改变了用户接口【39】。
VoltDB 允许事务以流的形式连续地从数据库中导出数据【40】。数据库将关系数据模型中的输出流表示为一个表,事务可以向其中插入元组,但不能查询。已提交事务按照提交顺序写入这个特殊表,而流则由该表中的元组日志构成。外部消费者可以异步消费该日志,并使用它来更 新衍生数据系统。
Kafka Connect【41】致力于将广泛的数据库系统的变更数据捕获工具与 Kafka 集成。一旦变更事件进入 Kafka 中,它就可以用于更新衍生数据系统,比如搜索索引,也可以用于本章稍后讨论的流处理系统。
我们在这里讨论的想法和 事件溯源(Event Sourcing) 之间有一些相似之处,这是一个在 领域驱动设计(domain-driven design, DDD) 社区中折腾出来的技术。我们将简要讨论事件溯源,因为它包含了一些关于流处理系统的有用想法。
与变更数据捕获类似,事件溯源涉及到 将所有对应用状态的变更 存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了一个不同的抽象层次上:
- 在变更数据捕获中,应用以 可变方式(mutable way) 使用数据库,可以任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免 图 11-4 中的竞态条件。写入数据库的应用不需要知道 CDC 的存在。
- 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。
事件溯源是一种强大的数据建模技术:从应用的角度来看,将用户的行为记录为不可变的事件更有意义,而不是在可变数据库中记录这些行为的影响。事件溯源使得应用随时间演化更为容易,通过更容易理解事情发生的原因来帮助调试的进行,并有利于防止应用 Bug(请参阅 “不可变事件的优点”)。
例如,存储 “学生取消选课” 事件以中性的方式清楚地表达了单个行为的意图,而其副作用 “从登记表中删除了一个条目,而一条取消原因的记录被添加到学生反馈表” 则嵌入了很多有关稍后对数据的使用方式的假设。如果引入一个新的应用功能,例如 “将位置留给等待列表中的下一个人” —— 事件溯源方法允许将新的副作用轻松地从现有事件中脱开。
诸如 Event Store【46】这样的专业数据库已经被开发出来,供使用事件溯源的应用使用,但总的来说,这种方法独立于任何特定的工具。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用。
事件日志本身并不是很有用,因为用户通常期望看到的是系统的当前状态,而不是变更历史。例如,在购物网站上,用户期望能看到他们购物车里的当前内容,而不是他们购物车所有变更的一个仅追加列表。
因此,使用事件溯源的应用需要拉取事件日志(表示 写入 系统的数据),并将其转换为适合向用户显示的应用状态(从系统 读取 数据的方式【47】)。这种转换可以使用任意逻辑,但它应当是确定性的,以便能再次运行,并从事件日志中衍生出相同的应 用状态。
与变更数据捕获一样,重播事件日志允许让你重新构建系统的当前状态。不过,日志压缩需要采用不同的方式处理:
- 用于记录更新的 CDC 事件通常包含记录的 完整新版本,因此主键的当前值完全由该主键的最近事件确定,而日志压缩可以丢弃相同主键的先前事件。
- 另一方面,事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制。在这种情况下,后面的事件通常不会覆盖先前的事件,所以你需要完整的历史事件来重新构建最终状态。这里进行同样的日志压缩是不可能的。
使用事件溯源的应用通常有一些机制,用于存储从事件日志中导出的当前状态快照,因此它们不需要重复处理完整的日志。然而这只是一种性能优化,用来加速读取,提高从崩溃中恢复的速度;真正的目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们将在 “不变性的局限性” 中讨论这个假设。
事件溯源的哲学是仔细区分 事件(event) 和 命令(command)【48】。当来自用户的请求刚到达时,它一开始是一个命令:在这个时间点上它仍然可能失败,比如,因为违反了一些完整性条件。应用必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受,则它变为一个持久化且不可变的事件。
例如,如果用户试图注册特定用户名,或预定飞机或剧院的座位,则应用需要检查用户名或座位是否已被占用。(先前在 “容错共识” 中讨论过这个例子)当检查成功时,应用可以生成一个事件,指示特定的用户名是由特定的用户 ID 注册的,或者座位已经预留给特定的顾客。
在事件生成的时刻,它就成为了 事实(fact)。即使客户稍后决定更改或取消预订,他们之前曾预定了某个特定座位的事实仍然成立,而更改或取消是之后添加的单独的事件。
事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经成为日志中不可变的一部分,并且可能已经被其他消费者看到了。因此任何对命令的验证,都需要在它成为事件之前同步完成。例如,通过使用一个可以原子性地自动验证命令并发布事件的可串行事务。
我们通常将数据库视为应用程序当前状态的存储 —— 这种表示针对读取进行了优化,而且通常对于服务查询而言是最为方便的表示。状态的本质是,它会变化,所以数据库才会支持数据的增删改。这又该如何匹配不变性呢?
只要你的状态发生了变化,那么这个状态就是这段时间中事件修改的结果。例如,当前可用的座位列表是你已处理的预订所产生的结果,当前帐户余额是帐户中的借与贷的结果,而 Web 服务器的响应时间图,是所有已发生 Web 请求的独立响应时间的聚合结果。
无论状态如何变化,总是有一系列事件导致了这些变化。即使事情已经执行与回滚,这些事件出现是始终成立的。关键的想法是:可变的状态与不可变事件的仅追加日志相互之间并不矛盾:它们是一体两面,互为阴阳的。所有变化的日志 —— 变化日志(changelog),表示了随时间演变的状态。
如果你倾向于数学表示,那么你可能会说,应用状态是事件流对时间求积分得到的结果,而变更流是状态对时间求微分的结果,如 图 11-6 所示【49,50,51】。这个比喻有一些局限性(例如,状态的二阶导似乎没有意义),但这是考虑数据的一个实用出发点。

图 11-6 应用当前状态与事件流之间的关系
如果你持久存储了变更日志,那么重现状态就非常简单。如果你认为事件日志是你的记录系统,而所有的衍生状态都从它派生而来,那么系统中的数据流动就容易理解的多。正如帕特・赫兰(Pat Helland)所说的【52】:
事务日志记录了数据库的所有变更。高速追加是更改日志的唯一方法。从这个角度来看,数据库的内容其实是日志中记录最新值的缓存。日志才是真相,数据库是日志子集的缓存,这一缓存子集恰好来自日志中每条记录与索引值的最新值。
数据库中的不变性是一个古老的概念。例如,会计在几个世纪以来一直在财务记账中应用不变性。一笔交易发生时,它被记录在一个仅追加写入的分类帐中,实质上是描述货币、商品或服务转手的事件日志。账目,比如利润、亏损、资产负债表,是从分类账中的交易求和衍生而来【53】。
如果发生错误,会计师不会删除或更改分类帐中的错误交易 —— 而是添加另一笔交易以补偿错误,例如退还一笔不正确的费用。不正确的交易将永远保留在分类帐中,对于审计而言可能非常重要。如果从不正确的分类账衍生出的错误数字已经公布,那么下一个会计周期的数字就会包括一个更正。这个过程在会计事务中是很常见的【54】。