Related: http://massivetechinterview.blogspot.com/2016/02/write-ahead-logging.html
Designing Data-Intensive Applications
SHARED-NOTHING ARCHITECTURES
REPLICATION VS. PARTITIONING
Replication
Leaders and Followers
SYNCHRONOUS VS. ASYNCHRONOUS REPLICATION
if you enable synchronous replication on a database, it usually means that one of the followers is synchronous, and the others are asynchronous. If the synchronous follower becomes unavailable or goes slow, one of the asynchronous followers is made synchronous. This guarantees that you have an up-to-date copy of the data on at least two nodes: the leader and one synchronous follower. This configuration is sometimes also called semi-synchronous.
Often, leader-based replication is configured to be completely asynchronous.
SETTING UP NEW FOLLOWERS
Take a consistent snapshot of the leader’s database at some point in time—if possible, without taking a lock on the entire database. Most databases have this feature, as it is also required for backups
Copy the snapshot to the new follower node.
The follower connects to the leader, and requests all data changes that happened since the snapshot was taken. This requires that the snapshot is associated with an exact position in the leader’s change stream. That position has various different names: for example, PostgreSQL calls it log sequence number, and MySQL calls it binlog coordinates.
When the follower has processed the backlog of data changes since the snapshot, we say it has caught up. It can now continue to process data changes from the leader as they happen.
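The steps above can be sketched in a few lines of Python. This is only a toy model, not how any particular database does it: the class names Leader and Follower, the in-memory list used as the change stream, and the use of a list index as the log position are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Leader:
    data: dict = field(default_factory=dict)
    log: list = field(default_factory=list)  # ordered change stream

    def write(self, key, value):
        self.log.append((key, value))
        self.data[key] = value

    def take_snapshot(self):
        # The snapshot is tagged with the exact log position it corresponds to.
        return dict(self.data), len(self.log)

    def changes_since(self, position):
        return self.log[position:]


@dataclass
class Follower:
    data: dict = field(default_factory=dict)
    position: int = 0  # number of leader log entries applied so far

    def load_snapshot(self, snapshot, position):
        self.data = dict(snapshot)
        self.position = position

    def catch_up(self, leader):
        # Request and apply every change made after our current position.
        for key, value in leader.changes_since(self.position):
            self.data[key] = value
            self.position += 1
```

The same catch_up call also covers a follower that was disconnected for a while: it remembers its position and asks only for the changes it missed.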
HANDLING NODE OUTAGES
Follower failure: catch-up recovery
From its log, the follower knows the last transaction that was processed before the fault occurred. It can therefore connect to the leader and request all data changes that occurred while it was disconnected.
Leader failure: failover
one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader.
Discarding writes is especially dangerous if other storage systems outside of the database need to be coordinated with the database contents.
Split brain: two nodes both believe that they are the leader.
IMPLEMENTATION OF REPLICATION STREAMS
Statement-based replication
the leader logs every write request (statement) that it executes, and sends that statement log to its followers.
Any statement that calls a non-deterministic function, for example NOW() to get the current date and time, or RAND() to get a random number, is likely to generate a different value on each replica.
If statements use an auto-incrementing column, or if they depend on the existing data in the database (e.g. UPDATE ... WHERE <some condition>), they must be executed in exactly the same order on each replica, otherwise they may have a different effect. This can be limiting when there are multiple concurrently executing transactions.
Statements that have side-effects (e.g. triggers, stored procedures, user-defined functions) may result in different side-effects occurring on each replica, unless the side-effects are absolutely deterministic.
It is possible to work around those issues—for example, the leader can replace any non-deterministic function calls with a fixed return value when the statement is logged, so that the followers all get the same value. However, because there are so many edge cases, other replication methods are now generally preferred.
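As a rough illustration of that workaround, the sketch below substitutes fixed values for NOW() and RAND() before a statement is logged. The function name make_deterministic is hypothetical, and the naive text substitution is an assumption for illustration only; a real database would do this inside its query engine, not with regular expressions.

```python
import re


def make_deterministic(statement, now, rand):
    """Replace non-deterministic calls with the values the leader observed."""
    # Callables are used as replacements so the values are inserted literally.
    statement = re.sub(r"\bNOW\(\)", lambda m: f"'{now}'", statement)
    statement = re.sub(r"\bRAND\(\)", lambda m: repr(rand), statement)
    return statement


logged = make_deterministic(
    "INSERT INTO events VALUES (NOW(), RAND())",
    now="2016-02-01 12:00:00",
    rand=0.25,
)
```

Every follower that replays the logged statement now inserts the same timestamp and the same number as the leader did.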
Write-ahead log (WAL) shipping
In the case of a log-structured storage engine (SSTables and LSM-trees), this log is the main place for storage. Log segments are compacted and garbage-collected in the background.
In the case of a B-tree, which overwrites individual disk blocks, every modification is first written to a write-ahead log (WAL) so that the index can be restored to a consistent state after a crash.
The main disadvantage is that the log describes the data on a very low level: a WAL contains details of which bytes were changed in which disk block. This makes replication closely coupled to the storage engine. If the database changes its storage format from one version to another, it is typically not possible to run different versions of the database software on the leader and the followers.
Logical log replication
An alternative is to use different log formats for replication and for the storage engine. This allows the replication log to be decoupled from the storage engine internals. This is sometimes called a logical log.
A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:
For an inserted row, the log contains the new values of all columns.
For a deleted row, the log contains enough information to uniquely identify the row that was deleted. Typically this would be the primary key, but if there is no primary key on the table, the old values of all columns need to be logged.
For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least the new values of all columns that changed). MySQL’s binlog, when configured to use row-based replication, uses this approach.
Since a logical log is decoupled from the storage engine internals, it can more easily be kept backwards-compatible, allowing the leader and the follower to run different versions of the database software, or even different storage engines.
A logical log format is also easier for external applications to parse.
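To make the row-level records concrete, here is a small sketch of what such a log might look like and how a follower could apply it. The record layout (the "op", "key", "values" and "changed" fields) is invented for illustration and is not the format of MySQL's binlog or of any other real system.

```python
def apply_record(table, record):
    """Apply one logical log record to a table held as {primary_key: row}."""
    op = record["op"]
    if op == "insert":
        # Inserts carry the new values of all columns.
        table[record["key"]] = dict(record["values"])
    elif op == "update":
        # Updates identify the row and carry the columns that changed.
        table[record["key"]].update(record["changed"])
    elif op == "delete":
        # Deletes only need enough information to identify the row.
        del table[record["key"]]
    else:
        raise ValueError(f"unknown operation: {op}")


logical_log = [
    {"op": "insert", "key": 1, "values": {"name": "Alice", "city": "Paris"}},
    {"op": "insert", "key": 2, "values": {"name": "Bob", "city": "Rome"}},
    {"op": "update", "key": 1, "changed": {"city": "Berlin"}},
    {"op": "delete", "key": 2},
]

users = {}
for record in logical_log:
    apply_record(users, record)
```

Nothing in these records refers to disk blocks or byte offsets, which is why a follower with a different storage engine, or an external consumer, can read them.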
Trigger-based replication
A trigger lets you register custom application code in a database system so that it is automatically executed when a data change (write transaction) occurs. The trigger has the opportunity to log this change into a separate table, from where it can be read by an external process. That external process can then apply any necessary application logic, and replicate the data change to another system.
replication lag
READING YOUR OWN WRITES
- read-after-write consistency
When reading something that the user may have modified, read it from the leader, otherwise read it from a follower. This requires that you have some way of knowing whether something might have been modified, without actually querying it. For example, user profile information on a social network is normally only editable by the owner of the profile, not by anybody else. Thus, a simple rule is: always read the user’s own profile from the leader, and any other users’ profiles from a follower.
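The profile rule is simple enough to write down directly. The function name and the "leader"/"follower" return values below are placeholders chosen for this sketch; a real application would return a connection or a node address.

```python
def choose_node_for_profile_read(viewer_id, profile_owner_id):
    """Route reads of the viewer's own profile to the leader."""
    if viewer_id == profile_owner_id:
        # Only the owner can have edited this profile, so only the owner
        # needs read-after-write consistency for it.
        return "leader"
    return "follower"
```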
cross-device read-after-write consistency
Approaches which require remembering a timestamp of the user’s last update become more difficult, because the code running on one device doesn’t know what updates have happened on the other device. This metadata would need to be centralized.
MONOTONIC READS
Monotonic reads only means that if one user makes several reads in sequence, they will not see time go backwards, i.e. they will not read older data after having previously read newer data.
One way of achieving monotonic reads is to make sure that each user always makes their reads from the same replica (different users can read from different replicas).
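One possible way to pin a user to a replica is to hash the user ID, as in the sketch below. The function name and the replica names are assumptions for illustration. A stable hash (SHA-256 here) is used because Python's built-in hash() for strings changes between processes.

```python
import hashlib


def replica_for_user(user_id, replicas):
    """Pick the same replica for the same user on every request."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(replicas)
    return replicas[index]


replicas = ["replica-1", "replica-2", "replica-3"]
chosen = replica_for_user("user-42", replicas)
```

If the chosen replica fails, the user's reads have to be rerouted to another replica, and the guarantee may be lost at that moment.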
CONSISTENT PREFIX READS
if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order.
in many distributed databases, different partitions operate independently, so there is no global ordering of writes: when a user reads from the database, they may see some parts of the database in an older state, and some in a newer state.
One solution is to make sure that any writes which are causally related to each other are written to the same partition—but in some applications that can’t be done efficiently. In general, ensuring consistent prefix reads requires a kind of distributed transaction with a guarantee such as snapshot isolation.
Multi-leader replication (also known as master-master or active/active replication)
each leader simultaneously acts as a follower to the other leaders.
Multi-datacenter operation
you can have a leader in each datacenter.
between datacenters, each datacenter’s leader replicates its changes to the leaders in other datacenters.
In a multi-leader configuration, every write can be processed in the local datacenter, and is replicated asynchronously to the other datacenters.
Tolerance of datacenter outages
In a single-leader configuration, if the datacenter with the leader fails, failover can promote a follower in another datacenter to be leader. In a multi-leader configuration, each datacenter can continue operating independently of the others, and replication catches up when the failed datacenter comes back online.
Tolerance of network problems
Downside: the same data may be concurrently modified in two different datacenters, and those write conflicts must be resolved.
As multi-leader replication is a somewhat retrofitted feature in many databases, there are often subtle configuration pitfalls and surprising interactions with other database features. For example, auto-incrementing keys, triggers and integrity constraints can be problematic. For this reason, multi-leader replication is often considered dangerous territory that should be avoided if possible.
Clients with offline operation
HANDLING WRITE CONFLICTS
Converging towards a consistent state
Give each write a unique ID (e.g. a timestamp, a long random number, a UUID, or a hash of the key and value), sort conflicting writes by this ID, and give precedence to the write with the highest ID. The write with the highest ID is called the winner. If a timestamp is used, this technique is known as last-write-wins. Note however that clocks cannot guarantee to be exactly in sync, so the definition of which write is more “recent” may be approximate.
Record the conflict in an explicit data structure that preserves all information, and write application code which resolves the conflict at some later time (perhaps by prompting the user).
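The first approach (highest ID wins) fits in one line, which also makes its cost visible: every write except the winner is silently dropped. The dictionary layout with "id" and "value" fields is an assumption made for this sketch.

```python
def last_write_wins(conflicting_writes):
    """Return the write with the highest ID; all others are discarded."""
    return max(conflicting_writes, key=lambda write: write["id"])


conflict = [
    {"id": 1700000000.120, "value": "title B"},  # IDs here are timestamps
    {"id": 1700000000.115, "value": "title A"},
]
winner = last_write_wins(conflict)
```

With timestamps as IDs, a replica whose clock runs slightly fast wins conflicts it should have lost, which is the approximation the text warns about.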
On write
As soon as the database system detects a conflict in the stream of replicated changes, it calls the conflict handler.
On read
When a conflict is detected, all the conflicting writes are stored. The next time the data is read, these multiple versions of the data are returned to the application. The application may prompt the user or automatically resolve the conflict, and write the result back to the database. CouchDB works this way, for example.
Conflict-free replicated data types (CRDTs) [235] are a family of data structures for sets, maps, ordered lists, counters etc. which can be concurrently edited by multiple users, and which automatically resolve conflicts in sensible ways. Some CRDTs have been implemented in Riak 2.0
Operational transformation
Infinite replication loops
each replica is given a unique identifier, and in the replication stream, each write is tagged with the identifiers of all the replicas it has passed through. When a node receives a data change that is tagged with its own identifier, that data change is ignored, because the node knows that it has already been processed
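A minimal sketch of this tagging scheme, assuming a circular topology of three nodes. The Node class and its method names are made up for illustration, and replication is modelled as direct method calls rather than network messages.

```python
class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        self.peers = []    # nodes this node forwards changes to
        self.applied = []  # changes applied locally, in order

    def local_write(self, change):
        self._apply_and_forward(change, seen=frozenset())

    def receive(self, change, seen):
        if self.node_id in seen:
            # The change has already passed through this node: ignore it.
            return
        self._apply_and_forward(change, seen)

    def _apply_and_forward(self, change, seen):
        self.applied.append(change)
        tagged = seen | {self.node_id}  # add our own identifier to the tag
        for peer in self.peers:
            peer.receive(change, tagged)


a, b, c = Node("a"), Node("b"), Node("c")
a.peers, b.peers, c.peers = [b], [c], [a]  # a -> b -> c -> a
a.local_write("x = 1")
```

Without the check in receive, the write would travel around the ring forever.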
Client-driven replication
Some data storage systems take a different approach, by making the client responsible for copying data to multiple replicas (or a coordinator node acting on behalf of the client). In this approach, the nodes do not actively copy data among each other: if a client writes some data to just one node, it will be stored only on that one node. If the client wants the data to be replicated on multiple nodes, it must connect to each node and separately write the data to each node.
Riak, Cassandra and Voldemort are open source datastores with client-driven replication models inspired by Dynamo.
WRITING TO THE DATABASE WHEN A NODE IS DOWN
read requests are also sent to several nodes in parallel. A version number is used to determine which value is newer.
Read repair and anti-entropy
After an unavailable node comes back online, how does it catch up on the writes that it missed?
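Read repair
When a client reads from several nodes in parallel, it can detect a stale response and write the newer value back to that replica.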
Anti-entropy process
In addition, some datastores have a background process which constantly looks for differences in the data between replicas, and copies any missing data from one replica to another. Unlike the data change stream in leader-based replication, this anti-entropy process does not copy writes in any particular order, and there may be a significant delay before data is copied.
without an anti-entropy process, values which are rarely read may have reduced durability, because read repair is required to ensure that the value is copied to all replicas.
Quorums for reading and writing
if there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. (In our example, n = 3, w = 2, r = 2.) As long as w + r > n, we expect to get an up-to-date value when reading, because at least one of the r nodes we’re reading from must be up-to-date. Reads and writes that obey these r and w values are called quorum reads and writes.
A common choice is to make n an odd number (typically 3 or 5), and to set w = r = (n + 1) / 2 (rounded up). However, you can vary the numbers as you see fit. For example, a workload with few writes and many reads may benefit from setting w = n and r = 1. This makes reads faster and writes slower, but has the disadvantage that just one failed node makes the database unavailable for writes.
With n = 3, w = 2, r = 2 we can tolerate one unavailable node.
With n = 5, w = 3, r = 3 we can tolerate two unavailable nodes.
Normally, reads and writes are always sent to all n replicas in parallel. The parameters w and r determine how many of the n nodes need to report success before we consider the read or write to be successful.
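The quorum arithmetic can be checked with a few helper functions. The function names are invented for this sketch; the formulas are the ones given above.

```python
import math


def quorum_overlaps(n, w, r):
    """True if every read set must share at least one node with every write set."""
    return w + r > n


def majority(n):
    """The common choice w = r = (n + 1) / 2, rounded up."""
    return math.ceil((n + 1) / 2)


def tolerated_unavailable_nodes(n, w, r):
    """How many nodes can be down while both reads and writes still succeed."""
    return n - max(w, r)


for n in (3, 5):
    w = r = majority(n)
    print(n, w, r, quorum_overlaps(n, w, r), tolerated_unavailable_nodes(n, w, r))
```

For the read-heavy setting w = n, r = 1 the overlap condition still holds, but tolerated_unavailable_nodes returns 0 because a single failed node blocks all writes.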
LIMITATIONS OF QUORUM CONSISTENCY
If w + r > n, the set of nodes to which you’ve written and the set of nodes from which you’ve read must overlap, i.e. at least one of the nodes you read from has the latest value. Even so, stale values can be returned in edge cases:
If a write is happening concurrently with a read, the write may be reflected on only some of the replicas. In this case, it’s undetermined whether the read returns the old or the new value.
If a write succeeded on some replicas but failed on others, and overall succeeded on fewer than w replicas, it is not rolled back on the replicas where it succeeded. This means that if a write was reported as failed, subsequent reads may or may not return the value from that write.
If a node carrying a new value fails, and its data is restored from a replica carrying an old value, the number of replicas storing the new value may fall below w, breaking the quorum condition.
If a sloppy quorum is used (Sloppy quorums and hinted handoff), the w writes may end up on different nodes than the r reads, so there is no longer a guaranteed overlap between the r nodes and the w nodes.
Monitoring staleness
Replication lag: in leader-based replication, writes are applied to the leader and to followers in the same order, and each node has a position in the replication stream (the number of writes it has applied locally). By subtracting a follower’s current position from the leader’s current position you can measure the amount of replication lag.
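As a small illustration of that subtraction, assuming positions are plain counts of applied writes (the function name and the follower names are hypothetical):

```python
def replication_lag(leader_position, follower_positions):
    """Return, per follower, how many writes it is behind the leader."""
    return {
        name: leader_position - position
        for name, position in follower_positions.items()
    }


lag = replication_lag(1050, {"follower-1": 1050, "follower-2": 1020})
```

A monitoring system could alert when any value in the result stays above a threshold for too long.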
SLOPPY QUORUMS AND HINTED HANDOFF
Sloppy quorum: writes and reads still require w and r successful responses, but those may include nodes that are not amongst the designated n “home” nodes for a value.
Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate “home” nodes. This is called hinted handoff.
Sloppy quorums are particularly useful for increasing write availability: as long as any w nodes are available, the database can accept writes. However, this means that even when w + r > n, you cannot be sure to read the latest value for a key, because the latest value may have been temporarily written to some nodes outside of n.
Thus, a sloppy quorum actually isn’t a quorum at all in the traditional sense. It’s only an assurance of durability, namely that the data is stored on w nodes somewhere. There is no guarantee that a read of r nodes will see it until the hinted handoff has completed.
Reconciling conflicts
In Riak and Voldemort, conflict resolution happens at read time. When an application reads a key, the database returns all conflicting values, so that the application can merge them appropriately. The merged value can then be written back to the database, similarly to read repair.
By contrast, Cassandra normally uses timestamps to pick a winner if there are several conflicting updates, and discards conflicting values with older timestamps. Use of timestamps relies on clock synchronization, which is not reliable (see Relying on synchronized clocks). However, Cassandra has a richer data model, which allows applications to avoid concurrent writes to the same key.
Multi-datacenter operation
Cassandra and Voldemort implement their multi-datacenter support within the normal client-driven model:
the number of replicas n includes nodes in all datacenters, and in the configuration you can specify how many of the n replicas you want to have in each datacenter. Each write from a client is sent to all replicas, regardless of datacenter, but the client usually only waits for acknowledgement from a quorum of nodes within its local datacenter, so that it is unaffected by latency and interruptions of the cross-datacenter link. The higher-latency writes to other datacenters are often configured to happen asynchronously, although there is some flexibility in the configuration
Riak keeps all communication between clients and database nodes local to one datacenter, so n describes the number of replicas within one datacenter. Cross-datacenter replication between database clusters happens asynchronously in the background, in a style that is similar to multi-leader replication.
Client-driven replication: Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.
Time-of-day clocks (NTP)
clock_gettime(CLOCK_REALTIME) on Linux [41] and System.currentTimeMillis() in Java
If the local clock is too far ahead of the NTP server, it may be forcibly reset and appear to jump back to a previous point in time. These jumps, as well as the fact that they often ignore leap seconds, make time-of-day clocks unsuitable for measuring elapsed time.
MONOTONIC CLOCKS - suitable for measuring a duration (time interval)
clock_gettime(CLOCK_MONOTONIC) on Linux and System.nanoTime() in Java
they are guaranteed to always move forwards (whereas a time-of-day clock may jump back in time).
the absolute value of the clock is meaningless
NTP may adjust the frequency at which the monotonic clock moves forward (this is known as slewing the clock) if it detects that the computer’s local quartz is moving faster or slower than the NTP server. By default, NTP allows the clock rate to be speeded up or slowed down by up to 0.05%, but NTP cannot cause the monotonic clock to jump forwards or backwards.
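The same split exists in Python's standard library: time.time() is a time-of-day clock and time.monotonic() is a monotonic clock. A minimal sketch of measuring a duration with the monotonic clock (the helper name timed is an assumption for illustration):

```python
import time


def timed(fn, *args):
    """Run fn(*args) and return (result, elapsed seconds)."""
    start = time.monotonic()
    result = fn(*args)
    # Only the difference between two readings is meaningful;
    # the absolute value of either reading is not.
    elapsed = time.monotonic() - start
    return result, elapsed


result, elapsed = timed(sum, range(1_000_000))
```

Computing the same difference from two time.time() readings could produce a negative or wildly wrong duration if the clock is reset in between.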
The quartz clock in a computer is not very accurate: it drifts (runs faster or slower than it should). Clock drift varies depending on the temperature of the machine. Google assumes a clock drift of 200 ppm (parts per million) for their servers [44], which is equivalent to 6 ms drift for a clock that is resynchronized with a server every 30 seconds, or 17 seconds drift for a clock that is resynchronized once a day.
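The two drift figures follow directly from the 200 ppm assumption, as this short calculation shows (the function name is made up for illustration):

```python
def max_drift_seconds(drift_ppm, seconds_since_sync):
    """Worst-case drift accumulated since the last synchronization."""
    return drift_ppm * 1e-6 * seconds_since_sync


every_30_seconds = max_drift_seconds(200, 30)       # about 0.006 s, i.e. 6 ms
once_a_day = max_drift_seconds(200, 24 * 60 * 60)   # about 17.28 s
```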
If a computer’s clock differs too much from an NTP server, it may refuse to synchronize, or the local clock will be forcibly reset [40]. Any applications observing the time before and after this reset may see time go backwards or suddenly jump forwards.
NTP synchronization can only be as good as the network delay, so there is a limit to its accuracy when you’re on a congested network with variable packet delays. One experiment showed that a minimum error of 35 ms is achievable when synchronizing over the internet [45], though occasional spikes in network delay lead to errors of around a second. Depending on the configuration, large network delays can cause the NTP client to give up entirely.
TIMESTAMPS FOR ORDERING EVENTS
- used by Cassandra
- not accurate
logical clocks - relative ordering of events
CLOCK READINGS HAVE A CONFIDENCE INTERVAL
snapshot isolation
It allows read-only transactions to see the database in a consistent state at a particular point in time, without locking and interfering with read-write transactions.
The most common implementation of snapshot isolation requires a monotonically increasing transaction ID. If a write happened later than the snapshot (i.e. the write has a greater transaction ID than the snapshot), that write is invisible to the snapshot transaction.
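Spanner implements distributed snapshot isolation across datacenters in this way. It uses the clock’s confidence interval as reported by the TrueTime API, and is based on the following observation: if you have two confidence intervals, A = [Aearliest, Alatest] and B = [Bearliest, Blatest], and those two intervals do not overlap (i.e. Aearliest < Alatest < Bearliest < Blatest), then B definitely happened after A. Only if the intervals overlap is the order of A and B uncertain.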
Spanner deliberately waits for the length of the confidence interval before committing a read-write transaction. By doing so, it ensures that any transaction that may read the data is at a sufficiently later time, so their confidence intervals do not overlap. In order to keep the wait time as short as possible, Spanner needs to keep the clock uncertainty as small as possible; for this purpose, Google deploys a GPS receiver or atomic clock in each datacenter, allowing clocks to be synchronized to within about 7 ms.
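The reasoning about confidence intervals can be sketched as follows: two events can be ordered with certainty only when their intervals do not overlap, and the wait before commit equals the width of the interval. This is an illustration of the idea only, not the TrueTime API; the Interval type and both function names are assumptions.

```python
from typing import NamedTuple


class Interval(NamedTuple):
    earliest: float  # seconds
    latest: float


def definitely_before(a, b):
    """True only if a's whole interval lies before b's interval."""
    return a.latest < b.earliest


def commit_wait(interval):
    """Time to wait so that later transactions cannot overlap this one."""
    return interval.latest - interval.earliest


a = Interval(10.000, 10.007)
b = Interval(10.010, 10.015)
c = Interval(10.005, 10.012)  # overlaps a, so their order is uncertain
```

A smaller clock uncertainty gives narrower intervals and therefore a shorter wait, which is why the clock infrastructure matters so much here.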
Process pauses
lease - a lock with a timeout
LAST WRITE WINS (DISCARDING CONCURRENT WRITES)
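Do not use LWW if losing data is not acceptable.
The only safe way to use LWW is to write each key once and treat it as immutable afterwards, for example by using a UUID as the key so that each write is unique.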
THE “HAPPENS-BEFORE” RELATIONSHIP AND CONCURRENCY
An operation A happens before another operation B if B knows about A, or depends on A, or builds upon A in some way. Whether one operation happens before another operation is the key to defining what concurrency means. In fact, we can simply say that two operations are concurrent if neither happens before the other, i.e. neither knows about the other.
TRACKING HAPPENS-BEFORE RELATIONSHIPS
The server maintains a version number for every key, increments the version number every time that key is written, and stores the new version number along with the value written.
When a client reads a key, the server returns all values that have not been overwritten, as well as the latest version number. A client must read a key before writing.
When a client writes a key, it must include the version number from the prior read, and it must merge together all values that it received in the prior read. (The response from a write request can be like a read, returning all current values, which allows us to chain several writes like in the example above.)
When the server receives a write with a particular version number, it can overwrite all values with that version number or below (since it knows that they have been merged into the new value), but it must keep all values with a higher version number (because those values are concurrent with the incoming write).
When a write includes the version number from a prior read, that tells us which previous state the write is based on. If you make a write without including a version number, it is concurrent to all other writes, so it will not overwrite anything — it will just be returned as one of the versions on subsequent reads.
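The server side of this algorithm for a single key can be sketched as below. The class name VersionedKey and the convention that based_on_version=0 means "no version number supplied" are assumptions for this sketch; merging the values is left to the client, as described above.

```python
class VersionedKey:
    def __init__(self):
        self.version = 0   # latest version number for this key
        self.values = {}   # version number -> value not yet overwritten

    def read(self):
        """Return the latest version number and all current values."""
        return self.version, list(self.values.values())

    def write(self, value, based_on_version=0):
        """Store value, overwriting only what the client had already seen."""
        self.version += 1
        # Values at or below based_on_version were merged into the new value
        # by the client; values above it are concurrent and must be kept.
        self.values = {
            version: existing
            for version, existing in self.values.items()
            if version > based_on_version
        }
        self.values[self.version] = value
        return self.read()


cart = VersionedKey()
cart.write(["milk"])                               # client 1, no prior read
cart.write(["eggs"])                               # client 2, concurrent
cart.write(["milk", "flour"], based_on_version=1)  # client 1 saw version 1
```

After the third write, the key holds two values: ["eggs"], which client 1 never saw, and ["milk", "flour"], which replaced ["milk"].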
MERGING CONCURRENTLY WRITTEN VALUES - siblings
If siblings are merged by simply taking their union, an item that was removed in only one sibling reappears in the merged result. To prevent this, an item cannot simply be deleted from the database when it is removed; instead, the system must leave a marker with an appropriate version number to indicate that the item has been removed when merging siblings. Such a deletion marker is known as a tombstone.
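A simplified sketch of merging siblings with tombstones. The representation is an assumption made for illustration: each sibling maps an item to a (version, removed) pair, and for each item the entry with the higher version number wins. Real systems track this information differently.

```python
def merge_siblings(siblings):
    """Merge siblings item by item, keeping the entry with the highest version."""
    merged = {}
    for sibling in siblings:
        for item, (version, removed) in sibling.items():
            if item not in merged or version > merged[item][0]:
                merged[item] = (version, removed)
    return merged


def visible_items(cart):
    """Items that are present, i.e. not covered by a tombstone."""
    return {item for item, (_, removed) in cart.items() if not removed}


sibling_a = {"milk": (1, False), "eggs": (2, False)}
sibling_b = {"milk": (3, True), "ham": (3, False)}  # milk removed: tombstone
merged = merge_siblings([sibling_a, sibling_b])
```

A plain set union of the two carts would bring milk back; with the tombstone, the merged cart contains only eggs and ham.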
VERSION VECTORS
A single version number is not sufficient when multiple replicas accept writes concurrently. Instead, we need to use a version number per replica as well as per key. Each replica increments its own version number when processing a write, and also keeps track of the version numbers it has seen from all of the other replicas. It can then use that information to figure out which values to overwrite and which values to keep as siblings.
The collection of version numbers from all the replicas is called a version vector.
Riak 2.0 uses a variant called the dotted version vector.
The version vector structure ensures that it is safe to read from one replica and subsequently write back to another replica: this may result in siblings being created, but no data is lost as long as siblings are merged correctly.
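Comparing two version vectors shows whether one value may overwrite the other or whether both must be kept as siblings. The sketch below uses the standard element-wise comparison; representing a vector as a dict from replica name to counter, with missing entries treated as 0, is an assumption for illustration.

```python
def compare(a, b):
    """Return 'equal', 'before', 'after' or 'concurrent' for vectors a and b."""
    replicas = a.keys() | b.keys()
    a_le_b = all(a.get(r, 0) <= b.get(r, 0) for r in replicas)
    b_le_a = all(b.get(r, 0) <= a.get(r, 0) for r in replicas)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # b has seen everything a has: b may overwrite a
    if b_le_a:
        return "after"       # a has seen everything b has: a may overwrite b
    return "concurrent"      # neither knows about the other: keep both


older = {"replica-1": 2, "replica-2": 1}
newer = {"replica-1": 3, "replica-2": 1}
other = {"replica-1": 2, "replica-2": 2}
```

Here older is before newer, while newer and other are concurrent and would be kept as siblings.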
VERSION VECTORS AND VECTOR CLOCKS
A version vector is sometimes also called a vector clock. The difference between these data structures is subtle — one way of looking at it is that version vectors are for client-server systems, and vector clocks are for peer-to-peer systems.
Leaderless replication: Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.
Read-after-write consistency: a user should always see data that they submitted themselves.
Monotonic reads: after a user has seen the data at one point in time, they shouldn’t later see the data from some earlier point in time.
Consistent prefix reads: users should see the data in a state that makes causal sense.