Thursday, December 24, 2015

【年度案例】Twitter高性能分布式日志系统架构解析



http://chuansong.me/n/2078071
https://blog.twitter.com/2014/manhattan-our-real-time-multi-tenant-distributed-database-for-twitter-scale
日志应该是程序员最熟悉的一种数据结构。它存在于大家每天的工作中。它是一组只追加,严格有序的记录序列。它长得像上图这个样子。日志已被证明是一种很有效的数据结构,可用来解决很多分布式系统的问题。在 Twitter,我们就用日志来解决很多有挑战的分布式系统问题。

这里主要举一个例子。我们如何使用日志在 Manhattan(Twitter的最终一致性分布式Key/Value数据库)中实现 Compare-And-Set 这样的强一致性操作。


这是一张 Manhattan 架构的简单抽象图。Manhattan 主要由 3 个组件构成,client, co-ordinator 和 replicas。Client 将请求发送给 co-ordinator,co-ordinator 找出修改键值 (key) 所对应的 replicas。然后修改 replicas。Co-ordinator 在发送请求的时候会附上相应的时间戳,replica 根据时间戳来决定最后哪个修改成功,实现最终一致性。

如果我们需要在这个最终一致性的系统上实现 CAS(Compare-And-Set) 这样的强一致性操作,会碰到什么样的问题呢?冲突!

“冲突”是什么意思呢?举个例子,假设有两个 Client,它们同时想要修改 key x,但修改成不同的结果。绿色的 Client 想将 x 从 3 修改到 4,而红色的 Client想将 x 从 3 修改到 5。


假设绿色的 Client 成功地将第一个副本从 3 修改到 4;而红色的 Client 成功地将第三个副本从 3 修改到 5。那么绿色的 Client 修改第三个副本将会失败,因为第三个副本的值已经变成了 5。同样,红色的 Client 修改第一个副本也会失败。

这是之前提到的“冲突”。因为你不知道这个系统中,x 的最终值应该是 4 还是 5。或者其他值。更严重的是,系统无法从这个“冲突”状态中恢复,也就没有最终一致性可言。


解决办法是什么呢?

日志!使用日志来序列化所有的请求。使用日志后的请求流程将变成如图所示:co-ordinator 将请求写到日志中。所有的 replicas 从日志中按顺序读取请求,并修改本地的状态。



在这个例子中,修改为 4 的操作在修改为 5 的操作之前写入日志。因此,所有的副本会首先被修改成 4。那么修改为 5 的操作将会失败。

到此为止,你可以看出日志的好处。它将一个原本复杂的问题变得简单。

这种解决问题的思路叫做 Pub/Sub。而日志就是 Pub/Sub 模式的基础。

因为 Pub/Sub 这个模式是那么简单而且强有力,这让我们思考,是不是可以构建一个高可用的分布式日志服务,所有在 Twitter 的分布式系统都可以复用这个日志服务?

构建一个分布式日志系统,首要的事情就是找出我们需要解决什么问题,满足什么样的需求。


首先作为一个基本设施,存储在日志中的数据需要持久化,这样它可以容忍宕机,避免数据丢失。

因为需要作为分布式系统的基础设施,那么在单机上持久化是远远不够的。我们需要将数据复制到多台机器上,提高数据和系统的可用性。
当数据被复制到多台机器上的时候,我们就需要保证数据的强一致性。否则,如果我们出现丢数据、数据不一致,那么势必影响到构建在分布式日志上的所有系统。如果日志都不能相信了,你的生活还能相信谁呢 :)

我所在 Twitter 的组,是 messaging 组。主要负责 Twitter 的消息中间件(在 Twitter 内部服务之间搬运数据),比如 Kestrel(用于在线系统)、Kafka(用于离线分析)。

这些系统都不支持严格的持久化,或者在支持持久化的情况下性能极差。它们采用定期回刷 (periodic flush) 磁盘或者依赖于文件系统 (pdflush) 来持久化数据。

