Chapter 5 - Replication

My notes from the fifth chapter of Martin Kleppmann's book: Designing Data Intensive Applications.

Table of Contents

Replication involves keeping a copy of the same data on multiple machines connected via a network. Reasons for this involve:

The challenge with replication lies in handling changes to replicated data. Three algorithms for replicating changes between nodes:

Aside: These replication models are per database table or partition (or index for Elasticsearch). So in Elasticsearch for example, each index in a cluster has its primary shard, and only the primary shard can accept writes. Therefore, a node can have both primary shards and replica shards.

Leaders and Followers

Every node that keeps a copy of data is a replica. Obvious question is: how do we make sure that the data on all the replicas is the same? The most common approach for this is leader-based replication. In this approach:

  1. Only the leader accepts writes.
  2. The followers read off a replication log and apply all the writes in the same order that they were processed by the leader.
  3. A client can query either the leader or any of its followers for read requests.

So here, the followers are read-only, while writes are only accepted by the reader.

This approach is used by MySQL, PostgreSQL etc., as well as non-relational databases like MongoDB, RethinkDB, and Espresso.

Synchronous Versus Asynchronous Replication

With Synchronous replication, the leader must wait for a positive acknowledgement that the data has been replicated from at least one of the followers before terming the write as successful, while with Asynchronous replication, the leader does not have to wait.

Synchronous Replication

The advantage of synchronous replication is that if the leader suddenly fails, we are guaranteed that the data is available on the follower.

The disadvantage is that if the synchronous follower does not respond (say it has crashed or there's a network delay or something else), the write cannot be processed. A leader must block all writes and wait until the synchronous replica is available again. Therefore, it's impractical for all the followers to be synchronous, since just one node failure can cause the system to become unavailable.

In practice, enabling synchronous replication on a database usually means that one of the followers is synchronous, and the others are asynchronous. If the synchronous one is down, one of the asynchronous followers is made synchronous. This configuration is sometimes called semi-synchronous.

Asynchronous Replication

In this approach, if the leaders fails and is not recoverable, any writes that have not been replicated to followers are lost.

An advantage of this approach though, is that the leader can continue processing writes, even if all its followers have fallen behind.

There's some research into how to prevent asynchronous-performance like systems from losing data if the leader fails. A new replication method called Chain replication is a variant of synchronous replication that aims to provide good performance and availability without losing data.

Setting Up New Followers

New followers can be added to an existing cluster for reasons such as to replace a failed node, or to add an additional replica. The next question is how to ensure the new follower has an accurate copy of the leader's data?

Two options that are not sufficient are:

There's an option that works without downtime which involves:

  1. Taking a consistent snapshot of the leader's db at some point in time. It's possible to do this without taking a lock on the entire db. Most databases have this feature.
  2. Copy the snapshot to the follower node.
  3. The follower requests all the data changes that happened since the snapshot was taken.
  4. When the follower has processed the log of changes since the snapshot, we say it has caught up.

In some systems, this process is fully automated, while in others, it is manually performed by an administrator.

Handling Node Outages

Any node can fail, therefore, we need to keep the system running despite individual node failures, and minimize the impact of a node outage. How do we achieve high availability with leader-based replication?

Scenario A - Follower Failure: Catch-up recovery

Each follower typically keeps a local log of the data changes it has received from the leader. If a follower node fails, it can compare its local log to the replication log maintained by the leader, and then process all the data changes that occurred when the follower was disconnected.

Scenario B - Leader failure: Failover

This is trickier: One of the nodes needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This whole process is called a failover. Failover can be handled manually or automatically. An automatic failover consists of:

  1. Determining that the leader has failed: Many things could go wrong: crashes, power outages, network issues etc. There's no foolproof way of determining what has gone wrong, so most systems use a timeout. If the leader does not respond within a given interval, it's assumed to be dead.
  2. Choosing a new leader: This can be done through an election process (where the new leader is chosen by a majority of the remaining replicas), or a new leader could be appointed by a previously elected controller node. The best candidate for leadership is typically the one with the most up-to-date data changes from the old leader (to minimize data loss)
  3. Reconfiguring the system to use the new leader: Clients need to send write requests to the new leader, and followers need to process the replication log from the new leader. The system also needs to ensure that when the old leader comes back, it does not believe that it is still the leader. It must become a follower.

There are a number of things that can go wrong during the failover process:

Implementation of Replication Logs

Several replication methods are used in leader-based replication. These include:

a) Statement-based replication: In this approach, the leader logs every write request (statement) that it executes, and sends the statement log to every follower. Each follower parses and executes the SQL statement as if it had been received from a client.

Some databases work around this issues by requiring transactions to be deterministic, or configuring the leader to replace nondeterministic function calls with a fixed return value.

