https://engineering.pinterest.com/blog/introducing-pinterest-secor
Pinterest logging pipeline
Our data logging center of gravity is a Kafka cluster. Kafka introduces abstractions that simplify collecting logs, but it only persists data to local disks and therefore isn’t suitable as a long-term data store.
Logs are ultimately stored on Amazon S3. While S3 is a highly reliable and scalable storage solution, it comes with the caveat of eventual consistency: there are no guarantees for when uploaded files will become visible to readers. S3 also has non-monotonic properties that may cause files to “disappear” and reappear moments later.
No-reads principle
Secor works around the limitations of the eventual consistency model by adhering to a principle that it never reads back anything it wrote to S3. It relies on the Kafka consumer offset management protocol to keep track of what has been uploaded to S3. Kafka stores the underlying metadata in ZooKeeper, while the metadata commit points are controlled by Secor and occur at a very low frequency of roughly one update per Kafka partition per hour.
The fact that metadata is stored separately from the data introduces the potential complication of keeping the two stores in sync. Secor addresses this by enforcing that the data is updated before the metadata and by using deterministic S3 paths. Any inconsistency caused by a successful update of the data followed by a failed commit to the metadata store resolves itself automatically during subsequent state updates.
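For illustration, here is a minimal sketch of that “data before metadata” ordering and the deterministic S3 paths described above. This is not Secor’s actual code; the class, interfaces and path layout are assumptions.

```java
// Hypothetical sketch of the "data before metadata" upload order described above.
// Class, interfaces and path layout are illustrative, not Secor's actual API.
public class UploadSketch {
    // Deterministic S3 path: the same (topic, partition, startOffset) always maps
    // to the same key, so a retried upload overwrites the previous attempt instead
    // of creating a duplicate that would have to be read back and reconciled.
    static String s3Key(String topic, int partition, long startOffset) {
        return String.format("secor/%s/%d/%020d.seq", topic, partition, startOffset);
    }

    void finalizeFile(String topic, int partition, long startOffset, long lastOffset,
                      S3Client s3, OffsetTracker zk, byte[] bytes) {
        // 1. Upload the data file first.
        s3.put(s3Key(topic, partition, startOffset), bytes);
        // 2. Only then commit the consumer offset (metadata) to ZooKeeper.
        //    If this step fails, the next run re-uploads to the same deterministic
        //    key, so the inconsistency resolves itself without ever reading S3.
        zk.commitOffset(topic, partition, lastOffset + 1);
    }

    interface S3Client { void put(String key, byte[] bytes); }
    interface OffsetTracker { void commitOffset(String topic, int partition, long offset); }
}
```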
https://engineering.pinterest.com/blog/pinnability-machine-learning-home-feed
http://www.infoq.com/cn/news/2015/12/Pinterest-Web-URL
https://engineering.pinterest.com/blog/fetching-and-serving-billions-urls-aragog
Aragog architecture
There are several important considerations when building infrastructure that deals with billions of URLs:
- Normalization/canonicalization: The same URL can be represented in many different forms, and several URLs may eventually redirect to the same URL. URL normalization (deduplicating different representations of the same URL) and canonicalization (deduplicating URLs pointing to the same page) play a significant role in reducing the amount of data storage required for serving (see the normalization sketch after this list).
- Crawl politeness: At this scale, it’s important to rate limit and smooth out the traffic going out to each particular domain. Furthermore, robots.txt rules need to be respected appropriately.
- Modeling URL data: One may want to store pieces of extracted metadata associated with a URL or store and update the inlinks and outlinks associated with a URL.
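As a rough illustration of the normalization step from the first bullet, the sketch below collapses trivially different forms of a URL into one representation. The specific rules are assumptions rather than Aragog’s actual normalizer, and canonicalization of redirecting URLs is left out since it requires fetching.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative URL normalization only: lower-case the scheme and host, drop
// default ports and fragments, and strip a trailing slash. Real-world rules
// differ per crawler; this is just to show the idea of deduplicating forms.
public class UrlNormalizer {
    public static String normalize(String url) throws URISyntaxException {
        URI u = new URI(url.trim());
        String scheme = u.getScheme() == null ? "http" : u.getScheme().toLowerCase();
        String host = u.getHost() == null ? "" : u.getHost().toLowerCase();
        int port = u.getPort();
        // Drop the port when it is the default for the scheme.
        if ((port == 80 && scheme.equals("http")) || (port == 443 && scheme.equals("https"))) {
            port = -1;
        }
        String path = (u.getPath() == null || u.getPath().isEmpty()) ? "/" : u.getPath();
        if (path.length() > 1 && path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        // Keep the query string, discard the fragment.
        return new URI(scheme, null, host, port, path, u.getQuery(), null).toString();
    }

    public static void main(String[] args) throws URISyntaxException {
        // Both forms collapse to http://example.com/recipes?q=turkey
        System.out.println(normalize("HTTP://Example.COM:80/recipes/?q=turkey#top"));
        System.out.println(normalize("http://example.com/recipes?q=turkey"));
    }
}
```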
Aragog is composed of two services: the Aragog Fetcher, which fetches web pages while respecting rate limits and canonicalizing URLs, and the Aragog UrlStore, which stores and serves all of the processed metadata and signals about the URLs. The figure below depicts some of the many interactions between our crawl pipelines/frontend and Aragog.
Aragog Fetcher
The Aragog Fetcher is a Thrift service responsible for fetching URLs politely. It issues the HTTP requests, follows redirects and retrieves the page content and HTTP headers. The fetcher returns a Thrift struct enclosing the page content, HTTP headers, fetch latency, the redirect chain and other data.
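The exact Thrift IDL isn’t published; the plain Java class below is only a guess at the shape of the returned struct, with illustrative field names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical Java mirror of the kind of Thrift struct the fetcher returns;
// field names are assumptions, not Aragog's actual IDL.
public class FetchResult {
    public String finalUrl;              // canonical URL after following redirects
    public List<String> redirectChain;   // every hop from the requested URL to finalUrl
    public int httpStatusCode;
    public Map<String, String> headers;  // response HTTP headers
    public byte[] content;               // raw page content
    public long fetchLatencyMs;          // wall-clock time spent fetching
}
```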
Implementing crawl politeness requires two things:
- Respecting the rules in robots.txt
- Smoothing and rate limiting traffic to a particular domain
The Aragog Fetcher retrieves the robots.txt file for a particular domain and caches its contents for seven days. When a request is made to fetch a URL, it applies the fetch/don’t-fetch rules from robots.txt. If robots.txt allows fetching, it calls out to the rate limiter with the URL’s domain.
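A minimal sketch of that robots.txt check, assuming a simple in-memory cache with a seven-day TTL; the fetching and parsing of the file are stubbed out and the names are illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Sketch of a seven-day robots.txt cache; parsing and fetching are stubbed out.
public class RobotsCache {
    private static final long TTL_MS = TimeUnit.DAYS.toMillis(7);

    private static class Entry {
        final RobotsRules rules;
        final long fetchedAtMs;
        Entry(RobotsRules rules, long fetchedAtMs) { this.rules = rules; this.fetchedAtMs = fetchedAtMs; }
    }

    private final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<>();

    public boolean isAllowed(String domain, String path) {
        Entry e = cache.get(domain);
        if (e == null || System.currentTimeMillis() - e.fetchedAtMs > TTL_MS) {
            // Cache miss or stale entry: re-fetch http(s)://<domain>/robots.txt.
            e = new Entry(fetchAndParseRobots(domain), System.currentTimeMillis());
            cache.put(domain, e);
        }
        return e.rules.allows(path);
    }

    // Stubs standing in for the real fetch + parse logic.
    interface RobotsRules { boolean allows(String path); }
    private RobotsRules fetchAndParseRobots(String domain) { return path -> true; }
}
```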
The rate limiter may allow the request immediately, instruct the fetcher to delay the request by some number of milliseconds to smooth out URL fetching, or force it to fail because the rate has been exceeded. To ensure the Aragog Fetcher doesn’t overburden a domain with too many requests, the rate limiters allow up to 10 QPS to a single domain. We override this limit for some popular or more frequently crawled domains as necessary, and the overrides are propagated as a configuration file to the pool of rate limiters through our configuration management system.
The rate limiter is served by a pool of machines sharded by URL domain using consistent hashing, so a single machine is responsible for making rate-limiting decisions for a given domain. This also minimizes the amount of rate-limiting state that moves around when a rate limiter process/machine is added or decommissioned. Each rate limiter machine stores a mapping from the domain to the timestamp at which a fetch was last scheduled. The rate limiter retrieves this timestamp (let’s call it lastScheduledFetchTimeMs) and schedules the next fetch accordingly. For example, if the allowed QPS is 10, the rate limiter will schedule a fetch for the URL at lastScheduledFetchTimeMs + 100, since we want to space requests out at 100ms intervals. The rate limiter uses a CAS update to optimistically update the last scheduled fetch time for the domain and retries if the CAS operation fails. It calculates the delay by subtracting the current time from the newly scheduled fetch time. When there’s a large burst of requests, the delay grows large (more than one second); when that happens, the rate limiter throws an exception back to the fetcher. Storing an 8-byte timestamp makes for very little overhead per domain.
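The scheduling logic above can be sketched as follows. This is not Pinterest’s implementation; it assumes a single rate-limiter process keeping per-domain timestamps in memory, and the class and exception names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the per-domain scheduling logic described above. Each domain maps to
// the timestamp of the last scheduled fetch; a CAS update claims the next slot,
// and callers either wait out the returned delay or give up when the backlog
// grows past one second.
public class DomainRateLimiter {
    private static final long MAX_DELAY_MS = 1000;    // burst threshold from the text
    private final long minIntervalMs;                 // e.g. 100 ms for 10 QPS
    private final ConcurrentHashMap<String, AtomicLong> lastScheduledFetchTimeMs =
            new ConcurrentHashMap<>();

    public DomainRateLimiter(int allowedQps) {
        this.minIntervalMs = 1000L / allowedQps;
    }

    /** Returns how long the caller should wait before fetching, in milliseconds. */
    public long acquire(String domain) throws RateLimitExceededException {
        AtomicLong last = lastScheduledFetchTimeMs.computeIfAbsent(domain, d -> new AtomicLong(0));
        while (true) {
            long now = System.currentTimeMillis();
            long prev = last.get();
            // The next slot is spaced minIntervalMs after the previously scheduled fetch.
            long next = Math.max(now, prev + minIntervalMs);
            long delay = next - now;
            if (delay > MAX_DELAY_MS) {
                // Large burst: fail fast so the client can reschedule the fetch later.
                throw new RateLimitExceededException(domain, delay);
            }
            // Optimistically claim the slot; retry if another thread won the race.
            if (last.compareAndSet(prev, next)) {
                return delay;
            }
        }
    }

    public static class RateLimitExceededException extends Exception {
        RateLimitExceededException(String domain, long delayMs) {
            super("Rate limit exceeded for " + domain + "; suggested delay " + delayMs + " ms");
        }
    }
}
```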
Whenever a URL is rate limited, the client simply reschedules the fetch to a later time, a feature intrinsically supported by our asynchronous task execution system called PinLater.
Aragog UrlStore
Every time you see a Rich Pin, you’re looking at data served from the Aragog UrlStore. The UrlStore is the storage and serving system that holds the metadata extracted from pages fetched. It holds the page content itself, semi-structured data extracted from the page and web graph metadata such as inlinks and outlinks. We created this shared system so that product teams can rapidly build functionality that uses this metadata without the burden of building their own scalable serving infrastructure.
We made a couple of key design decisions when designing the system. First, we wanted a one-stop shop for all URL metadata across the organization. Second, we wanted to serve Pinterest’s full online read traffic from our API tier at acceptably low latencies, as well as read-write traffic from our offline processing systems, which are a combination of batch and real-time processing.
To accomplish this, we built a federated storage system that provides a comprehensive data model while storing metadata efficiently in backend systems with the appropriate size, latency, durability and consistency characteristics.
Here are a few examples of how we made the tradeoff between latency, durability and consistency.
Page content
We store the full content of the web pages we fetch. These are large blobs of data that are retrieved infrequently, and only by offline processing pipelines. We chose to store this data in S3 because affordable, large storage capacity was more important than low latency.
Each web page is stored as a separate S3 file. We use a hash of the (normalized and canonicalized) URL as the key because we found that S3 is susceptible to key hot spots: creating many keys with long common prefixes can overload individual servers within the S3 cluster and degrade performance for some of the keys in your bucket, which is exactly what using raw URLs as keys would do. We initially tried keying by the URL with its domain reversed (imagine a million keys in a single S3 bucket that all begin with “com.etsy/...”) but ended up receiving hotspotting complaints from Amazon.
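The sketch below shows the kind of hashed key that avoids long common prefixes. The blog post doesn’t say which hash function or key layout is used, so SHA-1 and the path format here are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative only: leading with a hash of the normalized URL spreads keys
// evenly across the S3 keyspace, unlike reverse-domain keys ("com.etsy/...")
// that share long prefixes and concentrate load on a few servers.
public class PageContentKey {
    public static String s3Key(String normalizedUrl) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] digest = md.digest(normalizedUrl.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            // The hash leads the key, so no two pages from the same domain share a prefix.
            return "page-content/" + hex;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```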
In order to help Pinners find what they’re searching for in the most effective way, we must understand the intent behind their search queries.
Data around the query
The more people search, the better we can suggest results. From the previous example, we can guess that the next person who issues the query “turkey” may also be interested in “turkey recipes.” Information extracted from previous query logs has been shown to be effective in understanding users’ search intent.
Search context such as adjacent queries in the same search session and clicked Pins after submitting a search query can help us improve the discovery experience for future searches.
To capture information about a search query and make it available for other applications to process, derive signals from, and build features on top of, we designed a data collection called QueryJoin that contains the following data:
- The search query, which is also the identifier for the QueryJoin.
- Demographic stats such as gender, country or language.
- Adjacent queries, i.e. queries that appeared in the same search session, which we store to learn how users refined their queries to find what they were looking for.
- Pins, i.e. a set of Pins returned for the search query. For each Pin, we have aggregated data from the PinJoin (the data collection for a cluster of Pins with the same image signature and the information about those Pins) as well as engagement stats like the number of clicks, repins and likes.
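To make the shape of a QueryJoin concrete, here is a hypothetical Java rendering of the fields listed above; the actual schema isn’t public, so all names and types are assumptions.

```java
import java.util.List;
import java.util.Map;

// Hypothetical shape of a QueryJoin record, based on the fields listed above.
public class QueryJoin {
    public String query;                       // the search query, also the record's identifier

    // Demographic stats: counts of searches broken down by gender, country and language.
    public Map<String, Long> searchesByGender;
    public Map<String, Long> searchesByCountry;
    public Map<String, Long> searchesByLanguage;

    // Queries issued in the same search session, e.g. "turkey" -> "turkey recipes".
    public List<String> adjacentQueries;

    // Pins returned for the query, with PinJoin aggregates and engagement stats.
    public List<PinEntry> pins;

    public static class PinEntry {
        public String imageSignature;          // joins back to the PinJoin collection
        public long clicks;
        public long repins;
        public long likes;
    }
}
```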