Showing posts with label Cross Data Center. Show all posts

Thursday, March 31, 2016

Solr Cross Data Center Replication



http://lucene.apache.org/solr/news.html
  • New support for Cross Data Center Replication consisting of active/passive replication for separate SolrClouds hosted in separate data centers.
http://www.datastax.com/resources/whitepapers/intro-to-multidc
Cross Data Center Replication
https://issues.apache.org/jira/browse/SOLR-6273
http://yonik.com/solr-cross-data-center-replication/
  • Accommodate 2 or more data centers
  • Accommodate active/active uses
  • Accommodate limited-bandwidth cross-datacenter connections
  • Minimize coupling between peer clusters to increase reliability
  • Support both full consistency and eventual consistency
Clusters will be configured to know about each other, most likely through keeping a cluster peer list in zookeeper. One essential piece of information will be the zookeeper quorum address for each cluster peer. Any node in one cluster can know the configuration of another cluster via a zookeeper client.
Update flow will go from the shard leader in one cluster to the shard leader in the peer clusters. This can be bi-directional, with updates flowing in both directions. Updates can be either synchronous or asynchronous, with per-update granularity.
Solr transaction logs are currently removed when no longer needed. They will be kept around (potentially much longer) to act as the source of data to be sent to peer clusters. Recovery can also be bi-directional with each peer cluster sending the other cluster missed updates.

Architecture Features & Benefits

  • Scalable – no required single points of aggregation / dissemination that could act as a bottleneck.
  • Per-update choice of synchronous/asynchronous forwarding to peer clusters.
  • Peer clusters may have different configuration, such as replication factor.
  • Asynchronous updates allow for bursts of indexing throughput that would otherwise overload cross-DC pipes.
  • “Push” operation for lowest latency async updates.
  • Low-overhead… re-uses Solr’s existing transaction logs for queuing.
  • Leader-to-leader communication means update is only sent over cross-DC connection once.
CDCR

Update Flow

  1. An update will be received by the shard leader and versioned
  2. Update will be sent from the leader to its replicas
  3. Concurrently, update will be sent (synchronously or asynchronously) to the shard leader in other clusters
  4. Shard leader in the other cluster will receive the already versioned update (and not re-version it), and forward the update to its replicas

Solr Document Versioning

The shard leader versions a document and then forwards it to replicas. Update re-orders are handled by the receiver by dropping updates that are detected to be older than the latest document version in the index. This works given that complete documents are always sent to replicas, even if it started as a partial update on the leader.
Solr version numbers are derived from a timestamp (the high bits are milliseconds and the low bits are incremented for each tie in the same millisecond to guarantee a monotonically increasing unique version number for any given leader).
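A minimal sketch of such a timestamp-derived version generator follows. The class name `VersionClock` and the 20-bit split between milliseconds and tie-breaker are assumptions made for illustration; the text above only says "high bits" and "low bits".

```python
import threading
import time


class VersionClock:
    """Hypothetical per-leader version generator (illustrative, not Solr's class)."""

    COUNTER_BITS = 20  # assumed width of the low "tie-breaker" bits

    def __init__(self, now_ms=lambda: int(time.time() * 1000)):
        self._now_ms = now_ms          # injectable clock, handy for testing
        self._last = 0
        self._lock = threading.Lock()

    def next_version(self) -> int:
        with self._lock:
            # High bits: current time in milliseconds.
            candidate = self._now_ms() << self.COUNTER_BITS
            if candidate <= self._last:
                # Same millisecond (or the clock stepped back): bump the low bits
                # so versions from this leader stay unique and increasing.
                candidate = self._last + 1
            self._last = candidate
            return candidate
```

Shifting a version right by `COUNTER_BITS` recovers the millisecond timestamp, which is why ordering across leaders depends on their clocks.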

The Clock Skew Problem

If updates are accepted for the same document in two different clouds (implying two different leaders versioning the document), then having the correct last document “win” relies on clock synchronization between the two leaders. Updates to the same document at different data centers within the clock skew time risk being incorrectly ordered.
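To make the risk concrete, here is a small sketch with invented numbers: DC2's clock is assumed to run 50 ms behind DC1's, and the two updates happen 20 ms apart. The function names are hypothetical.

```python
def version_at(true_time_ms: int, clock_offset_ms: int) -> int:
    """Version a leader would assign, given how far its clock is off."""
    return true_time_ms + clock_offset_ms


def last_writer_wins(updates):
    """Keep the update with the highest version, as a receiver would."""
    return max(updates, key=lambda u: u["version"])


# DC1's clock is exact; DC2's clock is 50 ms slow (assumed skew).
dc1_update = {"value": "written first", "version": version_at(1000, 0)}
dc2_update = {"value": "written second", "version": version_at(1020, -50)}

winner = last_writer_wins([dc1_update, dc2_update])
```

The update made later in real time gets the lower version (970 versus 1000), so `winner` is the earlier write.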

The Partial Update Problem

Solr only has versions at the document level. The current partial update implementation (because of other constraints) reads the current stored fields of the document, makes the requested update, and indexes the new resulting document. This creates a problem with accepting Solr atomic updates / partial updates to the same document in both data-centers.
Example:
DC1: writes document A, version=time1
DC2: receives document A (version=time1) update from DC1
DC1: updates A.street_address (Solr reads version time1, writes version time2)
DC2: updates A.phone_number (Solr reads version time1, writes version time3)
DC1: receives document A (version=time3) from DC2, writes it.
DC2: receives document A (version=time2) from DC1, ignores it (older version)
Although both data-centers became “consistent”, the partial update of street_address was completely lost in the process.
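The sequence above can be replayed with a toy model. This is only a sketch: `Cluster` is a hypothetical stand-in for one data center's index, the integers 1, 2, 3 stand for time1, time2, time3, and the field values are invented.

```python
class Cluster:
    """Toy index for one data center: doc id -> (version, fields)."""

    def __init__(self):
        self.docs = {}

    def apply(self, doc_id, version, fields):
        """Index a full document unless an equal or newer version exists."""
        current = self.docs.get(doc_id)
        if current is not None and current[0] >= version:
            return False  # older update is dropped
        self.docs[doc_id] = (version, dict(fields))
        return True

    def partial_update(self, doc_id, version, changes):
        """Read stored fields, merge the change, index the full result."""
        _, fields = self.docs[doc_id]
        merged = {**fields, **changes}
        self.apply(doc_id, version, merged)
        return merged  # the full document forwarded to the peer cluster


dc1, dc2 = Cluster(), Cluster()
original = {"street_address": "1 Old St", "phone_number": "555-0000"}

dc1.apply("A", 1, original)                                             # time1
dc2.apply("A", 1, original)                                             # replicated
from_dc1 = dc1.partial_update("A", 2, {"street_address": "9 New Rd"})   # time2
from_dc2 = dc2.partial_update("A", 3, {"phone_number": "555-1234"})     # time3
dc1.apply("A", 3, from_dc2)  # newer version: overwrites DC1's copy
dc2.apply("A", 2, from_dc1)  # older version: ignored
```

Both clusters end with version 3, the new phone number, and the old street address.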

Solutions

Option 1:
Configure the update for full synchronization. All peer clusters must be available for any to be writeable.
Option 2:
Use client versioning, where the update clients specify a user-level version field.
Option 3:
For a given document, consider one cluster the primary for the purposes of document changes/updates. See “Primary Cluster Routing”.

Primary Cluster Routing

To deal with potential update conflicts arising from updating the same document in different data centers, each document can have a primary cluster.
A routing enhancement can ensure that a document sent to the wrong cluster will be forwarded to the correct cluster.
Routing can take as input a request parameter, a document field, or the unique id field. The primary cluster could be determined by hash code (essentially random), or could be determined by a mapping specified in the cluster peer list. Changes to this mapping for fail-over would not happen automatically in Solr. If a data center becomes unreachable, the application/client layers have responsibility for deciding that a different cluster should become the primary for that set of documents.
Primary cluster routing will be optional. Many applications will naturally not trigger the type of undesirable update behavior described, or will have the ability to work around update limitations.
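A rough sketch of how such routing might choose a primary cluster, trying an explicit mapping first and falling back to a hash of the routing key. The peer names, the mapping entry, and the function names are all hypothetical, and CRC32 is used only because it is a stable hash available in the standard library.

```python
import zlib

PEERS = ["dc-east", "dc-west"]               # hypothetical cluster peer list
EXPLICIT_PRIMARY = {"tenant-42": "dc-west"}  # hypothetical mapping entry


def primary_cluster(route_key: str) -> str:
    """Pick the primary cluster for a routing key (param, field, or id)."""
    if route_key in EXPLICIT_PRIMARY:
        return EXPLICIT_PRIMARY[route_key]
    # Hash fallback: essentially random, but stable across processes.
    return PEERS[zlib.crc32(route_key.encode("utf-8")) % len(PEERS)]


def route(local_cluster: str, route_key: str) -> str:
    """Decide whether to index here or forward to the primary cluster."""
    primary = primary_cluster(route_key)
    if primary == local_cluster:
        return "index locally"
    return f"forward to {primary}"
```

Fail-over in this sketch would mean the application editing `EXPLICIT_PRIMARY`, matching the point that Solr would not change the mapping automatically.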

Future Option: Improve Partial Updates

Implement true partial updates with vector clocks and/or finer grained versioning so that updates to different fields can be done conflict free if re-ordered. This would also lower the bandwidth costs of partial updates since the entire document would no longer be sent to all replicas and to other peer clusters.

Future Option: Update Aggregators

One could potentially further minimize cross-DC traffic by introducing traffic aggregator nodes (one per cluster) that all updates would flow through. This would likely only improve bandwidth utilization in low update environments. The improvements would come from fewer connections (and hence less connection overhead) and better compression (a block of many small updates would generally have a better compression ratio than the same updates compressed individually).
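The compression claim is easy to check with a small experiment. The sketch below uses zlib and an invented JSON update format, neither of which comes from the proposal.

```python
import json
import zlib

# 200 small, similar-looking updates (made-up payload shape).
updates = [
    json.dumps({"id": f"doc-{i}", "field": "price", "set": i}).encode("utf-8")
    for i in range(200)
]

# Each update compressed on its own, as if sent one by one.
individually = sum(len(zlib.compress(u)) for u in updates)

# All updates compressed together, as an aggregator could send them.
as_one_block = len(zlib.compress(b"\n".join(updates)))
```

Compressed separately, each tiny payload pays the fixed stream overhead and cannot share repeated keys with its neighbours, so `as_one_block` comes out smaller than `individually`.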

Future Option: Clusterstate proxy

Many zookeeper clients in a peer cluster could generate significant amounts of traffic between data centers. There could be a designated listener to the remote cluster state that could disseminate this state to others in the local cluster rather than hitting ZK directly.


Also worth investigating is the use of a local zookeeper observer node that could service all local ZK reads for the remote ZK quorum.


Sunday, December 13, 2015

Instagration Pt. 2: Scaling our infrastructure to multiple data centers



http://engineering.instagram.com/posts/548723638608102/instagration-pt-2-scaling-our-infrastructure-to-multiple-data-centers/

The key to expanding to multiple data centers is to distinguish global data from local data. Global data needs to be replicated across data centers, while local data can be different for each region (for example, the async jobs created by a web server would only be viewed in that region).
The next consideration is hardware resources. These can be roughly divided into three types: storage, computing and caching.
Storage
Instagram mainly uses two backend database systems: PostgreSQL and Cassandra. Both PostgreSQL and Cassandra have mature replication frameworks that work well as a globally consistent data store.
Global data neatly maps to data stored in these servers. The goal is to have eventual consistency of these data across data centers, but with potential delay. Because there are vastly more read than write operations, having a read replica in each region avoids cross data center reads from web servers.
Writing to PostgreSQL, however, still goes across data centers because they always write to the primary.
CPU Processing
Web servers and async servers are both easily distributed computing resources that are stateless, and only need to access data locally. Web servers can create async jobs that are queued by async message brokers, and then consumed by async servers, all in the same region.

Caching

The cache layer is the web servers' most frequently accessed tier, and they need to be collocated within a data center to avoid user request latency. This means that updates to cache in one data center are not reflected in another data center, therefore creating a challenge for moving to multiple data centers.
Imagine a user commented on your newly posted photo. In the one data center case, the web server that served the request can just update the cache with the new comment. A follower will see the new comment from the same cache.
In the multi data center scenario, however, if the commenter and the follower are served in different regions, the follower’s regional cache will not be updated and the user will not see the comment.
Our solution is to use PgQ and enhance it to insert cache invalidation events to the databases that are being modified.
On the primary side:
  • Web server inserts a comment to PostgreSQL DB;
  • Web server inserts a cache invalidation entry to the same DB.
On the replica side:
  • Replicate primary DB, including both the newly inserted comment as well as the cache invalidation entry
  • Cache invalidation process reads the cache invalidation entry and invalidates regional cache
  • Django servers will read from DB with the newly inserted comment and refill the cache
This solves the cache consistency issue. On the other hand, compared to the one-region case where django servers directly update cache without re-reading from DB, this would create increased read load on databases. In order to mitigate this problem, we took two approaches: 1) reduce computational resources needed for each read by denormalizing counters; 2) reduce number of reads by using cache leases.
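The primary-side and replica-side steps can be sketched end to end. This is a toy model, not Instagram's implementation: in-memory SQLite stands in for PostgreSQL, a wholesale table copy stands in for replication and PgQ, a dict stands in for the regional cache, and all table and function names are invented.

```python
import sqlite3

SCHEMA = """
CREATE TABLE comments (id INTEGER PRIMARY KEY, photo_id INTEGER, body TEXT);
CREATE TABLE cache_invalidations (id INTEGER PRIMARY KEY, cache_key TEXT);
"""


def new_db():
    db = sqlite3.connect(":memory:")
    db.executescript(SCHEMA)
    return db


def add_comment(primary, photo_id, body):
    """Primary side: comment and invalidation entry go into the same DB."""
    with primary:  # one transaction
        primary.execute(
            "INSERT INTO comments (photo_id, body) VALUES (?, ?)", (photo_id, body))
        primary.execute(
            "INSERT INTO cache_invalidations (cache_key) VALUES (?)",
            (f"comments:{photo_id}",))


def replicate(primary, replica):
    """Stand-in for database replication: copy both tables wholesale."""
    for table, width in (("comments", 3), ("cache_invalidations", 2)):
        rows = primary.execute(f"SELECT * FROM {table}").fetchall()
        placeholders = ", ".join("?" * width)
        with replica:
            replica.execute(f"DELETE FROM {table}")
            replica.executemany(
                f"INSERT INTO {table} VALUES ({placeholders})", rows)


def process_invalidations(replica, regional_cache, last_seen_id):
    """Replica side: drop cached keys named by new invalidation entries."""
    rows = replica.execute(
        "SELECT id, cache_key FROM cache_invalidations WHERE id > ? ORDER BY id",
        (last_seen_id,)).fetchall()
    for entry_id, key in rows:
        regional_cache.pop(key, None)
        last_seen_id = entry_id
    return last_seen_id


def read_comments(db, cache, photo_id):
    """Serve from cache, refilling from the regional DB on a miss."""
    key = f"comments:{photo_id}"
    if key not in cache:
        cache[key] = [r[0] for r in db.execute(
            "SELECT body FROM comments WHERE photo_id = ? ORDER BY id", (photo_id,))]
    return cache[key]


primary, replica, cache = new_db(), new_db(), {}
read_comments(replica, cache, 7)                 # regional cache now holds []
add_comment(primary, 7, "nice photo")
replicate(primary, replica)
stale = list(read_comments(replica, cache, 7))   # still served from cache
cursor = process_invalidations(replica, cache, 0)
fresh = read_comments(replica, cache, 7)         # cache miss, re-read from DB
```

The refill in `read_comments` is the extra database read that the two mitigations are meant to make cheaper and rarer.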

De-normalizing Counters

The most commonly cached keys are counters. For example, we would use a counter to determine the number of people who liked a specific post from Justin Bieber. When there was just one region, we would update the memcache counters by incrementing from web servers, therefore avoiding a “select count(*)” call to the database, which would take hundreds of milliseconds.
But with two regions and PgQ invalidation, each new like creates a cache invalidation event to the counter. This will create a lot of “select count(*)”, especially on hot objects.
To reduce the resources needed for each of these operations, we denormalized the counter for likes on the post. Whenever a new like comes in, the count is increased in the database. Therefore, each read of the count will just be a simple “select” which is a lot more efficient.
There is also an added benefit of denormalizing counters in the same database where the record of who liked the post is stored. Both updates can be included in one transaction, making the updates atomic and consistent all the time. Before the change, the counter in cache could be inconsistent with what was stored in the database because of timeouts, retries, and so on.
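A minimal sketch of the denormalized counter, using in-memory SQLite as a stand-in for PostgreSQL. The table names, column names, and helper functions are assumptions made for illustration.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE posts (id INTEGER PRIMARY KEY, like_count INTEGER NOT NULL DEFAULT 0);
CREATE TABLE likes (post_id INTEGER, user_id INTEGER, PRIMARY KEY (post_id, user_id));
INSERT INTO posts (id) VALUES (1);
""")


def add_like(post_id, user_id):
    """Insert the like row and bump the counter in one transaction."""
    try:
        with db:  # commits on success, rolls back on exception
            db.execute(
                "INSERT INTO likes (post_id, user_id) VALUES (?, ?)",
                (post_id, user_id))
            db.execute(
                "UPDATE posts SET like_count = like_count + 1 WHERE id = ?",
                (post_id,))
        return True
    except sqlite3.IntegrityError:
        # Duplicate like (for example a retry): neither statement takes effect.
        return False


def like_count(post_id):
    """Cheap read: a plain select instead of select count(*)."""
    return db.execute(
        "SELECT like_count FROM posts WHERE id = ?", (post_id,)).fetchone()[0]
```

Calling `add_like(1, 100)` twice counts the like once, because the retry hits the primary key and the whole transaction rolls back.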
PostgreSQL Read Replicas Can't Catch up
As a Postgres primary takes in writes, it generates delta logs. The faster the writes come in, the more frequently these logs are generated. The primaries themselves store the most recent log files for occasional needs from the replicas, but they archive all the logs to storage to make sure that they are saved and accessible by any replicas that need older data than what the primary has retained. This way, the primary does not run out of disk space.
When we build a new read replica, the read replica starts to read a snapshot of the database from the primary. Once it’s done, it needs to apply the logs that have happened since the snapshot to the database. When all the logs are applied, it will be up-to-date and can stream from the primary and serve reads from web servers.
However, when a large database's write rate is quite high, and there is a lot of network latency between the replica and storage device, it is possible that the rate at which the logs are read is slower than the log creation rate. The replica will fall further and further behind and never catch up!
Our fix was to start a second streamer on the new read replica as soon as it starts to transfer the base snapshot from the primary. It streams logs and stores them on local disk. When the snapshot finishes transferring, the read replica can read the logs locally, making it a much faster recovery process.
This not only solved our database replication issues across the US, but also cut down the time it took to build a new replica by half. Now, even if the primary and replica are in the same region, operational efficiency is drastically increased.
http://dockone.io/article/841
