Saturday, December 12, 2015

Pinterest - Building a scalable/available/Smart home feed



https://medium.com/@Pinterest_Engineering/sharding-pinterest-how-we-scaled-our-mysql-fleet-3f341e96ca6f
We maintain a configuration table that says which machines these shards are on:
[{"range": (0, 511), "master": "MySQL001A", "slave": "MySQL001B"},
 {"range": (512, 1023), "master": "MySQL002A", "slave": "MySQL002B"},
    ...
 {"range": (3584, 4095), "master": "MySQL008A", "slave": "MySQL008B"}]
This config only changes when we need to move shards around or replace a host. If a master dies, we can promote the slave and then bring up a new slave. The config lives in ZooKeeper and, on update, is sent to services that maintain the MySQL shard.
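As a rough sketch, resolving a shard ID to its master host against such a config might look like this (the helper name and the idea of loading the ZooKeeper config into a Python list are illustrative, not Pinterest's actual code):

# Shard-range config as above (abridged); in production it is read from ZooKeeper.
SHARD_CONFIG = [
    {"range": (0, 511), "master": "MySQL001A", "slave": "MySQL001B"},
    {"range": (512, 1023), "master": "MySQL002A", "slave": "MySQL002B"},
    {"range": (3584, 4095), "master": "MySQL008A", "slave": "MySQL008B"},
]

def master_for_shard(shard_id):
    """Return the master host whose shard range contains shard_id."""
    for entry in SHARD_CONFIG:
        lo, hi = entry["range"]
        if lo <= shard_id <= hi:
            return entry["master"]
    raise KeyError("no host configured for shard %d" % shard_id)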

Since we wanted this data to span multiple databases, we couldn’t use the database’s joins, foreign keys or indexes to gather all data, though they can be used for subqueries that don’t span databases.

All data needed to be replicated to a slave machine for backup, with high availability and dumping to S3 for MapReduce. We only interact with the master in production. You never want to read/write to a slave in production. Slaves lag, which causes strange bugs. Once you’re sharded, there’s generally no advantage to interacting with a slave in production.

We created a 64 bit ID that contains the shard ID, the type of the containing data, and where this data is in the table (local ID). The shard ID is 16 bits, type ID is 10 bits and local ID is 36 bits. The savvy additionology experts out there will notice that only adds to 62 bits. My past in compiler and chip design has taught me that reserve bits are worth their weight in gold. So we have two (set to zero).
ID = (shard ID << 46) | (type ID << 36) | (local ID << 0)
Given this Pin: https://www.pinterest.com/pin/241294492511762325/, let’s decompose the Pin ID 241294492511762325:
Shard ID = (241294492511762325 >> 46) & 0xFFFF = 3429
Type ID  = (241294492511762325 >> 36) & 0x3FF = 1
Local ID = (241294492511762325 >>  0) & 0xFFFFFFFFF = 7075733
So this Pin object lives on shard 3429. Its type is 1 (i.e. 'Pin'), and it's in row 7075733 of the pins table. For example, let's assume this shard is on MySQL012A. We can get to it as follows:
import MySQLdb

conn = MySQLdb.connect(host="MySQL012A")
conn.cursor().execute("SELECT data FROM db03429.pins WHERE local_id=7075733")
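A minimal sketch of packing and unpacking these 64-bit IDs, following the bit layout above (the helper names are mine):

SHARD_BITS, TYPE_BITS, LOCAL_BITS = 16, 10, 36   # plus 2 reserved bits = 64

def make_id(shard_id, type_id, local_id):
    return (shard_id << (TYPE_BITS + LOCAL_BITS)) | (type_id << LOCAL_BITS) | local_id

def decompose_id(full_id):
    shard_id = (full_id >> (TYPE_BITS + LOCAL_BITS)) & ((1 << SHARD_BITS) - 1)
    type_id = (full_id >> LOCAL_BITS) & ((1 << TYPE_BITS) - 1)
    local_id = full_id & ((1 << LOCAL_BITS) - 1)
    return shard_id, type_id, local_id

# Reproduces the worked example above:
assert make_id(3429, 1, 7075733) == 241294492511762325
assert decompose_id(241294492511762325) == (3429, 1, 7075733)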

There are two types of data: objects and mappings. Objects contain details about a single entity, such as Pin data; mappings link objects to one another.

Object Tables!

Object tables, such as Pins, users, boards and comments, have an ID (the local ID, an auto-incrementing primary key) and a blob of data that contains a JSON with all the object’s data.
CREATE TABLE pins (
  local_id BIGINT PRIMARY KEY AUTO_INCREMENT,
  data TEXT,
  ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
For example, a Pin object looks like this:
{"details": "New Star Wars character", "link": "http://webpage.com/asdf", "user_id": 241294629943640797, "board_id": 241294561224164665, …}
To create a new Pin, we gather all the data and create a JSON blob. Then, we decide on a shard ID (we prefer to choose the same shard ID as the board it’s inserted into, but that’s not necessary). The type is 1 for Pin. We connect to that database, and insert the JSON into the pins table. MySQL will give back the auto-incremented local ID. Now we have the shard, type and new local ID, so we can compose the full 64 bit ID!
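A hedged sketch of that flow, reusing the illustrative helpers from the earlier sketches (master_for_shard, make_id); the db naming follows the db03429 example above, and the details are simplified:

import json
import MySQLdb

PIN_TYPE_ID = 1   # type 1 = Pin

def create_pin(shard_id, details):
    """Insert a new Pin's JSON blob on the chosen shard and return its full 64-bit ID."""
    conn = MySQLdb.connect(host=master_for_shard(shard_id))
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO db%05d.pins (data) VALUES (%%s)" % shard_id,
        (json.dumps(details),),
    )
    conn.commit()
    local_id = cursor.lastrowid          # the auto-incremented local ID from MySQL
    return make_id(shard_id, PIN_TYPE_ID, local_id)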

A mapping table links one object to another, such as a board to the Pins on it. The MySQL table for a mapping contains three columns: a 64 bit ‘from’ ID, a 64 bit ‘to’ ID and a sequence ID. There are index keys on the (from, to, sequence) triple, and they live on the shard of the ‘from’ ID.
CREATE TABLE board_has_pins (
  board_id BIGINT,
  pin_id BIGINT,
  sequence INT,
  INDEX(board_id, pin_id, sequence)
) ENGINE=InnoDB;
Mapping tables are unidirectional, such as the board_has_pins table. If you need the opposite direction, you'll need a separate pin_owned_by_board table. The sequence ID gives an ordering (our IDs can't be compared across shards, since the local ID offsets diverge). We usually insert new Pins into a board with a sequence ID = unix timestamp. The sequence can be any number, but a unix timestamp is a convenient way to force new stuff to always sort higher, since time monotonically increases. You can look stuff up in the mapping table like this:
SELECT pin_id FROM board_has_pins 
WHERE board_id=241294561224164665 ORDER BY sequence 
LIMIT 50 OFFSET 150
This will give you up to 50 pin_ids, which you can then use to look up Pin objects.
What we've just done is an application-layer join (board_id -> pin_ids -> pin objects). One awesome property of application-layer joins is that you can cache the mapping separately from the object. We keep the pin_id -> pin object cache in a memcache cluster, and the board_id -> pin_ids cache in a redis cluster. This allows us to choose the right technology to best match the object being cached.
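A minimal sketch of that application-layer join (cache clients are left out for brevity; decompose_id and master_for_shard are the illustrative helpers from earlier, and get_pin stands in for a memcache-backed object lookup):

def get_board_pins(board_id, limit=50, offset=0):
    """Application-layer join: board_id -> pin_ids -> Pin objects."""
    shard_id, _, _ = decompose_id(board_id)   # mapping rows live on the 'from' object's shard
    cursor = MySQLdb.connect(host=master_for_shard(shard_id)).cursor()
    cursor.execute(
        "SELECT pin_id FROM db%05d.board_has_pins "
        "WHERE board_id=%%s ORDER BY sequence LIMIT %%s OFFSET %%s" % shard_id,
        (board_id, limit, offset),
    )
    pin_ids = [row[0] for row in cursor.fetchall()]
    # In production the board_id -> pin_ids list would be cached in redis,
    # and each pin_id -> Pin object lookup would check memcache first.
    return [get_pin(pin_id) for pin_id in pin_ids]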

The next way to add more capacity is to open up new ranges. Initially, we only created 4,096 shards even though our shard ID is 16 bits (64k total shards). New objects could only be created in these first 4k shards. At some point, we decided to create new MySQL servers with shards 4,096 to 8,191 and started filling those.

You lose some nice properties of ID sharding, such as spatial locality. You have to create all the shards up front and generate the key yourself (it won't be made for you). It's always best to represent objects in your system with immutable IDs, so you don't have to update lots of references when, for instance, a user changes their username.

Consider building a cluster of background processing machines (pro tip: use Pyres) to script moving your data from your old databases to your shiny new shards. I guarantee data will be missed no matter how hard you try (gremlins in the system, I swear), so repeat the data transfer over and over again until the amount of new data landing in the new system is tiny or zero.
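A rough sketch of that kind of repeated backfill pass (all helpers here are placeholders; Pyres or a similar queue would supply the actual worker machinery):

def backfill_pass(legacy_rows, already_copied):
    """One pass over the legacy data; returns how many rows still needed copying."""
    copied = 0
    for row in legacy_rows:
        if row.id in already_copied:
            continue
        write_to_new_shard(row)        # placeholder: insert into the proper shard under the new ID scheme
        already_copied.add(row.id)
        copied += 1
    return copied

# Repeat passes until almost nothing new turns up:
#   while backfill_pass(load_legacy_rows(), copied_ids) > SMALL_THRESHOLD:
#       pass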


https://engineering.pinterest.com/blog/building-scalable-and-available-home-feed
From a Pinner's point of view, availability means how often they'll get errors. For service owners, availability means how many minutes the service can be down without violating the SLA (service level agreement).
The Pinterest home feed is a personalized collection of Pins for each person. One third of Pinterest’s traffic lands on home feed, which makes it one of our most critical pages. When building our new home feed, achieving four nines or higher was one of the metrics used for measuring the success of the project.

Isolating challenges

The home feed system can be simplified to three use cases: writing Pinners' feeds to storage, serving feeds from storage, and removing feed content when required.
Writing feeds can have a huge QPS (queries per second). Fortunately, it's not user-facing, and a certain delay (e.g. seconds or even minutes) is tolerable.
Serving has relatively small QPS compared to writing, but it's user-facing and has tight performance requirements.
A simple design would write all feed data to a single store and serve and delete from it. At our current scale, we keep hundreds of terabytes of data and support millions of operations per second. We've had success with HBase in past iterations of the home feed system, and after evaluating the options, we chose HBase as our backend storage.

The problem with this design is that it's very challenging to tune the same storage to satisfy both a high volume of writes and high-performance reads and updates.

For example, when a person creates a new Pin, we’ll fan out the Pin to all his or her followers. Followers are sharded across all HBase regions. When we fan out the same Pin to hundreds of Pinners, the write operation will hit multiple regions, lock the WAL (write ahead log) on each region server, update it and unlock it after use. Locking the WAL for each write/update/delete operation isn’t efficient and quickly becomes a bottleneck. 

A better approach is to batch operations and push the changes to HBase periodically, which increases the throughput of the HBase cluster dramatically. But the latency of each operation can then be as high as the flush interval. For user-facing operations, our latency requirement is at the millisecond level, so this approach would fail us miserably.
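A rough sketch of that batching trade-off as a generic write buffer (hbase_put is a placeholder for whatever client call actually applies the mutation):

import time

class BatchedWriter:
    """Buffer feed writes and flush them to HBase periodically instead of per operation."""
    def __init__(self, flush_interval_s=5.0, max_batch=500):
        self.flush_interval_s = flush_interval_s
        self.max_batch = max_batch
        self.buffer = []
        self.last_flush = time.time()

    def put(self, row_key, value):
        self.buffer.append((row_key, value))
        # Throughput improves, but an item can sit here for up to flush_interval_s
        # before it becomes visible -- fine for fan-out writes, not for user-facing reads.
        if len(self.buffer) >= self.max_batch or time.time() - self.last_flush >= self.flush_interval_s:
            self.flush()

    def flush(self):
        for row_key, value in self.buffer:
            hbase_put(row_key, value)   # placeholder for the real (batched) HBase mutation
        self.buffer = []
        self.last_flush = time.time()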
To satisfy these different requirements, we designed a system with two HBase clusters, saving data to different clusters at different stages.
  • Zen is a service that provides a graph data model on top of HBase and abstracts the details of HBase operations from data producer and consumer.
  • The SmartFeed worker pushes feed from all sources (we also refer to sources as pools) to HBase through Zen; it is called by PinLater, an asynchronous job execution system that can tolerate certain delays and failures.
  • HBase for materialized content saves the Pins that have potentially been shown in the home feed before, and its content is accessed through Zen.
  • SmartFeed content generator is in charge of selecting new Pins from the pools, scoring and ordering them.
  • The SmartFeed service retrieves feed content from both HBase clusters, and talks to the pools cluster only indirectly, through the SmartFeed content generator.
When a Pinner hits their home feed:
  • SmartFeed service calls content generator to get new Pins
  • Content generator decides how many Pins should be returned and how they should be ordered in the returned result
  • Simultaneously, the SmartFeed service retrieves saved Pins from the HBase cluster for materialized content
  • The SmartFeed service waits for the results of the two steps above, mixes them and returns them. (If the call to the content generator fails or times out, the saved Pins from the materialized content cluster are still returned.)
  • Offline, the SmartFeed service saves the new results to the materialized content cluster and deletes them from the pools cluster (a rough sketch of this flow follows)
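A hedged sketch of that request path, assuming simple client objects for the content generator and the two Zen/HBase-backed stores (all names and the timeout value are illustrative):

from concurrent.futures import ThreadPoolExecutor

_EXECUTOR = ThreadPoolExecutor(max_workers=8)

def get_home_feed(user_id, generator, materialized_store, pool_store, gen_timeout_s=0.2):
    """Fetch new Pins and previously materialized Pins in parallel, then mix and return them."""
    new_f = _EXECUTOR.submit(generator.generate_chunk, user_id)        # new Pins selected from the pools
    saved_f = _EXECUTOR.submit(materialized_store.get_feed, user_id)   # Pins shown in previous feed views

    saved_pins = saved_f.result()                  # user-facing; must succeed
    try:
        new_pins = new_f.result(timeout=gen_timeout_s)
    except Exception:
        new_pins = []                              # generator failed or timed out: serve only the materialized feed

    feed = new_pins + saved_pins                   # new chunk goes on top, order preserved

    if new_pins:
        # Done offline in production (e.g. a PinLater job): persist the new Pins into the
        # materialized feed and delete them from the pools so they aren't served again.
        materialized_store.append(user_id, new_pins)
        pool_store.remove(user_id, new_pins)
    return feed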
With this design, we separate user-facing components from non-user-facing components. Since the two HBase clusters have different volumes of data and usage patterns, we can scale and configure them individually to meet their needs. In practice, we have far fewer Pins in the materialized content cluster than in the pools cluster, so we can make it more reliable and faster without too much cost.
To push availability above four nines, we implemented something called speculative execution. We always keep a hot standby HBase cluster in a different EC2 availability zone to avoid losing Pinners' data. Any change made to the primary HBase cluster is synced to the standby cluster within a few hundred milliseconds. In the event of a partial failure of the primary cluster, we serve the data from the standby cluster. This technique gives the whole system four nines of read availability (though not write availability) and provides a much better Pinner experience than failing the requests. Speculative execution works like this:
  • Make a call to the primary cluster to retrieve data
  • If the call fails or doesn't return within a certain time, make another call to the standby cluster
  • Return the data from the cluster which returns first
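A minimal sketch of this speculative-execution pattern (the cluster clients are illustrative; the cutoff would be tuned to roughly the p99.9 latency, as described below):

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

_CLUSTER_POOL = ThreadPoolExecutor(max_workers=8)

def speculative_get(primary, standby, row_key, cutoff_s=0.05):
    """Read from the primary cluster; speculate against the standby if the primary is slow or failing."""
    primary_f = _CLUSTER_POOL.submit(primary.get, row_key)
    try:
        return primary_f.result(timeout=cutoff_s)    # cutoff_s ~ p99.9 of the primary call's latency
    except Exception:
        pass                                         # primary is slow or failing; try the standby too

    standby_f = _CLUSTER_POOL.submit(standby.get, row_key)
    pending = {primary_f, standby_f}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for f in done:
            if f.exception() is None:
                return f.result()                    # may be slightly stale if it came from the standby
    raise RuntimeError("both clusters failed for row %r" % (row_key,))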
With this approach, SmartFeed service will be able to return data if either of the clusters is available and the overall availability is close to the combined availability of the two clusters. The tricky part is to pick a proper waiting time. Since syncing data from the primary cluster to the standby cluster has some delay, the data returned from the standby cluster can be stale. If the waiting time is too small, Pinners will have a higher chance of getting stale data. If the waiting time is too long, Pinners have to wait unnecessarily long even if we can return results from the standby cluster much earlier. For us, we find if a call doesn’t return within time x, it’ll eventually time out in most cases. The time x is also larger than the 99.9 percentile of the call’s latency. We decided to use this as the cutoff time, which means results may be returned from the standby cluster for one out of 1,000 calls.
Another interesting finding is that the latency of the standby cluster is higher than the primary cluster because so few calls fall back to the standby cluster, and it’s in a ‘cold’ state for most of the time. To warm up the pipeline and get it ready for use, we randomly forward x percent of calls to the standby cluster and drop the result.
At one point, the primary HBase cluster was down for almost an hour because of a hardware issue. Thanks to speculative execution, all home feed requests automatically failed over to the standby cluster, and the performance and success rate of the home feed were not impacted at all during the incident.
https://engineering.pinterest.com/blog/building-smarter-home-feed
The home feed should be a reflection of what each user cares about. Content is sourced from inputs such as people and boards the user follows, interests, and recommendations. To ensure we maintain fast, reliable and personalized home feeds, we built the smart feed with the following design values in mind:
1. Different sources of Pins should be mixed together at different rates.
2. Some Pins should be selectively dropped or deferred until a later time. Some sources may produce Pins of poor quality for a user, so instead of showing everything available immediately, we can be selective about what to show and what to hold back for a future session.
3. Pins should be arranged in the order of best-first rather than newest-first. For some sources, newer Pins are intuitively better, while for others, newness is less important.

The architecture behind smart feed

We shifted away from our previously time-ordered home feed system and onto a more flexible one. The core feature of the smart feed architecture is its separation of available, but unseen, content and content that’s already been presented to the user. We leverage knowledge of what the user hasn’t yet seen to our advantage when deciding how the feed evolves over time.

Smart feed is a composition of three independent services, each of which has a specific role in the construction of a home feed.

The role of the smart feed worker

The smart feed worker is the first to process Pins and has two primary responsibilities - to accept incoming Pins and assign some score proportional to their quality or value to the receiving user, and to remember these scored Pins in some storage for later consumption.
Essentially, the worker manages Pins as they become newly available, such as those from the repins of the people the user follows. Pins have varying value to the receiving user, so the worker is tasked with deciding the magnitude of their subjective quality.




Incoming Pins are currently obtained from three separate sources: repins made by followed users, related Pins, and Pins from followed interests. Each is scored by the worker and then inserted into a pool for that particular type of pin. Each pool is a priority queue sorted on score and belongs to a single user. Newly added Pins mix with those added before, allowing the highest quality Pins to be accessible over time at the front of the queue.
Pools can be implemented in a variety of ways so long as the priority queue requirement is met. We choose to do this by exploiting the key-based sorting of HBase. Each key is a combination of user, score and Pin such that, for any user, we may scan a list of available Pins according to their score. Newly added triples will be inserted at their appropriate location to maintain the score order. This combination of user, score, and Pin into a key value can be used to create a priority queue in other storage systems aside from HBase, a property we may use in the future depending on evolving storage requirements.
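A hedged sketch of that key construction (the score encoding and field widths here are illustrative; the point is that a lexicographic scan over one user's key prefix returns Pins in score order):

import struct

MAX_SCORE = 2**32 - 1

def pool_row_key(user_id, score, pin_id):
    """Key = user | inverted score | pin, so a forward scan over a user's prefix yields best Pins first."""
    inverted = MAX_SCORE - int(score)      # invert the score so higher scores sort earlier
    return struct.pack(">QIQ", user_id, inverted, pin_id)

def user_prefix(user_id):
    return struct.pack(">Q", user_id)

# A scan over rows starting with user_prefix(user_id) walks that user's pool in
# descending score order; reading the first n rows pops the n best Pins.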

Smart feed content generator

The smart feed content generator is concerned primarily with defining what "new" means in the context of a home feed. When a user accesses the home feed, we ask the content generator for new Pins since their last visit. The generator decides the quantity, composition, and arrangement of new Pins to return in response to this request.
The content generator assembles available Pins into chunks for consumption by the user as part of their home feed. The generator is free to choose any arrangement based on a variety of input signals, and may elect to use some or all of the Pins available in the pools. Pins that are selected for inclusion in a chunk are thereafter removed from the pools so they cannot be returned as part of subsequent chunks.
The content generator is generally free to perform any rearrangements it likes, but is bound to the priority queue nature of the pools. When the generator asks for n pins from a pool, it’ll get the n highest scoring (i.e., best) Pins available. Therefore, the generator doesn’t need to concern itself with finding the best available content, but instead with how the best available content should be presented.
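A rough sketch of how a generator might assemble a chunk from the pools (the pool interface and the round-robin mixing rule are illustrative; the real scoring and arrangement logic is far richer):

from itertools import zip_longest

def generate_chunk(user_id, pools, per_pool_counts):
    """Pop the best Pins from each pool and assemble one chunk for the home feed."""
    per_source = []
    for pool_name, n in per_pool_counts.items():
        # pop_best is assumed to return the n highest-scoring Pins for this user
        # and remove them from the pool so later chunks cannot repeat them.
        per_source.append(pools[pool_name].pop_best(user_id, n))
    # One possible arrangement: round-robin across sources so they mix at the requested rates.
    return [pin for group in zip_longest(*per_source) for pin in group if pin is not None]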

Smart feed service

In addition to providing high availability of the home feed, the smart feed service is responsible for combining new Pins returned by the content generator with those that previously appeared in the home feed. We can separate these into the chunk returned by the content generator and the materialized feed managed by the smart feed service.
The materialized feed represents a frozen view of the feed as it was the last time the user viewed it. To the materialized Pins we add the Pins from the content generator in the chunk. The service makes no decisions about order, instead it adds the Pins in exactly the order given by the chunk. Because it has a fairly low rate of reading and writing, the materialized feed is likely to suffer from fewer availability events. In addition, feeds can be trimmed to restrict them to a maximum size. The need for less storage means we can easily increase the availability and reliability of the materialized feed through replication and the use of faster storage hardware.
The smart feed service relies on the content generator to provide new Pins. If the generator experiences a degradation in performance, the service can gracefully handle the loss of its availability. In the event the content generator encounters an exception while generating a chunk, or if it simply takes too long to produce one, the smart feed service will return the content contained in the materialized feed. In this instance, the feed will appear to the end user as unchanged from last time. Future feed views will produce chunks as large as, or larger than, the last so that eventually the user will see new Pins.
Continuing with this project, we intend to better model users’ preferences with respect to Pins in their home feeds. Our accuracy of recommendation quality varies considerably over our user base, and we would benefit from using preference information gathered from recent interactions with the home feed. Knowledge of personal preference will also help us order home feeds so the Pins of most value can be discovered with the least amount of effort.
http://timyang.net/architecture/pinterest-feed/
Pinterest's home feed originally aggregated the Pins of the people a user follows (similar to Weibo) and sorted them by time (natural order, much like WeChat Moments). Later versions of the feed system abandoned natural ordering in favor of a rule- and algorithm-driven design, internally called Smart Feed. Its algorithm and architecture, compiled below from public materials, are worth studying for architects building feed products.
Every Pinterest user's home feed is personalized content. Roughly one third of Pinterest's traffic goes to the feed page, making it one of the most critical pages in the whole system. When engineers built the new Smart Feed, reaching 99.99% availability was one of the metrics for judging the project's success.
The main algorithm and rules of the Pinterest smart feed are as follows:
  • Pins from different sources are mixed in at different rates.
  • Pins are selectively dropped (or deferred) according to the algorithm and weights. Lower-quality sources need not show everything every time; the system can decide what appears immediately and what is deferred. Pin quality is always judged from the perspective of the receiving user.
  • Pins are ordered best-first rather than newest-first. For some sources the newest Pins may come first, while for others new Pins carry lower priority.
As shown in the figure, the Pinterest feed consists of the following main parts: data sources on the far left, the Pinterest waterfall grid the user sees on the far right, and three services in the middle, described below.
[Figure: pinterest-1]

Feed worker

The feed worker's job is to accept new Pins, assign each a weight based on the receiving user, and store them. The same Pin gets a different score for each receiving user.
New Pins come mainly from three sources: followed users, related content, and interests derived from follow relationships. The worker scores the Pins from each source and inserts them into a pool; each pool is a per-user priority queue (higher-priority content comes out first).
Because the feed worker stores data per receiving user, all Pins have already been fanned out along follow relationships by the time they enter the worker (what is commonly called the push model for feeds).

Feed content generator

The feed content generator is responsible for returning the Pins that are new since the user's last visit. It can return the top n or all new Pins; Pins the user has already fetched (i.e. viewed) are removed from the pool. The content generator may rearrange Pins from multiple sources according to certain rules, but it cannot change the priority order of the underlying priority queue: higher-priority items are always taken out of the queue first.

Smart feed service

The materialized view stores a snapshot of the user's last feed list. This service doesn't reorder the feed; it stores the Pins last returned to the user in exactly that order. Because this is a history list the user has already read, it sees relatively little reading and writing and can therefore offer better availability. In addition, since the length of the history list can be capped, storage is bounded, so it's cheap to add replicas to further improve read availability.
The feed relies on the content generator to supply new Pins. If the content generator is unavailable, the service degrades gracefully: the user still gets the historical list, served from the materialized store.
Through these three services, Pinterest gains flexible control over what the feed returns; each service has a clearly defined responsibility, achieving the goal of personalized content for every user.

Feed storage

Pinterest's feed storage has to meet the following requirements:
  • Write newly published feed entries. Since Pinterest uses the push model, this path faces very high write QPS, but users can tolerate some write delay.
  • Fetch the materialized feed list for the home page. Its QPS is much lower than the write path's, but users have little tolerance for request latency.
  • Delete feed entries.
A simple design would write all feed data into a single store, which makes access, updates and deletes easy to implement. At Pinterest's current scale, that means hundreds of terabytes of data and millions of operations per second. After an overall evaluation, HBase was chosen to meet these requirements: Pinterest's workload demands very high read, write and update rates, and HBase delivers strong performance for all three.
When a user publishes a new Pin, it is fanned out to all of their followers, who may be sharded across all HBase regions, so a single fan-out may touch multiple regions, locking each region's WAL, updating it, and then unlocking it. Locking the WAL for every write/delete/update is very inefficient and quickly becomes a system bottleneck. A better approach is to batch HBase operations, which greatly increases HBase throughput, but it also increases latency; for user-facing operations, higher latency is unacceptable.
To satisfy these different requirements, Pinterest designed a dual-HBase-cluster approach, writing data to different HBase clusters at different stages (see the figure).
[Figure: pinterest-3]
Zen is a service that provides graph storage on top of HBase.
The SmartFeed Worker fans out newly published content and stores it in HBase through Zen; the asynchronous processing jobs are invoked through the PinLater service.
The SmartFeed ContentGenerator is responsible for returning the newest Pins, scoring and ordering them.
When a user refreshes their home feed, the SmartFeed service merges data from the Content Generator and the materialized HBase store and returns it to the user; if the request to the generator times out, the system can still return the materialized data. In the background, SmartFeed deletes the data that has been materialized from the left-hand (pools) store.
In practice, the materialized HBase store holds far less data than the publishing pools, so requests against it are very fast.

Feed high availability

With the design above, the system's availability is essentially that of the materialized HBase store. HBase clusters currently carry risks such as GC pauses, single points of failure and region migrations, so with a single HBase cluster it's hard to guarantee availability above 99.99%.
To solve this, a standby cluster runs in another EC2 availability zone; anything written to the primary cluster is synced to the standby within a few hundred milliseconds. When the primary cluster is unavailable, the requested data can be served from the standby. With this design, overall system availability exceeds 99.99% (writes excluded).
[Figure: pinterest-2]

http://pingineering.tumblr.com/post/105293275179/building-a-scalable-and-available-home-feed
Pinnability: Machine learning in the home feed
https://engineering.pinterest.com/blog/pinnability-machine-learning-home-feed