b) Write-ahead log (WAL) shipping: The log is an append-only sequence of bytes containing all writes to the db. Besides writing the log to disk, the leader can also send the log to its followers across the network.

The main disadvantage of this approach is that the log describes the data on a low level. It details which bytes were changed in which disk blocks. This makes the replication closely coupled to the storage engine. Meaning that if the storage engine changes in another version, we cannot have different versions running on the leader and the followers, which prevents us from making zero-downtime upgrades.

c) Logical (row-based) log replication: This logs the changes that have occurred at the granularity of a row. Meaning that:

This decouples the logical log from the storage engine internals. Thus, it makes it easier for external applications (say a data warehouse for offline analysis, or for building custom indexes and caches) to parse. This technique is called change data capture.

d) Trigger-based replication: This involves handling replication within the application code. It provides flexibility in dealing with things like: replicating only a subset of data, conflict resolution logic, replicating from one kind of database to another etc. Trigger and Stored procedures provide this functionality. This method has more overhead than other replication methods, and is more prone to bugs and limitations than the database's built-in replication.

Problems with Replication Lag

Eventual Consistency: If an application reads from an asynchronous follower, it may see outdated information if the follower has fallen the leader. This inconsistency is a temporary state, and the followers will eventually catchup. That's eventual consistency.

The delay between when a write happens on a reader and gets reflected on a follower is replication lag.

Other Consistency Levels

There are a number of issues that can occur as a result of replication lag. In this section, I'll summarize them under the minimum consistency level needed to prevent it from happening.

a) Reading Your Own Writes: If a client writes a value to a leader and tries to read that same value, the read request might go to an asynchronous follower that has not received the write yet as a result of replication lag. The user might think the data was lost, when it really wasn't. The consistency level needed to prevent this situation is known as read-after-write consistency or read-your-writes consistency. It makes the guarantee that a user will always see their writes. There are a number of various techniques for implementing this:

There's an extra complication with this if the same user is accessing my service across multiple devices say a desktop browser and a mobile app. They might be connected through different networks, yet we need to make sure they're in sync. This is known as cross-device read-after-write consistency. This is more complicated for reasons like the fact that:

b) Monotonic Reads: An anomaly that can occur when reading from asynchronous followers is that it's possible for a user to see things moving backward in time. Imagine a scenario where a user makes the same read multiple times, and each read request goes to a different follower. It's possible that a write has appeared on some followers, and not on others. Time might seem to go backwards sometimes when the user sees old data, after having read newer data.

Monotonic reads is a consistency level that guarantees that a user will not read older data after having previously read newer data. This guarantee is stronger than eventual consistency, but weaker than strong consistency.

A solution to this is that every read from a user should go to the same replica. The hash of a user's id could be used to determine what replica to go to.

c) Consistent Prefix Reads: Another anomaly that can occur as a result of replication lag is a violation of causality. Meaning that a sequence of writes that occur in one order might be read in another order. This can especially happen in distributed databases where different partitions operate independently and there's no global ordering of writes. Consistent prefix reads is a guarantee that prevents this kind of problem.

One solution is to ensure that causally related writes are always written to the same partition, but this cannot always be done efficiently.

Solutions for Replication Lag

Application developers should ideally not have to worry about subtle replication issues and should trust that their databases "do the right thing". This is why transactions exist. They allow databases to provide stronger guarantees about things like consistency. However, many distributed databases have abandoned transactions because of the complexity, and have asserted that eventual consistency is inevitable. Martin discusses these claims later in the chapter.

Multi-Leader Replication

The downside of single-leader replication is that all writes must go through that leader. If the leader is down, or a connection can't be made for whatever reason, you can't write to the database.

Multi-leader/Master-master/Active-Active replication allows more than one node to accept writes. Each leader accepts writes from a client, and acts as a follower by accepting the writes on other leaders.

Use Cases for Multi-Leader Replication

Handling Write Conflicts.

Multi-leader replication has the big disadvantage that write conflicts can occur, which requires conflict resolution.

If two users change the same record, the writes may be successfully applied to their local leader. However, when the writes are asynchronously replicated, a conflict will be detected. This does not happen in a single-leader database.

Synchronous versus asynchronous conflict detection

In theory, we could make conflict detection synchronous, meaning that we wait for the write to be replicated to all replicas before telling the user that the write was successful. Doing this will make one lose the main advantage of multi-leader replication though, which is allowing each replica to accept writes independently. Use single-leader replication if you want synchronous conflict detection.

Conflict Avoidance