它们因为不支持持久化,所以当事故发生时,我们会丢数据。一旦数据丢失,运维系统的人就会非常痛苦。我们经常被责问,如何才能定量丢失的数据。

这就让我们不禁在想,是否能够构建这样一个基础服务,它的基石就是持久化和强一致的?


日志系统的核心负载可以归为三类:writes,tailing reads 和 catch-up reads。

Writes 就是将数据追加到一个日志中,tailing reads 就是从日志的尾部读最新的东西,而 catch-up reads 则是从比较早的位置开始读日志(比如数据库中重建副本)

Writes 和 tailing reads 在意的是延时 (latency),因为它关系到一个消息能多快地从被写入到被读到。

而 catch-up reads 在意的则是高吞吐量,因为它关系到是否能追赶到日志的尾部。

在一个“完美”的世界中,系统应该只有两种负载,writes 和 tailing reads。而且大部分现有系统对于这两种负载可以很好地应付。

但是,在现实世界里,这基本不可能。尤其在一个多租户的环境里,catch-up reads 通常成为影响系统的重要因素。

举个例子,以流式计算为例,用户可能重启一个 topology。而这个 topology 可能从很早地位置开始大量读数据,从而引入大量的 catch-up reads。而这些 catch-up reads 在文件系统角度通常会表现为大批量的扫描,文件系统会进行大量的预读取到 Page Cache 里,从而挤掉最新的数据而影响写操作和 tailing read操作。

在设计这个分布式日志系统 DistributedLog 的时候,我们进行了各种调研。也同时基于运维已有系统 (kestrel, Kafka) 的经验,我们最终决定基于 Apache BookKeeper进行构建。

主要因为 Apache BookKeeper 提供的三个核心特性:I/O 分离、并行复制和容易理解的一致性模型。它们能够很好地满足我们对于持久化、多副本和一致性的要求。

Twitter 如何基于 Apache BookKeeper 构建 DistributeLog?

Apache BookKeeper 最早开始于 2008 年,是 Yahoo 巴塞罗那研究院的研究项目,首要目的是解决 HDFS NameNode 的可用性问题。后来成为 Apache ZooKeeper 的子项目。2014 年底,脱离 Apache ZooKeeper 成为顶级项目。目前被 Yahoo, Twitter,Salesforce 等公司使用。


这张图简单地描述了 Apache BookKeeper 的样子。它主要由三个组件构成,客户端 (client),数据存储节点 (Bookie) 和元数据存储 Service Discovery(ZooKeeper)。

Bookies在启动的时候向 ZooKeeper 注册节点。Client 通过 ZooKeeper 发现可用的 Bookie。

在 Apache BookKeeper 中,读写操作的单元叫做 Ledger。Ledger 是一组追加有序的记录。

客户端可以创建一个 Ledger,然后进行追加写操作。每个 Ledger 会被赋予全局唯一的 ID。读者可以根据 Ledger ID,打开 Ledger 进行读操作。
客户端在创建 Ledger 的时候,从 Bookie Pool 里面按照指定的数据放置策略挑选出一定数量的 Bookie,构成一个 Ensemble。
每条被追加的记录在写者(Writer)会被赋予从 0 开始有序递增的序号,称为 Entry ID。

每条 Entry 会被并行地发送给 Ensemble 里面的所有 Bookies。并且所有 Entry的发送以流水线的方式进行。也就是意味着发送第 N + 1 条记录的写请求不需要等待发送第 N 条记录的写请求返回。

对于每条 Entry 的写操作而言,当它收到 Ensemble 里面大多数 Bookie 的确认后,Client 认为这条记录已经持久化到这个 Ensemble 中,并且有大多数副本,它就可以返回确认给 Application。

写记录的发送可以乱序,但是确认 (Acknowledge) 则会按照 Entry ID 的顺序进行有序确认。从而实现日志的严格有序性。
如果 Ensemble 里面的存活的 Bookies 不能构成大多数,Client 会进行一个 Ensemble Change。

