Twitter materializes (in memory) the items in each of the followers' inboxes
Original Implementation
• Single table, vertically scaled
• Master-Slave replication and Memcached for read throughput.
Partition by time
Queries try each partition in order until enough data is accumulated
Problems w/ solution
• Write throughput
T-Bird Implementation
Partition by primary key
Finding recent tweets by user_id queries N partitions
Partition user_id index by user id
• Partition and index
• Index and partition
• Exploit locality (in this case, temporal locality)
• New tweets are requested most frequently, so usually only 1 partition is checked
The three data problems
• Tweets
• Timelines
• Social graphs
Original Implementation
SELECT * FROM tweets
WHERE user_id IN
(SELECT source_id
FROM followers
WHERE destination_id = ?)
ORDER BY created_at DESC
Crazy slow if you have lots of friends or indices can’t be kept in RAM
Current Implementation
• Sequences stored in Memcached
• Fanout off-line, but has a low latency SLA
• Truncate at random intervals to ensure bounded length
• On cache miss, merge user timelines
Social Graph
Intersection: Deliver to people who follow both @aplusk and @foursquare
Forward Backward
• Partitioned by user id
• Edges stored in “forward” and “backward” directions
• Indexed by time
• Indexed by element (for set algebra)
• Denormalized cardinality
• Data consistency in the presence of failures
• Write operations are idempotent: retry until success
• Last-Write Wins for edges
• (with an ordering relation on State for time conflicts)
• Other commutative strategies for mass-writes
• It is not possible to pre-compute set algebra queries
• Partition, replicate, index.
Many efficiency and scalability problems are solved the same way
• All engineering solutions are transient
• Nothing’s perfect but some solutions are good enough for a while
• Scalability solutions aren’t magic. They involve partitioning, indexing, and replication
• All data for real-time queries MUST be in memory.
Disk is for writes only.
• Some problems can be solved with pre-computation, but a lot can’t
• Exploit locality where possible
The best policy is to decide whether to push or pull events on a per producer/consumer basis. This technique minimizes system cost both for workloads with a high query rate and those with a high event rate. It also exposes a knob, the push threshold, that we can tune to reduce latency in return for higher system cost.
Near real-time event streams are becoming a key feature of many popular web applications. Many web sites allow users to create a personalized feed by selecting one or more event streams they wish to follow.
Examples include Twitter and Facebook, which allow a user to follow other users' activity, and iGoogle and My Yahoo, which allow users to follow selected RSS streams. How can we efficiently construct a web page showing the latest events from a user's feed?
Constructing such a feed must be fast so the page loads quickly, yet reflects recent updates to the underlying event streams. The wide fanout of popular streams (those with many followers) and high skew (fanout and update rates vary widely) make it difficult to scale such applications.
We associate feeds with consumers and event streams with producers.
We demonstrate that the best performance results from selectively materializing each consumer's feed:
events from high-rate producers are retrieved at query time, while events from lower-rate producers are materialized in advance.
A formal analysis of the problem shows the surprising result that we can minimize global cost by making local decisions about each producer/consumer pair, based on the ratio between a given producer's update rate (how often an event is added to the stream) and a given consumer's view rate (how often the feed is viewed). Our experimental results, using Yahoo!'s web-scale database PNUTS, shows that this hybrid strategy results in the lowest system load (and hence improves scalability) under a variety of workloads.
maintains persistent connections with end clients
⇢processes tweet & social graph events
⇢event-based “router”
Fast Data in the Era of Big Data: Twitter’s Real- Time Related Query Suggestion Architecture
Develop a real time query suggestion system
Real Time Query Suggestion
• Good related query suggestions provide:
• Topicality
• Temporality
• Topicality: capture same topic
• Temporality: capture temporal connection
• #SCOTUS suggestions: healthcare and #aca
• Marissa Mayer example
Real Time Query Suggestion
• Time constrain to include news breaks
• When is the best time to make suggestions ?
• Too early: No enough evidences
• Too late: User experience
Twitter 工程师认为,一个用户体验良好的网站,当一个用户请求到达以后,应该在平均500ms以内完成回应。而Twitter的理想,是达到200ms- 300ms的反应速度[17]。因此在网站架构上,Twitter大规模地,多层次多方式地使用缓存
1. 用户表:用户ID,姓名,登录名和密码,状态(在线与否)。
2. 短信表:短信ID,作者ID,正文(定长,140字),时间戳。
3. 用户关系表,记录追与被追的关系:用户ID,他追的用户IDs (Following),追他的用户IDs (Be followed)。
1. Vector Cache and Row Cache
具 体来说,Twitter工程师认为最重要的列是IDs。即新发表的短信的IDs,以及被频繁阅读的热门短信的IDs,相关作者的IDs,以及订阅这些作者 的读者的IDs。把这些IDs存放进缓存 (Stores arrays of tweet pkeys [14])。在Twitter文献中,把存放这些IDs的缓存空间,称为Vector Cache [14]。
Twitter工程师认为,读取最频繁的内容是这些IDs,而短信的正文在其次。所以他们决定,在优先保证Vector Cache所需资源的前提下,其次重要的工作才是设立Row Cache,用于存放短信正文。
命中率(Hit Rate or Hit Ratio)是测量缓存效果的最重要指标。如果一个或者多个用户读取100条内容,其中99条内容存放在缓存中,那么缓存的命中率就是99%。命中率越高,说明缓存的贡献越大。
设立Vector Cache和Row Cache后,观测实际运行的结果,发现Vector Cache的命中率是99%,而Row Cache的命中率是95%,证实了Twitter工程师早先押注的,先IDs后正文的判断。
Vector Cache和Row Cache,使用的工具都是开源的MemCached [15]。
2. Fragment Cache and Page Cache
前文说到,访问Twitter网站的,不仅仅是浏览器,而且还有手机,还有像QQ那样的电脑桌面工具,另外还有各式各样的网站插件,以便把其它网站联系到 Twitter.com上来[12]。接待这两类用户的,是以Apache Web Server为门户的Web通道,以及被称为“API”的通道。其中API通道受理的流量占总流量的80%-90% [16]。
所以,继Vector Cache和Row Cache以后,Twitter工程师们把进一步建筑缓存的工作,重点放在如何提高API通道的反应速度上。
读者页面的主体,显示的是一条又一条短信。不妨把整个页面分割成若干局部,每个局部对应一条短信。所谓Fragment,就是指页面的局部。除短信外,其它内容例如Twitter logo等等,也是Fragment。如果一个作者拥有众多读者,那么缓存这个作者写的短信的布局页面(Fragment),就可以提高网站整体的读取效率。这就是Fragment Cache的使命。
对于一些人气很旺的作者,读者们不仅会读他写的短信,而且会访问他的主页,所以,也有必要缓存这些人气作者的个人主页。这就是Page Cache的使命。
观测实际运行的结果,Fragment Cache的命中率高达95%,而Page Cache的命中率只有40%。虽然Page Cache的命中率低,但是它的内容是整个个人主页,所以占用的空间却不小。为了防止Page Cache争夺Fragment Cache的空间,在物理部署时,Twitter工程师们把Page Cache分离到不同的机器上去。
3. HTTP Accelerator
要降低搜索的压力,不妨把搜索关键词,及其对应的搜索结果,缓存起来。Twitter工程师们使用的缓存工具,是开源项目Varnish [18]。
比较有趣的事情是,通常把Varnish部署在Web Server之外,面向Internet的位置。这样,当用户访问网站时,实际上先访问Varnish,读取所需内容。只有在Varnish没有缓存相应内容时,用户请求才被转发到Web Server上去。而Twitter的部署,却是把Varnish放在Apache Web Server内侧[19]。原因是Twitter的工程师们觉得Varnish的操作比较复杂,为了降低Varnish崩溃造成整个网站瘫痪的可能性,他们便采取了这种古 怪而且保守的部署方式。
1. 把该短信记录到“短信表” 中去。
2. 从“用户关系表”中取出追他的用户的IDs。
3. 有些追他的用户目前在线,另一些可能离线。在线与否的状态,可以在“用户表”中查到。过滤掉那些离线的用户的IDs。
4. 把那些追他的并且目前在线的用户的IDs,逐个推进一个队列(Queue)中去。
5. 从这个队列中,逐个取出那些追他的并且目前在线的用户的IDs,并且更新这些人的主页,也就是添加最新发表的这条短信。
以上这五个步骤,都由逻辑层(Logic Tier)负责。前三步容易解决,都是简单的数据库操作。最后两步,需要用到一个辅助工具,队列。队列的意义在于,分离了任务的产生与任务的执行。
为什么要使用消息队列?[14]的解释是“隔离用户请求与相关操作,以便烫平流量高峰 (Move operations out of the synchronous request cycle, amortize load over time)”。
其中洪峰时刻,Twitter网站每秒钟收到350条新短信,这个流量洪峰维持了大约5分钟。根据统计,平均每个Twitter用户被120人“追”,这就 是说,这350条短信,平均每条都要发送120次 [16]。这意味着,在这5分钟的洪峰时刻,Twitter网站每秒钟需要发送350 x 120 = 42,000条短信。
如何实施隔离呢?当一位用户访问Twitter网站时,接待他的是Apache Web Server。Apache做的事情非常简单,它把用户的请求解析以后,转发给Mongrel Rails Sever,由Mongrel负责实际的处理。而Apache腾出手来,迎接下一位用户。这样就避免了在洪峰期间,用户连接不上Twitter网站的尴尬 局面。
虽然Apache的工作简单,但是并不意味着Apache可以接待无限多的用户。原因是Apache解析完用户请求,并且转发给 Mongrel Server以后,负责解析这个用户请求的进程(process),并没有立刻释放,而是进入空循环,等待Mongrel Server返回结果。这样,Apache能够同时接待的用户数量,或者更准确地说,Apache能够容纳的并发的连接数量(concurrent connections),实际上受制于Apache能够容纳的进程数量。Apache系统内部的进程机制参见Figure 5,其中每个Worker代表一个进程。
Apache能够容纳多少个并发连接呢?[22]的实验结果是4,000个,参见Figure 6。如何才能提高Apache的并发用户容量呢?一种思路是不让连接受制于进程。不妨把连接作为一个数据结构,存放到内存中去,释放进程,直到 Mongrel Server返回结果时,再把这个数据结构重新加载到进程上去。
事实上Yaws Web Server[24],就是这么做的[23]。所以,Yaws能够容纳80,000以上的并发连接,这并不奇怪。但是为什么Twitter用 Apache,而不用Yaws呢?或许是因为Yaws是用Erlang语言写的,而Twitter工程师对这门新语言不熟悉 (But you need in house Erlang experience [17])。
通过让Apache进程空循环的办法,迅速接纳用户的访问,推迟服务,说白了是个缓兵之计,目的是让用户不至于收到“HTTP 503”错误提示,“503错误”是指“服务不可用(Service Unavailable)”,也就是网站拒绝访问。
大禹治水,重在疏导。真正的抗洪能力,体现在蓄洪和泄洪两个方面。蓄洪容易理解,就是建水库,要么建一个超大的水库,要么造众多小水库。泄洪包括两个方面,1. 引流,2. 渠道。
1. 作者的浏览器与网站建立连接,Apache Web Server分配一个进程(Worker Process)。作者登录,Twitter查找作者的ID,并作为Cookie,记忆在HTTP邮包的头属性里。
2. 浏览器上传作者新写的短信(Tweet),Apache收到短信后,把短信连同作者ID,转发给Mongrel Rails Server。然后Apache进程进入空循环,等待Mongrel的回复,以便更新作者主页,把新写的短信添加上去。
3. Mongrel收到短信后,给短信分配一个ID,然后把短信ID与作者ID,缓存到Vector MemCached服务器上去。
同时,Mongrel让Vector MemCached查找,有哪些读者“追”这位作者。如果Vector MemCached没有缓存这些信息,Vector MemCached自动去MySQL数据库查找,得到结果后,缓存起来,以备日后所需。然后,把读者IDs回复给Mongrel。
接着,Mongrel把短信ID与短信正文,缓存到Row MemCached服务器上去。
4. Mongrel通知Kestrel消息队列服务器,为每个作者及读者开设一个队列,队列的名称中隐含用户ID。如果Kestrel服务器中已经存在这些队列,那就延用以往的队列。
对应于每个短信,Mongrel已经从Vector MemCached那里知道,有哪些读者追这条短信的作者。Mongrel把这条短信的ID,逐个放进每位读者的队列,以及作者本人的队列。
5. 同一台Mongrel Server,或者另一台Mongrel Server,在处理某个Kestrel队列中的消息前,从这个队列的名称中解析出相应的用户ID,这个用户,既可能是读者,也可能是作者。
然后Mongrel从Kestrel队列中,逐个提取消息,解析消息中包含的短信ID。并从Row MemCached缓存器中,查找对应于这个短信ID的短信正文。
6. Mongrel把更新后的作者的主页,传递给正在空循环的Apache的进程。该进程把作者主页主动传送(push)给作者的浏览器。
变买为租,应对洪峰,这是一个不错的思路。但是租来的计算资源怎么用,又是一个大问题。查看一下[35],不难发现Twitter把租赁来的计算资源,大部分用于增加Apache Web Server,而Apache是Twitter整个系统的最前沿的环节。
为什么Twitter很少把租赁来的计算资源,分配给Mongrel Rails Server,MemCached Servers,Varnish HTTP Accelerators等等其它环节?在回答这个问题以前,我们先复习一下前一章“数据流与控制流”的末尾,Twitter从写到读的6个步骤。
事实上,通过Apache Web Servers的流量,虽然只占Twitter总流量的10%-20%,但是Apache却占用了Twitter整个服务器集群的50%的资源[16]。 所以,从旁观者角度来看,Twitter将来势必罢黜Apache。但是目前,当Twitter分配计算资源时,迫不得已,只能优先保证Apache的需 求。