Conflict avoidance is the simplest strategy for dealing with conflicts. Conflicts can be avoided by ensuring that all the writes for a particular record go through the same leader. For example, you can make all the writes for a user go to the same datacenter, and use the leader there for reading and writing. This of course has a downside that if a datacenter fails, traffic needs to be rerouted to another datacenter, and there's a possibility of concurrent writes on different leaders, which could break down conflict avoidance.

Converging toward a consistent state

A database must resolve conflicts in a convergent way, meaning that all the replicas must arrive at the same final value when all changes have been replicated.

Various ways of achieving this are by:

Custom Conflict Resolution Logic

The most appropriate conflict resolution method may depend on the application, and thus, multi-leader replication tools often let users write conflict resolution logic using application code. The code may be executed on read or on write:

Automatic conflict resolution is a difficult problem, but there are some research ideas being used today:

It's still an open area of research though.

Multi-Leader Replication Topologies

A replication topology is the path through which writes are propagated from one node to another. The most general topology is all-to-all, where each leader sends its writes to every other leader. Other types are circular topology and star topology.

All-to-all topology is more fault tolerant than the circular and star topologies because in those topologies, one node failing can interrupt the flow of replication messages across other nodes, making them unable to communicate until the node is fixed.

Leaderless Replication

In this replication style, the concept of a leader is abandoned, and any replica can typically accept writes from clients directly.

This style is used by Amazon for its in-house Dynamo system. Riak, Cassandra and Voldermort also use this model. These are called Dynamo style systems.

In some leaderless implementations, the client writes directly to several replicas, while in others there's a coordinator node that does this on behalf of the client. Unlike a leader database though, this coordinator does not enforce any ordering of the writes.

Preventing Stale Reads

Say there are 3 replicas and one of the replicas goes down. A client could write to the system and have 2 of the replicas successfully acknowledge the write. However, when the offline node gets back up, anyone who reads from it may get stale responses.

To prevent stale reads, as well as writing to multiple replicas, the client also reads from multiple replicas in parallel. Version numbers are attached to the result to determine which value is newer.

Read repair and anti-entropy

When offline nodes come back up, the replication system must ensure that all data is eventually copied to every replica. Two mechanisms used in Dynamo-style datastores are:

Quorums for reading and writing

Quorum reads and writes refer to the minimum number of votes for a read or a write to be valid. If there are n replicas, every write must be confirmed by at least w nodes to be considered successful, and every read must be confirmed by at least r nodes to be successful. The general rule that the number chosen for r and w should obey is that:

w + r > n.

This way, we can typically expect an up-to-date value when reading because at least one of the r nodes we're reading from must overlap with the w nodes (barring sloppy quorums which are discussed below)

The parameters n, w, and r are typically configurable. A common choice is to make n an odd number such that w = r = (n + 1)/2. These numbers can be varied though. For a workload with few writes and many reads, it may make sense to set w = n and r = 1. Of course this has the disadvantage of reduced availability for writes if just one node fails.

Note that n does not always refer to the number of nodes in the cluster, it may just be the number of nodes that any given value must be stored on. This allows datasets to be partitioned. Partitioning is discussed in Chapter 5.


Limitations of Quorum Consistency

Quorums don't necessarily have to be majorities i.e. w + r > n. What matters is that the sets of nodes used by the read and write operations overlap in at least one node.

We could also set w and r to smaller numbers, so that w + r ≤ n. With this, reads and writes are still sent to n nodes, but a smaller number of successful responses is required for the operation to succeed. However, you are also more likely to read stale values, as it's more likely that a read did not include the node with the latest value.

The upside of the approach though is that it allows lower latency and higher availability: if there's a network interruption and many replicas become unreachable, there's a higher chance that reads and writes can still be processed.

Even if we configure our database such that w + r > n , there are still edge cases where stale values may be returned. Possible scenarios are:

From these points and others not listed, there is no absolute guarantee that quorum reads return the latest written value. These style of databases are optimized for use cases that can tolerate eventual consistency. Stronger guarantees require transactions or consensus.

Monitoring Staleness

It's important to monitor whether databases are returning up-to-date results, even if the application can tolerate stale reads. If a replica falls behind significantly, the database should alert you so that you can investigate the cause.

For leader-based replication, databases expose metrics for the replication lag. It's possible to do this because writes are applied to the leader and followers in the same order. We can determine how far behind a follower has fallen from a leader by subtracting it's position from the leader's current position.

This is more difficult in leaderless replication systems as there is no fixed order in which writes are applied. There's some research into this, but it's not common practice yet.

Sloppy Quorums and Hinted Handoff

Databases with leaderless replication are appealing for use cases where high availability and low latency is required, as well as the ability to tolerate occasional stale reads. This is because they can tolerate failure of individual nodes without needing to failover since they're not relying on one node. They can also tolerate individual nodes going slow, as long as w or r nodes have responded.