Ensemble Change 将从 Bookie Pool 中根据数据放置策略挑选出额外的 Bookie 用来取代那些存活的 Bookie (图中粉色方块)。通过 Ensemble Change 操作,Apache BookKeeper 保证写操作的高可用性。

理解 Apache BookKeeper 的读操作之前,需要先说明一下 Apache BookKeeper 的一致性模型。

对于 Writer 而言,write 不断地添加记录。每个记录会被 writer 赋予一个严格递增的 ID。所有的追加操作都是异步的。也就是写第二条记录不用等写第一条记录返回。所有写成功的操作按照 ID 递增顺序Ack 回 writer。
伴随着写成功的 Acknowledges,writer 不断地更新一个指针叫做 Last-Add-Confirmed (LAC)。所有 Entry ID 小于等于 LAC 的记录保证持久化并复制到大多数副本上。

而在 LAC 和 LAP (Last-Add-Pushed) 之间的记录就是已经发送到 Bookies 但是尚未被确认写成功的。
所有的 Readers 都可以安全地读取 Entry ID 小于或者等于 LAC 的记录,从而保证 reader 不会读到尚未被确认 (acknowledged) 的记录,从而保证了读者之间的一致性。

在写者方面,BookKeeper 并不进行任何主动的选主 (leader election) 操作。相反地,它提供了内置的 fencing 机制,防止出现多个写者的状态,从而保证写者的一致性。

Apache BookKeeper 没有将很复杂的一致性机制捆绑在一起。写者和读者之间也没有很复杂的协同机制。所有的一致性的协调就是通过这个 LAC 指针 (Last Add Confirmed)。这样的做法,可以使得扩展写者和扩展读者相互分离。
理解了 Apache BookKeeper 的一致性模型之后,我们再回来看它的读操作。

在 Apache BookKeeper中,主要有两种读操作:一种是读指定的 Entry(图(a)),另外一种是读 LAC (图(b))。

因为 Entry 追加之后不再被修改,那么在图 (a) 中,客户端可以到任意一个副本读取相应的 Entry。为了保证低延时(获得平滑的 p999), 我们使用了一个叫 Speculative Read 的机制。读请求首先发送给第一个副本,在指定 timeout 的时间内,如果没有收到 reponse,则发送读请求给第二个副本,然后同时等待第一个和第二个副本。谁第一个返回,即读取成功。通过有效的 Speculative Read,我们很大程度减小了 p999 延时的 spikes,达到可预测的低延时。

另一个操作是读取 LAC。这是读者跟写者之间的 Catch-Up 操作,保证读者读取到最新的数据。因此,它是采用的是 Quorum Read 的做法:从所有 Bookies 读取最新的 LAC,然后等待大多数的答复。

Read Entries 和 Read LAC 构成了Reader的核心操作。在 Twitter,为了进一步降低延时,我们将两种操作进行合并,形成 “Long Poll Read”(图(c)). 客户端发送 Long Poll 请求并在 Bookie 等待最新的 LAC 的更新,一旦写者更新了 LAC,Bookie 返回更新后的 LAC 以及相应的 Entry。这样可以有效地节省多轮网络交互。同时对于 Long Poll Read,我们仍然采用 Speculative 机制,保证平滑的可预测的 p999 延时。

这是 BookKeeper 主要的核心读写流程,并行复制和一致性模型。
Apache BookKeeper 作为基石,解决了分布式日志的核心问题。

但是,它还是相对比较底层。

作为一个共享的日志服务,要在大的组织架构中被广泛使用,首要的问题就是简单性。我们需要让用户思考的是命名的日志,而不是一组数字编号的 Ledgers。日志应该是一组无止尽的记录序列,提供更面向流式的接口。用户只需要考虑如何将数据追加到流里,以及如何从流里读取数据。

如图所示,紫色是一个 DistributedLog 的日志。日志被切分成不同的日志段,每个日志段被存成一个 Apache BookKeeper 的 Ledger。


日志被切分成不同的日志段,每个日志段被存成一个 Apache BookKeeper 的 Ledger。新的日志段在一定时间或者旧的日志段写满时会被创建。因为日志被切割成基本上相近大小的日志段,所以很容易将这些日志分段分散到整个集群中,实现数据的均匀分布。

