Showing posts with label Company - Pinterest.

Sunday, December 13, 2015

Pinterest Architecture



https://engineering.pinterest.com/blog/introducing-pinterest-secor

Pinterest logging pipeline
Our data logging center of gravity is a Kafka cluster. Kafka introduces abstractions that simplify collecting logs, but is only capable of streaming data to local disks and therefore isn’t suitable as a long-term data store.
Logs are stored on Amazon S3, and while it’s a highly reliable and scalable storage solution, S3 comes with the possibility of eventual consistency, meaning there are no guarantees for when uploaded files will become visible to readers. S3 also has non-monotonic properties that may cause files to “disappear” and reappear moments later.

No-reads principle

Secor works around the limitations of the eventual consistency model by adhering to a principle that it never reads back anything it wrote to S3. It relies on the Kafka consumer offset management protocol to keep track of what's been uploaded to S3. Kafka stores the underlying metadata in ZooKeeper, while metadata commit points are controlled by Secor and occur at a very low frequency of roughly one update per Kafka partition per hour.
The fact that metadata is stored separately from the data introduces a potential complication of keeping two stores in sync. Secor addresses this issue by enforcing that data is updated before the metadata and by using deterministic S3 paths. Any inconsistency caused by a successful update of the data followed by a failed commit to the metadata store will auto-resolve itself during subsequent state updates.
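The self-healing property hinges on path determinism: re-uploading the same Kafka messages always produces the same S3 key, so a retry after a failed metadata commit simply overwrites the earlier, uncommitted object. A minimal sketch of the idea (the path layout here is illustrative, not Secor's actual naming scheme):

```python
def s3_output_path(topic, kafka_partition, start_offset, generation=0):
    # Illustrative only: the key is a pure function of the Kafka
    # coordinates, so retrying an upload rewrites the exact same
    # object instead of leaking a duplicate under a new name.
    return "secor/%s/%d/%020d_%d.seq" % (
        topic, kafka_partition, start_offset, generation)
```

Because the key depends only on the topic, partition, and starting offset, a consumer that restarts from the last committed offset regenerates identical keys and any orphaned partial upload is harmlessly replaced.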
https://engineering.pinterest.com/blog/pinnability-machine-learning-home-feed

http://www.infoq.com/cn/news/2015/12/Pinterest-Web-URL
https://engineering.pinterest.com/blog/fetching-and-serving-billions-urls-aragog

Aragog architecture

There are several important considerations that must be addressed when building infrastructure that deals with billions of URLs: 
  1. Normalization/canonicalization: The same URL can be represented in many different forms, and several URLs may eventually redirect to the same URL. URL normalization (deduplication of different URL representations) and canonicalization (deduplication of URLs pointing to the same page) play a significant role in reducing the amount of data storage required for serving.
  2. Crawl politeness: At this scale, it’s important to rate limit and smooth out the traffic going out to each particular domain. Furthermore, robots.txt rules need to be respected appropriately.
  3. Modeling URL data: One may want to store pieces of extracted metadata associated with a URL or store and update the inlinks and outlinks associated with a URL.
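Normalization can be illustrated with a small sketch (these rules are generic examples; Aragog's actual normalization and canonicalization logic isn't public):

```python
from urllib.parse import urlsplit, urlunsplit

def normalize_url(url):
    # Illustrative normalization rules only: lowercase the scheme and
    # host, drop default ports and fragments, and strip a trailing
    # slash so equivalent URL spellings collapse to one form.
    parts = urlsplit(url.strip())
    scheme = parts.scheme.lower()
    host = parts.hostname.lower() if parts.hostname else ""
    port = parts.port
    if port and not ((scheme == "http" and port == 80) or
                     (scheme == "https" and port == 443)):
        host = "%s:%d" % (host, port)
    path = parts.path.rstrip("/") or "/"
    return urlunsplit((scheme, host, path, parts.query, ""))
```

Under rules like these, `HTTP://Example.COM:80/a/` and `http://example.com/a` normalize to the same key, which is what cuts the storage needed for serving.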
Aragog is composed of two services: the Aragog Fetcher, which fetches the web pages, respecting appropriate rate limits and canonicalizing URLs appropriately, and the Aragog UrlStore, which stores and serves all of the processed metadata and signals about the URLs. The figure below depicts some of many interactions between our crawl pipelines/frontend and Aragog.        

Aragog Fetcher

The Aragog Fetcher is a Thrift service responsible for fetching the URLs politely. Aragog Fetcher issues the HTTP requests, follows redirects and retrieves the page content and the HTTP headers. The fetcher returns a Thrift struct enclosing the page content, HTTP headers, fetch latency, the redirect chain and other data. 
Implementing crawl politeness requires two things:
  1. Respecting the rules in robots.txt
  2. Smoothing and rate limiting traffic to a particular domain
The Aragog Fetcher retrieves the robots.txt file on a particular domain, caching its contents for seven days. When a request is made to fetch a URL, it applies the fetch/don’t-fetch rules from robots.txt. If the robots.txt allows fetching, it calls out to the rate limiter with the URL’s domain.
The rate limiter may allow the request immediately, instruct the fetcher to delay the request for a period of milliseconds to smooth out the URL fetching, or force it to fail because the rate has been exceeded. To ensure the Aragog Fetcher doesn't overburden a domain with too many requests, the rate limiters allow up to 10 QPS to a single domain. We override this limit for some popular or more frequently crawled domains as necessary. The overrides are propagated as a configuration file to the pool of rate limiters using our config management system.
The rate limiter is served by a pool of machines sharded using consistent hashing by the URL domain. As a result, a single machine is responsible for making rate limiting decisions on a single domain. It also minimizes the amount of rate limiting state moving around when a rate limiter process/machine is added or decommissioned. Each rate limiter machine stores a mapping from the domain to the timestamp when a fetch was last scheduled. The rate limiter retrieves this timestamp (let's call it lastScheduledFetchTimeMs) and schedules the next fetch accordingly. For example, if the allowed QPS is 10, the rate limiter will schedule a fetch for this URL at lastScheduledFetchTimeMs + 100 (since we want to space out the requests at 100ms). The rate limiter uses a CAS update to optimistically update the last scheduled fetch time for the URL and retries if the CAS operation fails. It calculates the delay by subtracting the current time from lastScheduledFetchTimeMs. When there's a large burst of requests, the delay will be large (more than one second). When this happens, the rate limiter throws an exception back to the fetcher. Storing an 8-byte timestamp makes for very little overhead per domain.
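The scheduling logic above can be sketched as a single-process toy (a lock stands in for the distributed CAS loop of the real sharded service; the 10 QPS default and one-second burst threshold come from the description above):

```python
import threading
import time

class DomainRateLimiter:
    # Toy version of the scheme described above: one entry per domain
    # holding the last scheduled fetch time, spaced out by 1000/QPS ms.
    def __init__(self, qps=10, max_delay_ms=1000):
        self.spacing_ms = 1000 // qps        # 100ms between fetches at 10 QPS
        self.max_delay_ms = max_delay_ms     # bursts beyond this are rejected
        self.last_scheduled = {}             # domain -> lastScheduledFetchTimeMs
        self.lock = threading.Lock()

    def schedule(self, domain, now_ms=None):
        """Return the delay in ms before the fetch may run, or raise if over limit."""
        now = int(time.time() * 1000) if now_ms is None else now_ms
        with self.lock:                      # stands in for the CAS retry loop
            prev = self.last_scheduled.get(domain, now - self.spacing_ms)
            slot = max(now, prev + self.spacing_ms)
            delay = slot - now
            if delay > self.max_delay_ms:    # large burst: push back on the fetcher
                raise RuntimeError("rate exceeded for %s" % domain)
            self.last_scheduled[domain] = slot
            return delay
```

Each successful call reserves the next 100ms slot for the domain, so a burst of requests receives steadily increasing delays until the one-second cap trips and the fetcher must reschedule via PinLater.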
Whenever a URL is rate limited, the client simply reschedules the fetch to a later time, a feature intrinsically supported by our asynchronous task execution system called PinLater.

Aragog UrlStore

Every time you see a Rich Pin, you’re looking at data served from the Aragog UrlStore. The UrlStore is the storage and serving system that holds the metadata extracted from pages fetched. It holds the page content itself, semi-structured data extracted from the page and web graph metadata such as inlinks and outlinks. We created this shared system so that product teams can rapidly build functionality that uses this metadata without the burden of building their own scalable serving infrastructure. 
There were a couple of key design considerations we made when designing the system. First, we wanted a one-stop shop for all URL metadata across the organization. Second, we wanted to serve Pinterest’s full online read traffic from our API tier at acceptably low latencies, as well as read-write traffic from our offline processing systems which are a combination of batch and real-time processing. 
To accomplish this, we built a federated storage system that provides a comprehensive data model while storing metadata efficiently in systems that have an appropriate size, latency, durability and consistency.  
Here are a few examples of how we made the tradeoff between latency, durability and consistency.
Page content
We store the full content of web pages fetched. These are large blobs of data that get retrieved infrequently and only for offline pipelines for processing. We choose to store this data in S3 because affordable large storage size was more important than low-latency. 
Each web page is stored as a separate S3 file. We use a hash of the URL (normalized and canonical) as the key, but we found that S3 is susceptible to key hot spots. When you create many keys with long common prefixes, you can overload individual servers within the S3 cluster, degrading the performance for some of the keys in your bucket (using the URLs as keys will create these hotspots). We initially tried to use the URL with reverse domain (imagine a million keys in a single S3 bucket that all begin “com.etsy/...”) but ended up receiving hotspotting complaints from Amazon.
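One common way to avoid such hotspots (a generic pattern consistent with the hashing approach mentioned above, not necessarily Pinterest's exact scheme) is to lead the key with a hash of the normalized URL, so keys spread uniformly across the keyspace instead of clustering under a shared domain prefix:

```python
import hashlib

def page_content_key(normalized_url):
    # The hex digest of the normalized, canonicalized URL spreads keys
    # evenly over the S3 keyspace: two URLs from the same domain no
    # longer share a long "com.etsy/..." style prefix.
    return hashlib.md5(normalized_url.encode("utf-8")).hexdigest()
```

The key is still deterministic for a given URL (so the no-reads-style idempotent writes still work), but lexicographically adjacent keys now land on unrelated pages.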
In order to help Pinners find what they’re searching for in the most effective ways, we must understand their intentions behind search queries.

Data around the query

The more people search, the better we can suggest results. From the previous example, we can guess that the next person who issues the query "turkey" may also be interested in "turkey recipes." Information extracted from previous query logs has proven effective in understanding users' search intent. 

Search context such as adjacent queries in the same search session and clicked Pins after submitting a search query can help us improve the discovery experience for future searches.
To capture the information about a search query and make it available for other applications to process, derive signals and build features on top of it, we designed a data collection called QueryJoin that contains the following data:
  • The search query, which is also the identifier for the QueryJoin.
  • Demographic stats such as gender, country or language.
  • Adjacent queries: queries that appeared in the same session, stored to learn how users refined their searches to find what they were looking for.
  • Pins: a set of Pins returned for the search query. For each Pin, we have aggregated data from the PinJoin (the data collection of a cluster of Pins with the same image signature and information about those Pins) as well as engagement stats like the number of clicks, repins and likes.
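The fields above might be modeled roughly like this (field names and types are illustrative; the post describes the contents of a QueryJoin but not its concrete schema):

```python
from dataclasses import dataclass, field

@dataclass
class QueryJoin:
    query: str                                            # also the identifier
    demographics: dict = field(default_factory=dict)      # gender/country/language stats
    adjacent_queries: list = field(default_factory=list)  # same-session query refinements
    pins: list = field(default_factory=list)              # Pins with PinJoin + engagement stats
```

A downstream signal pipeline could then, for example, append "turkey recipes" to the `adjacent_queries` of the "turkey" QueryJoin when both appear in one session.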

Pinterest - Building the interests platform



https://engineering.pinterest.com/blog/building-interests-platform
In contrast with conventional methods of generating such data, which rely primarily on machine learning and data mining techniques, our system relies heavily on human curation. The ultimate goal is to build a system that’s both machine and human powered, creating a feedback mechanism by which human curated data helps drive improvements in our machine algorithms, and vice versa.
Raw input to the system includes existing data about Pins, boards, Pinners, and search queries, as well as explicit human curation signals about interests. With this data, we’re able to construct a continuously evolving interest dictionary, which provides the foundation to support other key components, such as interest feeds, interest recommendations, and related interests.

Generating the interest dictionary

We generated an initial collection of interests by extracting frequently occurring n-grams from Pin and board descriptions, as well as board titles, and filtering these n-grams using custom built grammars. While this approach provided a high coverage set of interests, we found many terms to be malformed phrases. For instance, we would extract phrases such as “lamborghini yellow” instead of “yellow lamborghini”. This proved problematic because we wanted interest terms to represent how Pinners would describe them, and so, we employed a variety of methods to eliminate malformed interests terms.
We first compared terms with repeated search queries performed by a group of Pinners over a few months. Intuitively, this criterion matches well with the notion that an interest should be an entity for which a group of Pinners are passionate.
Later we filtered the candidate set through public domain ontologies like Wikipedia titles. These ontologies were primarily used to validate proper nouns as opposed to common phrases, as all available ontologies represented only a subset of possible interests. This is especially true for Pinterest, where Pinners themselves curate special interests like “mid century modern style.”
Finally, we also maintain an internal blacklist to filter abusive words and x-rated terms as well as Pinterest specific stop words, like “love”. This filtering is especially important to interest terms which might be recommended to millions of users.
We arrived at a fair quality collection of interests following the above algorithmic approaches. In order to understand the quality of our efforts, we gave a 50,000 term subset of our collection to a third party vendor which used crowdsourcing to rate our data. To be rigorous, we composed a set of four criteria by which users would evaluate candidate Interests terms:
- Is it English?
- Is it a grammatically valid phrase?
- Is it a standalone concept?
- Is it a proper name?
This type of effort, however, is not easy to scale. The real solution is to allow Pinners to provide both implicit and explicit signals to help us determine the validity of an interest. Implicit signals include behaviors like clicking and viewing, while explicit signals involve asking Pinners to specifically provide information (through actions like a thumbs up/thumbs down, starring, or skipping recommendations).
To capture all the signals used for defining the collection of terms, we built a dictionary that stores all the data associated with each interest, including invalid interests and the reason each is invalid. This service plays a key role in human curation by aggregating signals from different people. On top of this dictionary service, we can build review systems at different levels.

Identifying Pinner interests

With the Interests dictionary, we can associate Pins, boards, and Pinners with representative interests. One of the initial ways we experimented with this was launching a preview of a page where Pinners can explore their interests.

Calculating related interests

Related interests are an important way of enabling the ability to browse interests and discover new ones. To compute related interests, we simply combine the co-occurrence relationship for interests computed at Pin and board levels.

Powering interest feeds

Interests feeds provide Pinners with a continuous feed of Pins that are highly related. Our feeds are populated using a variety of sources, including search and through our annotation pipeline. A key property of the feed is flow. Only feeds with decent flow can attract Pinners to come back repeatedly, thereby maintaining high engagement. In order to optimize for our feeds, we’ve utilized a number of real-time indexing and retrieval systems, including real-time search, real-time annotating, and also human curation for some of the interests.
To ensure overall quality, we need to guarantee quality from every source. For that purpose, we measure the engagement of Pins from each source and address quality issues accordingly.

Saturday, December 12, 2015

Pinterest - Building a scalable/available/smart home feed



https://medium.com/@Pinterest_Engineering/sharding-pinterest-how-we-scaled-our-mysql-fleet-3f341e96ca6f
We maintain a configuration table that says which machines these shards are on:
[{"range":     (0,511), "master": "MySQL001A", "slave": "MySQL001B"},
 {"range": (512, 1023), "master": "MySQL002A", "slave": "MySQL002B"},
    ...
 {"range": (3584, 4095), "master": "MySQL008A", "slave": "MySQL008B"}]
This config only changes when we need to move shards around or replace a host. If a master dies, we can promote the slave and then bring up a new slave. The config lives in ZooKeeper and, on update, is sent to services that maintain the MySQL shard.

Since we wanted this data to span multiple databases, we couldn’t use the database’s joins, foreign keys or indexes to gather all data, though they can be used for subqueries that don’t span databases.

All data needed to be replicated to a slave machine for backup, with high availability and dumping to S3 for MapReduce. We only interact with the master in production. You never want to read/write to a slave in production. Slaves lag, which causes strange bugs. Once you’re sharded, there’s generally no advantage to interacting with a slave in production.

We created a 64 bit ID that contains the shard ID, the type of the containing data, and where this data is in the table (local ID). The shard ID is 16 bits, type ID is 10 bits and local ID is 36 bits. The savvy additionology experts out there will notice that only adds to 62 bits. My past in compiler and chip design has taught me that reserve bits are worth their weight in gold. So we have two (set to zero).
ID = (shard ID << 46) | (type ID << 36) | (local ID << 0)
Given this Pin: https://www.pinterest.com/pin/241294492511762325/, let’s decompose the Pin ID 241294492511762325:
Shard ID = (241294492511762325 >> 46) & 0xFFFF = 3429
Type ID  = (241294492511762325 >> 36) & 0x3FF = 1
Local ID = (241294492511762325 >>  0) & 0xFFFFFFFFF = 7075733
So this Pin object lives on shard 3429. Its type is 1 (i.e. 'Pin'), and it's in row 7075733 in the pins table. As an example, let's assume this shard is on MySQL012A. We can get to it as follows:
conn = MySQLdb.connect(host="MySQL012A")
conn.execute("SELECT data FROM db03429.pins where local_id=7075733")
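The bit layout can be checked with a short sketch that composes and decomposes IDs (the masks follow the 16/10/36-bit split described above):

```python
def compose_id(shard_id, type_id, local_id):
    # 2 reserved bits | 16-bit shard ID | 10-bit type ID | 36-bit local ID
    return (shard_id << 46) | (type_id << 36) | local_id

def decompose_id(full_id):
    shard_id = (full_id >> 46) & 0xFFFF       # top 16 bits (below the reserved pair)
    type_id = (full_id >> 36) & 0x3FF         # next 10 bits
    local_id = full_id & 0xFFFFFFFFF          # bottom 36 bits
    return shard_id, type_id, local_id

# The Pin ID from the example above round-trips:
# decompose_id(241294492511762325) -> (3429, 1, 7075733)
```

Round-tripping the example Pin ID through both functions reproduces the shard, type, and local ID worked out by hand above.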

There are two types of data: objects and mappings. Objects contain details, such as Pin data.

Object Tables!

Object tables, such as Pins, users, boards and comments, have an ID (the local ID, an auto-incrementing primary key) and a blob of data that contains a JSON with all the object’s data.
CREATE TABLE pins (
  local_id INT PRIMARY KEY AUTO_INCREMENT,
  data TEXT,
  ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
For example, a Pin object looks like this:
{"details": "New Star Wars character", "link": "http://webpage.com/asdf", "user_id": 241294629943640797, "board_id": 241294561224164665, …}
To create a new Pin, we gather all the data and create a JSON blob. Then, we decide on a shard ID (we prefer to choose the same shard ID as the board it’s inserted into, but that’s not necessary). The type is 1 for Pin. We connect to that database, and insert the JSON into the pins table. MySQL will give back the auto-incremented local ID. Now we have the shard, type and new local ID, so we can compose the full 64 bit ID!

A mapping table links one object to another, such as a board to the Pins on it. The MySQL table for a mapping contains three columns: a 64 bit ‘from’ ID, a 64 bit ‘to’ ID and a sequence ID. There are index keys on the (from, to, sequence) triple, and they live on the shard of the ‘from’ ID.
CREATE TABLE board_has_pins (
  board_id INT,
  pin_id INT,
  sequence INT,
  INDEX(board_id, pin_id, sequence)
) ENGINE=InnoDB;
Mapping tables are unidirectional, such as a board_has_pins table. If you need the opposite direction, you'll need a separate pin_owned_by_board table. The sequence ID gives an ordering (our IDs can't be compared across shards, as the new local ID offsets diverge). We usually insert new Pins into a board with a sequence ID = unix timestamp. The sequence can be any number, but a unix timestamp is a convenient way to ensure new items always sort higher, since time increases monotonically. You can look stuff up in the mapping table like this:
SELECT pin_id FROM board_has_pins 
WHERE board_id=241294561224164665 ORDER BY sequence 
LIMIT 50 OFFSET 150
This will give you up to 50 pin_ids, which you can then use to look up Pin objects.
What we’ve just done is an application layer join (board_id -> pin_ids -> pin objects). One awesome property of application layer joins is that you can cache the mapping separate from the object. We keep pin_id -> pin object cache in a memcache cluster, but we keep board_id -> pin_ids in a redis cluster. This allows us to choose the right technology to best match the object being cached.
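The application-layer join can be sketched end to end (the in-memory dicts below are stand-ins for the real redis, memcache, and sharded MySQL stores):

```python
# In-memory stand-ins for the real stores (illustrative only):
REDIS = {}      # board_id -> list of pin_ids   (the mapping cache)
MEMCACHE = {}   # pin_id -> pin object          (the object cache)
DB_MAPPINGS = {241294561224164665: [101, 102, 103]}   # board_has_pins rows
DB_PINS = {101: {"details": "a"}, 102: {"details": "b"}, 103: {"details": "c"}}

def get_board_pins(board_id):
    # Application-layer join: mapping lookup first, then object lookups,
    # each backed by its own cache as described above.
    pin_ids = REDIS.get(board_id)
    if pin_ids is None:                  # mapping cache miss -> MySQL query
        pin_ids = DB_MAPPINGS.get(board_id, [])
        REDIS[board_id] = pin_ids
    pins = []
    for pin_id in pin_ids:
        pin = MEMCACHE.get(pin_id)
        if pin is None:                  # object cache miss -> MySQL query
            pin = DB_PINS[pin_id]
            MEMCACHE[pin_id] = pin
        pins.append(pin)                 # preserve the mapping's sequence order
    return pins
```

Because the two lookups are separate, each cache can use the technology that best fits its access pattern, exactly the memcache-vs-redis split described above.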

The next way to add more capacity is to open up new ranges. Initially, we only created 4,096 shards even though our shard ID is 16 bits (64k total shards). New objects could only be created in these first 4k shards. At some point, we decided to create new MySQL servers with shards 4,096 to 8,191 and started filling those.

You lose some nice properties of the ID shard, such as spatial locality. You have to start with all shards made in the beginning and create the key yourself (it will not make one for you). It's always best to represent objects in your system with immutable IDs. That way you don't have to update lots of references when, for instance, a user changes their username.

Consider building a cluster of background processing machines (pro tip: use pyres) to script moving your data from your old databases to your shiny new shards. I guarantee that data will be missed no matter how hard you try (gremlins in the system, I swear), so repeat the data transfer over and over again until the amount of missed data in the new system is tiny or zero.


https://engineering.pinterest.com/blog/building-scalable-and-available-home-feed
From a Pinner's point of view, availability means how often they'll get errors. For service owners, availability means how many minutes the service can be down without violating the SLA (service level agreement).
The Pinterest home feed is a personalized collection of Pins for each person. One third of Pinterest’s traffic lands on home feed, which makes it one of our most critical pages. When building our new home feed, achieving four nines or higher was one of the metrics used for measuring the success of the project.

Isolating challenges

The home feed system can be simplified to support three use cases: writing Pinners' feeds to a storage, serving feeds from the storage and removing feed entries when required. 
Writing feeds can have a huge QPS (queries per second). Fortunately it's not user-facing, and a certain delay (e.g. seconds or even minutes) is tolerable. 
Serving has relatively small QPS compared to the write path, but it's user-facing and has a tight performance requirement.
A simple design could write all feeds to one storage and serve and delete from it. At our current scale, we keep hundreds of terabytes of data and support millions of operations per second. We've had success with HBase in past iterations of the home feed system, and after evaluating all the options, we chose HBase as our backend storage.

The problem with the design is it’s very challenging to tune the same storage to meet the requirements for both a high volume of writing and a high performance of reading and updating. 

For example, when a person creates a new Pin, we'll fan out the Pin to all of their followers. Followers are sharded across all HBase regions, so when we fan out the same Pin to hundreds of Pinners, the write operation will hit multiple regions, lock the WAL (write-ahead log) on each region server, update it and unlock it after use. Locking the WAL for each write/update/delete operation isn't efficient and quickly becomes a bottleneck. 

A better approach is to batch operations and push the changes to HBase once in a while, which increases the throughput of the HBase cluster dramatically. But the latency of each operation can be as high as the flush interval. For user-facing operations, our latency requirement is at millisecond level and the approach will fail us miserably.
To satisfy the different requirements, we designed a system with two HBase clusters and save data to different HBase clusters at different stages.
  • Zen is a service that provides a graph data model on top of HBase and abstracts the details of HBase operations from data producer and consumer.
  • SmartFeed worker pushes feed from all sources (we also refer to sources as pools) to HBase through Zen, and is called by PinLater, an asynchronous job execution system that can tolerate certain delays and failures.
  • HBase for materialized content saves the Pins that have potentially been shown in the home feed before, and its content is accessed through Zen.
  • SmartFeed content generator is in charge of selecting new Pins from the pools, scoring and ordering them.
  • SmartFeed service is indirectly retrieving feed (content) from both of the HBase clusters, and only talks to the pools cluster through SmartFeed content generator.
When a Pinner hits their home feed:
  • SmartFeed service calls content generator to get new Pins
  • Content generator decides how many Pins should be returned and how they should be ordered in the returned result
  • Simultaneously SmartFeed service retrieves saved Pins from HBase for materialized content
  • SmartFeed service waits for the results from the above two steps, then mixes and returns them. (If the call to the content generator fails or times out, the saved Pins from HBase are still returned.)
  • Offline, SmartFeed service will save the new result to HBase for materialized content and delete them from HBase for pools
With this design, we separate user-facing components from non user-facing components. Since different HBase clusters have different volumes of data and usage patterns, we can scale and configure them individually to meet their needs. In reality, we have far less Pins in materialized content cluster than the cluster for pools. We can make it more reliable and faster without too much cost.
To improve the availability over four nines, we implement something called speculative execution. We always keep a hot standby HBase cluster in a different EC2 availability zone to avoid losing Pinners’ data. Any changes made to the primary HBase cluster will be synced to the standby cluster within a few hundred milliseconds. In the event of a partial failure of the primary cluster we’ll serve the data from a standby cluster. This technique helps make the whole system four nines of read availability (not write) and provides a much better Pinner experience than failing the requests. The way that the speculative execution works is:
  • Make a call to the primary cluster to retrieve data
  • If the call fails or doesn’t return within certain time, make another call to the standby cluster
  • Return the data from the cluster which returns first
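The three steps above can be sketched with threads (a toy of the scheme, not Pinterest's implementation; the 50ms `wait_s` stands in for the 99.9th-percentile cutoff discussed below):

```python
import concurrent.futures
import time

def speculative_fetch(primary, standby, wait_s=0.05):
    # Call the primary cluster; if it fails or hasn't answered within
    # wait_s, also call the standby, and return whichever succeeds first.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        primary_future = pool.submit(primary)
        try:
            return primary_future.result(timeout=wait_s)   # fast path
        except Exception:
            pass            # timed out or failed; fall back to the standby
        standby_future = pool.submit(standby)
        for future in concurrent.futures.as_completed(
                [primary_future, standby_future]):
            try:
                return future.result()      # first successful cluster wins
            except Exception:
                continue
        raise RuntimeError("both clusters failed")

def slow_primary():
    time.sleep(0.2)         # simulate a struggling primary cluster
    return "primary"

def healthy_standby():
    return "standby"
```

When the primary answers within the cutoff, the standby is never contacted; when it stalls, the caller gets the (possibly slightly stale) standby result instead of an error.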
With this approach, SmartFeed service will be able to return data if either of the clusters is available and the overall availability is close to the combined availability of the two clusters. The tricky part is to pick a proper waiting time. Since syncing data from the primary cluster to the standby cluster has some delay, the data returned from the standby cluster can be stale. If the waiting time is too small, Pinners will have a higher chance of getting stale data. If the waiting time is too long, Pinners have to wait unnecessarily long even if we can return results from the standby cluster much earlier. For us, we find if a call doesn’t return within time x, it’ll eventually time out in most cases. The time x is also larger than the 99.9 percentile of the call’s latency. We decided to use this as the cutoff time, which means results may be returned from the standby cluster for one out of 1,000 calls.
Another interesting finding is that the latency of the standby cluster is higher than the primary cluster because so few calls fall back to the standby cluster, and it’s in a ‘cold’ state for most of the time. To warm up the pipeline and get it ready for use, we randomly forward x percent of calls to the standby cluster and drop the result.
One time the primary HBase was down for almost one hour because of some hardware issue. Thanks to speculative execution, all home feed requests automatically failover to the standby cluster. The performance and success rate of home feed was not impacted at all during the whole HBase incident.
https://engineering.pinterest.com/blog/building-smarter-home-feed
The home feed should be a reflection of what each user cares about. Content is sourced from inputs such as people and boards the user follows, interests, and recommendations. To ensure we maintain fast, reliable and personalized home feeds, we built the smart feed with the following design values in mind:
1. Different sources of Pins should be mixed together at different rates.
2. Some Pins should be selectively dropped or deferred until a later time. Some sources may produce Pins of poor quality for a user, so instead of showing everything available immediately, we can be selective about what to show and what to hold back for a future session.
3. Pins should be arranged in the order of best-first rather than newest-first. For some sources, newer Pins are intuitively better, while for others, newness is less important.

The architecture behind smart feed

We shifted away from our previously time-ordered home feed system and onto a more flexible one. The core feature of the smart feed architecture is its separation of available, but unseen, content and content that’s already been presented to the user. We leverage knowledge of what the user hasn’t yet seen to our advantage when deciding how the feed evolves over time.

Smart feed is a composition of three independent services, each of which has a specific role in the construction of a home feed.

The role of the smart feed worker

The smart feed worker is the first to process Pins and has two primary responsibilities - to accept incoming Pins and assign some score proportional to their quality or value to the receiving user, and to remember these scored Pins in some storage for later consumption.
Essentially, the worker manages Pins as they become newly available, such as those from the repins of the people the user follows. Pins have varying value to the receiving user, so the worker is tasked with deciding the magnitude of their subjective quality.




Incoming Pins are currently obtained from three separate sources: repins made by followed users, related Pins, and Pins from followed interests. Each is scored by the worker and then inserted into a pool for that particular type of pin. Each pool is a priority queue sorted on score and belongs to a single user. Newly added Pins mix with those added before, allowing the highest quality Pins to be accessible over time at the front of the queue.
Pools can be implemented in a variety of ways so long as the priority queue requirement is met. We choose to do this by exploiting the key-based sorting of HBase. Each key is a combination of user, score and Pin such that, for any user, we may scan a list of available Pins according to their score. Newly added triples will be inserted at their appropriate location to maintain the score order. This combination of user, score, and Pin into a key value can be used to create a priority queue in other storage systems aside from HBase, a property we may use in the future depending on evolving storage requirements.
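The key trick can be sketched like this (field widths and byte layout are illustrative, not Pinterest's actual HBase schema): big-endian fixed-width encoding makes byte-wise key order match numeric order, and storing the score inverted makes an ascending scan return the best Pins first.

```python
import struct

MAX_SCORE = 0xFFFFFFFF

def pool_key(user_id, score, pin_id):
    # User first (so one user's Pins are contiguous), then the inverted
    # score (so lexicographically smaller key = higher score), then the
    # Pin ID as a tie-breaker.
    return struct.pack(">QIQ", user_id, MAX_SCORE - score, pin_id)
```

Any store that scans keys in byte order, not just HBase, then serves as a per-user priority queue, which is the portability property noted above.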

Smart feed content generator

The smart feed content generator is concerned primarily with defining what "new" means in the context of a home feed. When a user accesses the home feed, we ask the content generator for new Pins since their last visit. The generator decides the quantity, composition, and arrangement of new Pins to return in response to this request.
The content generator assembles available Pins into chunks for consumption by the user as part of their home feed. The generator is free to choose any arrangement based on a variety of input signals, and may elect to use some or all of the Pins available in the pools. Pins that are selected for inclusion in a chunk are thereafter removed from the pools so they cannot be returned as part of subsequent chunks.
The content generator is generally free to perform any rearrangements it likes, but is bound to the priority queue nature of the pools. When the generator asks for n pins from a pool, it’ll get the n highest scoring (i.e., best) Pins available. Therefore, the generator doesn’t need to concern itself with finding the best available content, but instead with how the best available content should be presented.
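The contract between the generator and a pool can be sketched with a toy max-priority queue (in-memory heapq here; the real pools live in HBase as described earlier):

```python
import heapq

class Pool:
    def __init__(self):
        self._heap = []      # min-heap over negated scores = max-heap

    def add(self, score, pin_id):
        heapq.heappush(self._heap, (-score, pin_id))

    def take_best(self, n):
        # The generator always receives the n highest-scoring Pins, and
        # taken Pins are removed so later chunks can't return them again.
        taken = []
        while self._heap and len(taken) < n:
            _, pin_id = heapq.heappop(self._heap)
            taken.append(pin_id)
        return taken
```

The generator can rearrange whatever `take_best` hands back, but it cannot reach past the priority order: asking for two Pins from a pool scored 5/9/1 always yields the 9 and the 5.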

Smart feed service

In addition to providing high availability of the home feed, the smart feed service is responsible for combining new Pins returned by the content generator with those that previously appeared in the home feed. We can separate these into the chunk returned by the content generator and the materialized feed managed by the smart feed service.
The materialized feed represents a frozen view of the feed as it was the last time the user viewed it. To the materialized Pins we add the Pins from the content generator in the chunk. The service makes no decisions about order, instead it adds the Pins in exactly the order given by the chunk. Because it has a fairly low rate of reading and writing, the materialized feed is likely to suffer from fewer availability events. In addition, feeds can be trimmed to restrict them to a maximum size. The need for less storage means we can easily increase the availability and reliability of the materialized feed through replication and the use of faster storage hardware.
The smart feed service relies on the content generator to provide new Pins. If the generator experiences a degradation in performance, the service can gracefully handle the loss of its availability. In the event the content generator encounters an exception while generating a chunk, or if it simply takes too long to produce one, the smart feed service will return the content contained in the materialized feed. In this instance, the feed will appear to the end user as unchanged from last time. Future feed views will produce chunks as large as, or larger than, the last so that eventually the user will see new Pins.
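The graceful-degradation path described above can be sketched as a timeout-guarded call with a fallback to the materialized feed. This is a simplified sketch under assumed names; the real service's interfaces and timeout values are not public.

```python
import concurrent.futures

def build_feed(materialized, generate_chunk, timeout_s=0.2):
    """Prepend freshly generated Pins to the materialized feed; if the
    content generator is too slow or raises, serve the materialized
    feed unchanged (the user sees the feed as it was last time)."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(generate_chunk)
        try:
            chunk = future.result(timeout=timeout_s)
        except Exception:  # covers both generator errors and timeouts
            return list(materialized)
    return list(chunk) + list(materialized)

materialized = ["old1", "old2"]

# Healthy generator: new Pins appear ahead of the frozen feed.
fresh = build_feed(materialized, lambda: ["new1"])

# Degraded generator: the materialized feed is returned as-is.
def broken():
    raise RuntimeError("content generator degraded")
fallback = build_feed(materialized, broken)
```

The key design point survives the simplification: failure of the generator never fails the user request, it only delays freshness until a later view.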
Continuing with this project, we intend to better model users’ preferences with respect to Pins in their home feeds. Recommendation quality varies considerably across our user base, and we would benefit from using preference information gathered from recent interactions with the home feed. Knowledge of personal preference will also help us order home feeds so the Pins of most value can be discovered with the least effort.
http://timyang.net/architecture/pinterest-feed/
Pinterest's home feed originally aggregated Pins from the people a user follows (similar to Weibo) and sorted them chronologically (natural order, like WeChat Moments). Later versions abandoned natural ordering in favor of rule- and algorithm-based ranking, known internally as Smart Feed. Its algorithm and architecture, compiled below from public materials, are worth studying for any architect building a feed product.
Every Pinterest user's home feed is personalized. Roughly one third of Pinterest's traffic goes to the feed page, making it one of the most critical pages in the system. When engineers built the new Smart Feed, reaching 99.99% availability was one of the project's success criteria.
The main algorithms and rules of Pinterest Smart Feed are:
  • Pins from different publishing sources are aggregated at different frequencies.
  • Pins are selectively dropped (or loaded later) according to algorithmic weights. Lower-quality sources need not have all their Pins shown every time; the system decides which Pins appear immediately and which are deferred. Pin quality is always measured from the perspective of the receiving user.
  • Pins are ranked best-first rather than newest-first. For some sources the newest Pins rank highest; for others, newer Pins may rank lower.
As the figure shows, the Pinterest feed consists of the following parts: on the far left are the data sources, and on the far right is the Pinterest waterfall feed the user sees. The three services in the middle are described below.
[Figure pinterest-1: Smart Feed architecture overview]

Feed worker

The Feed worker's job is to receive new Pins, assign each one a weight per receiving user, and store them. The same Pin receives a different score for each receiving user.
New Pins come from three sources: followed users, related content, and interests derived from the follow graph. The worker scores each Pin from each source and inserts it into a pool; each pool is a per-user priority queue (higher-priority content is dequeued first).
Because the Feed worker stores data keyed by receiving user, every Pin has already been fanned out along follow relationships by the time it enters the worker (what is commonly called the push model of feed delivery).
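The per-user fan-out and scoring step (the push model) can be sketched as below. The follow graph, scoring function, and pool layout are illustrative assumptions; the real scoring is a learned relevance model.

```python
from collections import defaultdict

# Toy follow graph: author -> followers.
followers = {"author1": ["user_a", "user_b"]}

# Per-user pools of (pin_id, score); in production each pool is a
# priority queue, a plain list suffices for this sketch.
pools = defaultdict(list)

def score(pin_id, user):
    # Placeholder for a per-user relevance model; fixed toy scores here.
    return 0.5 if user == "user_a" else 0.8

def publish(author, pin_id):
    """On publish, fan the Pin out to every follower's pool,
    scoring it separately for each receiving user."""
    for user in followers[author]:
        pools[user].append((pin_id, score(pin_id, user)))

publish("author1", "pin42")
# The same Pin now sits in two pools with two different scores.
```

The point of the sketch is the write-time cost structure: one publish becomes one write per follower, which is why the storage layer must sustain very high write QPS.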

Feed content generator

The Feed content generator returns the Pins that are new since the user's last visit. It may return the top n or all new Pins; Pins the user has already fetched (i.e., viewed) are removed from the pool. The content generator may rearrange Pins from multiple publishing sources according to its own rules, but it cannot override the ordering of the underlying priority queue: higher-priority items are always dequeued first.

Smart feed service

The materialized view holds a snapshot of the user's feed from their last visit. This service does not reorder the feed; it stores the Pins exactly in the order they were last returned to the user. Because this is history the user has already read, reads and writes are infrequent and it can offer better availability. Since the history can also be capped at a maximum length, storage is bounded, so replicas can be added cheaply to improve read availability further.
The feed depends on the content generator for new Pins. If the content generator is unavailable, the service degrades gracefully: the user still receives the historical list from the materialized store.
With these three services, each with a clearly defined responsibility, Pinterest gained flexible control over the feed's contents and delivers personalized results to every user.

Feed storage

Pinterest's feed storage must meet the following requirements:
  • Write newly published feed items. Because Pinterest uses the push model, this demands very high write QPS, though users can tolerate some write latency.
  • Fetch the materialized home-feed list. The QPS is much lower than for writes, but users have little tolerance for request latency.
  • Delete feed items.
A simple design would write all feed data into a single store, making reads, updates, and deletes easy to implement. At Pinterest's current scale this means hundreds of terabytes of data and millions of operations per second. After evaluation, Pinterest chose HBase, which provides the high read, write, and update throughput the workload requires.
When a user publishes a new Pin, it is fanned out to all of their followers, who may be sharded across every HBase region. A single fan-out may therefore touch many regions, locking each region's WAL for the update and then unlocking it. Locking the WAL on every write/delete/update is very inefficient and quickly becomes a bottleneck. Batching HBase operations is better and raises throughput, but it also adds latency, which is unacceptable for user-facing requests.
To satisfy both needs, Pinterest designed around two HBase clusters, writing data to different clusters at different stages, as shown in the figure.
[Figure pinterest-3: dual-HBase-cluster feed storage]
Zen is a service that provides graph storage on top of HBase.
The SmartFeed worker fans out a user's newly published content and persists it in HBase via Zen; asynchronous tasks are dispatched through the PinLater service.
The SmartFeed content generator returns the newest Pins, scoring and ranking them.
When a user refreshes their home feed, the SmartFeed service merges data from the content generator with the materialized HBase store and returns it to the user; if the generation request times out, the system can still return the materialized data. In the background, SmartFeed deletes the returned data from the left-hand (pool) store.
In practice, the materialized HBase store holds far less data than the publish pools, so these requests are very fast.
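The refresh flow just described — take the best new Pins from the pool, prepend them to the materialized feed, then clear them from the pool — can be sketched in a few lines. The deletion happens asynchronously in the real system; it is synchronous here for simplicity, and all names are illustrative.

```python
def refresh_feed(pool, materialized, n):
    """On refresh: take the n best (pin_id, score) entries from the pool,
    prepend their ids to the materialized feed, and remove them from the
    pool so they cannot be served again."""
    chunk = sorted(pool, key=lambda p: p[1], reverse=True)[:n]
    for pin in chunk:
        pool.remove(pin)  # done in the background in the real system
    materialized[:0] = [pin_id for pin_id, _ in chunk]
    return materialized

pool = [("a", 0.9), ("b", 0.4), ("c", 0.7)]
feed = refresh_feed(pool, ["old"], 2)
# feed now has the two best new Pins ahead of the frozen history,
# and the pool only retains what was not served.
```

Serving reads from the small materialized list while the large pool absorbs fan-out writes is what makes the user-facing request fast despite the write-heavy push model.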

Feed high availability

With this design, the system's availability is effectively that of the materialized HBase store. An HBase cluster carries risks of GC pauses, single points of failure, and region migrations, so a single cluster can hardly guarantee 99.99% availability.
To solve this, a standby cluster runs in another EC2 availability zone, and any write to the primary cluster is replicated to the standby within a few hundred milliseconds. When the primary is unavailable, user requests can be served from the standby. With this design, overall availability exceeds 99.99% (excluding writes).
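The read-path failover can be sketched as a try-primary-then-standby pattern. The cluster objects and their `get()` method are illustrative assumptions standing in for real HBase clients.

```python
class Cluster:
    """Toy stand-in for an HBase cluster client."""

    def __init__(self, name, data, healthy=True):
        self.name, self.data, self.healthy = name, data, healthy

    def get(self, key):
        if not self.healthy:
            raise ConnectionError(f"{self.name} unavailable")
        return self.data[key]

def read_feed(primary, standby, user):
    """Serve from the primary cluster; on failure, fall back to the
    standby in the other availability zone."""
    try:
        return primary.get(user)
    except ConnectionError:
        # Replication lags by only a few hundred ms, so the standby's
        # slightly stale view is acceptable for reads.
        return standby.get(user)

primary = Cluster("primary", {"u1": ["pin1", "pin2"]}, healthy=False)
standby = Cluster("standby", {"u1": ["pin1"]})
result = read_feed(primary, standby, "u1")
```

Note that this raises availability for reads only, matching the text's "excluding writes" caveat: a write that cannot reach the primary is not silently redirected.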
[Figure pinterest-2: primary/standby HBase clusters across EC2 availability zones]

http://pingineering.tumblr.com/post/105293275179/building-a-scalable-and-available-home-feed
Pinnability: Machine learning in the home feed
https://engineering.pinterest.com/blog/pinnability-machine-learning-home-feed