Note that the quorums described so far are not as fault tolerant as they can be. If any of the designated n nodes is unavailable for whatever reason, it's less likely that you'll be able to have w or r nodes reachable, making the system unavailable. Nodes being unavailable can be caused by anything, even something as simple as a network interruption.

To make the system more fault tolerant, instead of returning errors to all requests for which can't reach a quorum of w or r nodes, the system could accept reads and writes on nodes that are reachable, even if they are not among the designated n nodes on which the value usually lives. This concept is known as a sloppy quorum.

With a sloppy quorum, during network interruptions, reads and writes still require r and w successful responses, but they do not have to be among the designated n "home" nodes for a value. These are like temporary homes for the value.

When the network interruption is fixed, the writes that were temporarily accepted on behalf of another node are sent to the appropriate "home" node. This is hinted handoff.

Sloppy quorums are particularly useful for increasing write availability. However, it also means that even when w + r > n, there is a possibility of reading stale data, as the latest value may have been temporarily written to some values outside of n.

Sloppy quorum is more of an assurance of durability, than an actual quorum.

Multi-datacenter operation

For datastores like Cassandra and Voldermort which implement leaderless replication across multiple datacenters, the number of replicas n includes replicas in all datacenter.

Each write is also sent to all datacenters, but it only waits for acknowledgement from a quorum of nodes within its local datacenter so that it's not affected by delays and interruptions on the link between multiple datacenters.

Detecting Concurrent Writes

In dynamo-style databases, several clients can concurrently write to the same key. When this happens, we have a conflict. We've briefly touched on conflict resolution techniques already, but we'll discuss them in more detail.

Last write wins (discarding concurrent writes)

One approach for conflict resolution is the last write wins approach. It involves forcing an arbitrary ordering on concurrent writes (could be by using timestamps), picking the most "recent" value, and discarding writes with an earlier timestamp.

This helps to achieve the goal of eventual convergence across the data in replicas, at the cost of durability. If there were several concurrent writes the same key, only one of the writes will survive and the others will be discarded, even if all the writes were reported as successful.

Last write wins (LWW) is the only conflict resolution method supported by Apache Cassandra.

If losing data is not acceptable, LWW is not a good choice for conflict resolution.

The "happens-before" relationship and concurrency

Whenever we have two operations A and B, there are three possibilities:

We say that an operation A happened before operation B if either of the following applies:

Thus, if we cannot capture this relationship between A and B, we say that they are concurrent. If they are concurrent, we have a conflict that needs to be resolved.

Note: Exact time does not matter for defining concurrency, two operations are concurrent if they are both unaware of each other, regardless of the physical time which they occurred. Two operations can happen sometime apart and still be concurrent, as long as they are unaware of each other.

Capturing the happens-before relationship

In a single database replica, version numbers are used to determine concurrency.

It works like this:

Example scenario:

If two clients are trying to write a value for the same key at the same time, both would first read the data for that key and get the latest version number of say: 3. If one of them writes first, the version number will be updated to 4 from the database end. However, since the slower one will pass a version number of 3, it means it is concurrent with the other one since it's not aware of the higher version number of 4.

When a write includes the version number from a prior read, that tells us which previous state the write is based on.

Merging Concurrently Written Values

With the algorithm described above, clients have to do the work of merging concurrently written values. Riak calls these values siblings.

A simple merging approach is to take a union of the values. However, this can be faulty if one operation deleted a value but that value is still present in a sibling. To prevent this problem, the system must leave a marker (tombstone) to indicate that an item has been removed when merging siblings.

CRDTs are data structures that can automatically merge siblings in sensible ways, including preserving deletions.

Version Vectors

The algorithm described above used only a single replica. When we have multiple replicas, we use a version number per replica and per key and follow the same algorithm. Note that each replica also keeps track of the version numbers seen from each of the other replicas. With this information, we know which values to overwrite and which values to keep as siblings.

The collection of version numbers from all the replicas is called a version vector. Dotted version vectors are a nice variant of this used in Riak: https://riak.com/posts/technical/vector-clocks-revisited-part-2-dotted-version-vectors/

Version vectors are also sent to clients when values are read, and need to be sent back to the database when a value is written.

Version vectors enable us to distinguish between overwrites and concurrent writes.

We also have Vector clocks, which are different from Version Vectors apparently: https://haslab.wordpress.com/2011/07/08/version-vectors-are-not-vector-clocks/

distributed-systems learning-diary ddia

To get notified when I write something new, you can subscribe to the RSS feed.

A small favour

Did you find anything I wrote confusing, outdated, or incorrect? Or do you have an answer to a question I posed here? Please let me know! Just write a few words below and I'll be sure to amend this post with your suggestions.

← Home