因为数据持续追加到日志中,我们提供两种方式删除日志。

一种是精确的 truncation,对于数据库这样严格的复制状态机(replicated state machines)的应用场景,它们需要严格控制哪个位置之前的数据是不再需要的。

另外一个是基于时间的自动过期,它适用于不需要严格控制的数据分析场景。
除了核心的抽象,我们要构建一个服务。这个服务如上图所示。

在写入端,我们加了一个服务叫“Write Proxy”,用来接收来自于同源的写入服务。它负责管理每个日志的 ownership,并且在有 proxy server 机的情况下 failover 到其他 proxy server。

需要强调的一点,在这里使用的是 “ownership tracking” 而不是 “leadership election”,我们不需要像 consensus 算法那样严格的 leadership 要求,因为 Apache BookKeeper 提供了内置的 fencing 机制保证多写者的一致性。所以此时的 “Write Proxy” 更像是一个无状态的服务 “stateless service”,可以随时迁移和 failover。

在读方面,我们增加了一个服务叫做 “Read Proxy”。它用来缓存最近的数据,可以用来支持成百上千的 readers 读取相同一个日志。

Write proxy 和 Read proxy 都是无状态的服务。所以可以很容易地运行在像 MesosDocker 或者 Amazon EC2 这样的集群环境中,实现 Auto-Scaling。同时使用这样的分层架构,我们可以轻易地独立地扩展服务层和存储层。
这就是 DistributedLog,Twitter 基于 BookKeeper 构建的分布式日志服务。它包含了我们认为作为日志系统需要的核心功能,我们认为足以满足支持不同的负载,从事务性的在线服务,实时的流式分析到离线的批处理。
而对于其他特性,比如如何 partition 数据,如何 route 数据到不同的日志,如何记录每个 reader 的读取位置,我们交由上层应用程序处理。

不同的应用程序对于延时、一致性和有序性都有不同的需求。只要基础设施是持久化、强一致性和严格有序的,那么就很容易去支持所有其他应用。

DistributeLog 案例分享

我们已经运行 DistributedLog/BookKeeper 有三四年了。

在上面的服务包括:Manhattan 数据库,EventBus (我们自服务的 pubsub system,用于取代 Kafka),跨数据中心的数据库复制,Twitter 搜索的 ingestion pipeline,持久化的 Deferred RPC 系统,用于存储系统的 Sharding Service…

我们现在正在全面取代已有的老系统 Kestrel (用于在线服务的 Queue)和 Kakfa(用于离线分析的Pub/Sub)。

因为时间有限,我主要讲解了 DistributedLog 和 BookKeeper 的大概。中间跳过了一些内容。

我们相信 DistributedLog 是一个相对不错的模块化的架构。它适用于基于 Cloud Services (e.g Amazon EC2, Docker) 的公司,也适用于拥有自己数据中心,运行自己集群系统(e.g Mesos, Yarn) 的公司。

我们计划第一季度开源 DistributedLog 到 Apache 社区。


DistributedLog 的架构可以运行在多机房,实现跨机房的强一致性。

Q & A

1、日志顺序方面,日志的序列号(1,2,3,4……),是否使用了 Twitter 的 snowflake 服务?获取序列号后再推送日志?是上面提到的什么组件做的?
没有使用 Twitter 的 snowflake 服务。因为 Writer 是 single writer,存在 ownership。所有的写会 forward 给 owner 进行序列化。

2、这是 Kafka 的替代产品吗?
是的。Kafka 目前没有被使用在数据库日志的场景。因为 Kafka 的每个 topic 对应一个文件,在 topic 数量特别多,且需要持久化的场景,Kafka 的性能比较差。很难适用于 Twitter 的多租户场景。

3、请问是否研究过 ELK,请问在前面分享的架构中,哪个对应 ELK 中的 Logstash(或fluentd)部分?或是 BookKeeper 就是替换它的?
这里的日志就是数据库的日志。跟日常的文本日志不一样。在 ELK 架构中,E 是文本的索引,K 是 UI。这两个部分不是 DistributedLog/BookKeeper 所解决的问题。DistributedLog/BookKeeper 可以作为 PUB/SUB 这样的消息中间件来做日志的中转,也就是可以用在 L 的部分。

4、分享中提到的 Kestrel 和 Kafka 一个在线 ,一个离线,具体差异是什么?
Kestrel 主要是 producer/consumer queue 的模型。而 Kafka 是 pub/sub 模型。Kestrel 支持 per item 的 transaction,粒度是 item。而 Kafka 的粒度是 partition。

5、Name Log 的具体机制是什么样的? Client 删除日志时怎样保证与读者和写者不冲突?
Name Log 是 DistributedLog 提供的用户接口。底层分块成不同的 Ledgers 进行存储。元数据记录在ZooKeeper。使用 ZooKeeper 的 CAS 操作和 notification 机制来协调。

6、想多了解一下跨数据中心复制,感觉不好做。可否介绍一下?
这个问题比较宽泛。跨数据中心,可以是异步复制,也可以是同步复制。不同场景有不同的权衡。

7、如果 LAC 之后的那条记录始终不能写成功,是不是就阻塞在那里,LAC 就没法移动了?
这是一个很好的问题。Ensemble Change 能够保证写永远 go through。所以 LAC 会被 update 到 bookies。读方面的 Speculative 机制保证能读到 LAC。
8、 这里的 writer 是 Write Proxy 吗?如果是的话,single writer 的吞吐量就是这个 ledger 的最大写的吞吐量了吧,会不会成为瓶颈?
这里的 Writer 是指 Write Proxy。首先,一个 Ledger 的吞吐量,取决于 Bookie 的磁盘/网络带宽。假设,Bookie 的网卡是 1Gbps,一块磁盘作为日志写的磁盘,那么在保证低延时的情况下,Bookie 的吞吐可以达到 50MB/s~70MB/s。在 BookKeeper,可以通过配置 Ledger 的 Ensemble Size, Write Quorum Size 和 Ack Quorum Size,通过 Stripping 写的方式来提高 Ledger 的吞吐。比如,设置 Ensemble Size 为 6, Write Quorum Size 为 3, Ack Quorum Size 为 2。那么吞吐量可以提高到 2 。这是 Ledger 内的 Scalability。

理论上,单个 Ledger 的吞吐可以随着 Ensemble Size 进行扩展。但是,因为所有这个 Ledger 都 writes 都要到 Write Proxy,所以它还取决于 Write Proxy 的网络带宽和后端 Bookie 的磁盘带宽,以及相应的副本数量。比如,Write Proxy 的网卡带宽是 1Gbps,复本为 3,即使后端的 Bookie 的吞吐可以达到 50MB/s~70MBps,Write Proxy 也只能接受 1Gbps / 3 (~30MB/s-~40MB/s) 的数据。

单个日志的吞吐通常取决于物理机器的带宽。但是整个系统的吞吐可以随着日志数量的增加来增加。比如 1 日志可以写 10MB / s,那么 100 日志可以写 1GB/s。

在 DistributedLog 层,我们不做 partition。我们把 partition 的 logic 交给上层应用。因为不同应用对于如何 partition 有不同需求。

9、 Failover 到其他 Proxy Server 时,如何继续产生递增的 Entry ID?
在 failover 到其他 Proxy Server 时,DistributedLog 并不会复用上一个 Proxy Server 的 ledger。所以 failover 之后,它会关闭上个 Proxy Server 写的 ledger,然后重新开一个 ledger 进行写入。递增的 Entry ID 是基于当前 ledger 生成的。从整个日志的角度来看,<ledger id, entry id> 构成了 unique 的记录 ID。如果对于 consensus 算法有所了解,可能会知道 `epoch` 的概念。每个 epoch 会有一个 designated 的 leader。而在 DistributedLog 中,`ledger id` 其实扮演着 `epoch` 的概念。


Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts