System Design for Big Data [Consistent Hashing]
What is Consistent Hashing?
Consistent hashing is a hashing strategy such that when the hash table is resized (e.g. a new cache host is added to the system), only about k/n keys need to be remapped, where k is the number of keys and n is the number of caches. Recall that in a caching system that uses mod as the hash function, almost all keys need to be remapped. Consistent hashing maps an object to the same cache host if possible. If a cache host is removed, the objects on that host are shared among the remaining hosts; if a new cache is added, it takes its share from the other hosts without touching the rest of the keys.
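To make the contrast concrete, here is a minimal sketch (not from the article) that counts how many of 10,000 keys change servers when a mod-based system grows from 4 to 5 servers; under consistent hashing only about k/n = 2,000 of them would be expected to move.

public class ModRemapDemo {
    public static void main(String[] args) {
        int keys = 10000, before = 4, after = 5;
        int moved = 0;
        for (int k = 0; k < keys; k++) {
            // With mod placement, a key's server is hash % serverCount,
            // so the assignment changes whenever the two mods disagree.
            if (k % before != k % after) {
                moved++;
            }
        }
        // Prints "moved 8000 of 10000 keys": n/(n+1) = 80% of keys move.
        System.out.println("moved " + moved + " of " + keys + " keys");
    }
}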
How does it work?
Like a typical hash function, consistent hashing maps a key or a cache host to an integer. Suppose the output of the hash function is in the range [0, 2^128) (e.g. an MD5 hash). Imagine that the integers in this range are placed on a ring so that the values wrap around.
Here's how consistent hashing works:
- Given a list of cache servers, hash them to integers in the range.
- To map a key to a server:
  - Hash the key to a single integer.
  - Move clockwise on the ring until you encounter the first cache.
  - That cache is the one that contains the key.
Suppose caches A, B, and C have been hashed onto the ring. To add a new cache, say D, on the segment that currently maps to C: the keys that originally fell to C are split, and some of them move to D. Other keys don't need to be touched.
To remove a cache, or when a cache fails, say C: all keys that originally mapped to C now fall through to the next cache on the ring, A, and only those keys need to be moved to A. Other keys don't need to be touched.
The real data are essentially randomly distributed and thus may not be uniform, which can leave the keys unevenly balanced across the caches.
To resolve this issue, we add "virtual replicas" for the caches.
- For each cache, instead of mapping it to a single point on the ring, we map it to multiple points on the ring, i.e. replicas.
- By doing this, each cache is associated with multiple segments of the ring.
(Figure: virtual replicas of caches on the ring.)
If the hash function "mixes well", then as the number of replicas increases, the keys become more evenly balanced across the caches. See [3] for a simulation result.
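As a rough illustration of such a simulation, here is a minimal sketch (not from the article): it places 10 caches on a ring with 1, 10, and then 100 virtual replicas each, assigns 100,000 random keys clockwise, and prints the per-cache loads. String.hashCode() stands in for a stronger hash such as MD5.

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class ReplicaBalanceDemo {
    public static void main(String[] args) {
        Random rnd = new Random(42);
        for (int replicas : new int[] {1, 10, 100}) {
            // Ring position -> cache id; TreeMap keeps positions sorted.
            TreeMap<Integer, Integer> ring = new TreeMap<Integer, Integer>();
            for (int cache = 0; cache < 10; cache++) {
                for (int r = 0; r < replicas; r++) {
                    ring.put(("cache-" + cache + "#" + r).hashCode(), cache);
                }
            }
            // Send 100,000 random keys clockwise to the nearest replica point.
            Map<Integer, Integer> load = new HashMap<Integer, Integer>();
            for (int k = 0; k < 100000; k++) {
                Integer pos = ring.ceilingKey(rnd.nextInt());
                int cache = (pos != null) ? ring.get(pos)
                                          : ring.firstEntry().getValue(); // wrap around
                Integer old = load.get(cache);
                load.put(cache, old == null ? 1 : old + 1);
            }
            System.out.println(replicas + " replicas per cache -> loads " + load.values());
        }
    }
}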
Monotone Keys
If the keys are known to be monotonically increasing, binary search can be used to locate the cache for a given key, reducing the lookup time to O(log n). (The TreeMap-based implementation below achieves exactly this via tailMap.)
http://www.tom-e-white.com//2007/11/consistent-hashing.html
The basic idea behind the consistent hashing algorithm is to hash both objects and caches using the same hash function. The reason to do this is to map the cache to an interval, which will contain a number of object hashes. If the cache is removed then its interval is taken over by a cache with an adjacent interval. All the other caches remain unchanged.
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHash<T> {

    // The article assumes a HashFunction type but never shows it; a minimal
    // single-method interface is supplied here so the class compiles.
    public interface HashFunction {
        int hash(Object key);
    }

    private final HashFunction hashFunction;
    private final int numberOfReplicas;
    // The ring: hash position -> node, kept sorted by the TreeMap.
    private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();

    public ConsistentHash(HashFunction hashFunction,
                          int numberOfReplicas, Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;
        for (T node : nodes) {
            add(node);
        }
    }

    // Place numberOfReplicas virtual points on the ring for this node.
    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hash(node.toString() + i), node);
        }
    }

    // Remove all of this node's virtual points from the ring.
    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hash(node.toString() + i));
        }
    }

    // Find the first node clockwise from the key's hash; tailMap is the
    // O(log n) search on the ring, and firstKey() handles the wrap-around.
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        int hash = hashFunction.hash(key);
        if (!circle.containsKey(hash)) {
            SortedMap<Integer, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }
}

With memcached, it is only the client that needs to implement the consistent hashing algorithm - the memcached server itself is unchanged. Other systems that employ consistent hashing include Chord, which is a distributed hash table implementation, and Amazon's Dynamo, which is a key-value store (not available outside Amazon).
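A hypothetical usage sketch of the class above; the MD5-backed HashFunction is an assumption (the article never shows one). It illustrates that removing a node only remaps the keys that lived on that node's arcs.

import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;

public class ConsistentHashDemo {
    public static void main(String[] args) {
        // Assumed MD5-based hash, truncated to an int for the ring above.
        ConsistentHash.HashFunction md5 = new ConsistentHash.HashFunction() {
            public int hash(Object key) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    byte[] digest = md.digest(key.toString().getBytes("UTF-8"));
                    return new BigInteger(1, digest).intValue();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        ConsistentHash<String> ring = new ConsistentHash<String>(
                md5, 100, Arrays.asList("A", "B", "C"));
        String before = ring.get("user:42");
        ring.remove("C");  // only keys on C's 100 virtual points move
        System.out.println(before + " -> " + ring.get("user:42"));
    }
}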
http://www.tomkleinpeter.com/2008/03/17/programmers-toolbox-part-3-consistent-hashing/
http://www.paperplanes.de/2011/12/9/the-magic-of-consistent-hashing.html
https://github.com/zxqiu/leetcode-lintcode/blob/master/system%20design/Consistent_Hashing.java
The usual way to shard a database horizontally is to take the id modulo n, the total number of database servers, to decide which machine a record lives on. The drawback of this method is that when the data keeps growing and we need to add a database server, changing n to n+1 forces almost all of the data to move, which is anything but consistent. To reduce the defects of this naive hash (% n), a new hashing algorithm appeared: Consistent Hashing. There are many ways to implement it; here we implement a simple version.
Take the id modulo 360. Suppose we start with 3 machines, and let them be responsible for the three ranges 0~119, 120~239, and 240~359. Whatever the mod result is, look up which interval it falls into, and that is the machine to go to.
When the number of machines grows from n to n+1, find the largest of the n intervals, split it in two, and give one half to machine n+1.
For example, going from 3 machines to 4, we find that 0~119 is currently the largest interval, so we split it into 0~59 and 60~119. 0~59 stays with machine 1, and 60~119 goes to machine 4.
Then, going from 4 machines to 5, the largest interval is 120~239; split in two, it becomes 120~179 and 180~239.
Suppose all the data starts on a single machine. After machines have been added up to n machines, what is the distribution of the intervals, and which machine owns each one?
Notice
You may assume n <= 360. We also agree that when several intervals tie for the largest, we split the one that belongs to the machine with the smaller id.
For example, the intervals 0~119 and 120~239 both have size 120, but the former belongs to machine 1 and the latter to machine 2, so we split 0~119.
Clarification
If the maximal interval is [x, y], and it belongs to machine id z, when you add a new machine with id n, you should divide [x, y, z] into two intervals:
[x, (x + y) / 2, z] and [(x + y) / 2 + 1, y, n]
Example
for n = 1, return
[
[0,359,1]
]
representing that 0~359 belongs to machine 1.
for n = 2, return
[
[0,179,1],
[180,359,2]
]
for n = 3, return
[
[0,89,1],
[90,179,3],
[180,359,2]
]
for n = 4, return
[
[0,89,1],
[90,179,3],
[180,269,2],
[270,359,4]
]
for n = 5, return
[
[0,44,1],
[45,89,5],
[90,179,3],
[180,269,2],
[270,359,4]
import java.util.ArrayList;
import java.util.List;

public class Solution {
    /**
     * @param n a positive integer
     * @return n x 3 matrix: each row is [start, end, machine id]
     */
    public List<List<Integer>> consistentHashing(int n) {
        List<List<Integer>> ret = new ArrayList<List<Integer>>();
        if (n == 0) {
            return ret;
        }
        // Machine 1 starts out owning the whole ring, 0~359.
        List<Integer> newMachine = new ArrayList<Integer>();
        newMachine.add(0);
        newMachine.add(359);
        newMachine.add(1);
        ret.add(newMachine);
        for (int i = 1; i < n; i++) {
            // Find the largest interval. Slot j always holds machine j + 1,
            // so scanning in list order breaks ties toward the smaller id.
            int max = Integer.MIN_VALUE;
            List<Integer> target = ret.get(0);
            for (int j = 0; j < ret.size(); j++) {
                List<Integer> l = ret.get(j);
                if (l.get(1) - l.get(0) + 1 > max) {
                    max = l.get(1) - l.get(0) + 1;
                    target = l;
                }
            }
            if (max == 1) {
                // Every interval is a single point; nothing left to split.
                return ret;
            }
            // Split [x, y] into [x, mid] (old machine) and [mid + 1, y] (new).
            newMachine = new ArrayList<Integer>();
            newMachine.add((target.get(1) + target.get(0)) / 2 + 1);
            newMachine.add(target.get(1));
            newMachine.add(i + 1);
            target.set(1, (target.get(1) + target.get(0)) / 2);
            ret.add(newMachine);
        }
        return ret;
    }
}
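A quick sanity check against the n = 5 example above (a sketch; note the rows come back in insertion order, where slot j belongs to machine j + 1, rather than sorted by interval start):

public class SolutionDemo {
    public static void main(String[] args) {
        Solution s = new Solution();
        // Same five intervals as the n = 5 example, in insertion order:
        // [[0, 44, 1], [180, 269, 2], [90, 179, 3], [270, 359, 4], [45, 89, 5]]
        System.out.println(s.consistentHashing(5));
    }
}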
https://github.com/zxqiu/leetcode-lintcode/blob/master/system%20design/Consistent_Hashing_II.java
In Consistent Hashing I we introduced a fairly simple consistent hashing algorithm. That simple version has two flaws:
After a machine is added, all of the migrated data comes from a single machine. The read load on that one machine becomes too high and hurts normal service.
Once there are 3 machines, the load across the servers is unbalanced at a ratio of 1:1:2 (with intervals 0~89, 90~179, and 180~359, machine 2 owns twice as much of the ring as the others).
To solve these problems, the concept of micro-shards is introduced, and a better algorithm goes like this:
Divide the 360° ring much more finely: instead of 0~359, use the interval 0 ~ n-1, and join its two ends to form a circle.
When a new machine is added, randomly scatter k points on the circle, representing that machine's k micro-shards.
Each data item also corresponds to a point on the circle, computed by a hash function.
The machine that manages a data item is determined by the owner of the first micro-shard point encountered when moving clockwise around the circle from the data item's point.
In real NoSQL databases, n and k are typically 2^64 and 1000.
Implement this micro-shard variant of consistent hashing, mainly the following three functions:
create(int n, int k)
addMachine(int machine_id) // add a new machine, return a list of shard ids.
getMachineIdByHashCode(int hashcode) // return machine id
Notice
When n is 2^64, random points in this range essentially never collide.
But to make it easier to test your program's correctness, n may be small in the test data, so you must ensure the k random numbers you generate contain no duplicates.
LintCode does not judge the correctness of your addMachine results (since they are random); it only uses the addMachine results you return to judge the correctness of your getMachineIdByHashCode results.
Example
create(100, 3)
addMachine(1)
>> [3, 41, 90] => three random numbers
getMachineIdByHashCode(4)
>> 1
addMachine(2)
>> [11, 55, 83]
getMachineIdByHashCode(61)
>> 2
getMachineIdByHashCode(91)
>> 1
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class Solution {
    // Instance state (the original used static fields, which would leak
    // state across Solution instances created by create()).
    private int[] hashToMachine;   // ring position -> machine id, -1 if free
    private int shardsPerMachine;  // k
    private int maxShards;         // n, the size of the ring
    private int availableShards;

    // @param n a positive integer
    // @param k a positive integer
    // @return a Solution object
    public static Solution create(int n, int k) {
        Solution solution = new Solution();
        solution.maxShards = n;
        solution.shardsPerMachine = k;
        solution.availableShards = n;
        solution.hashToMachine = new int[n];
        Arrays.fill(solution.hashToMachine, -1);
        return solution;
    }

    // @param machine_id an integer
    // @return a sorted list of the k shard ids assigned to this machine
    public List<Integer> addMachine(int machine_id) {
        if (shardsPerMachine > availableShards) {
            return new ArrayList<Integer>(); // no room left on the ring
        }
        List<Integer> shards = randomNumber();
        for (Integer i : shards) {
            hashToMachine[i] = machine_id;
        }
        availableShards -= shardsPerMachine;
        Collections.sort(shards);
        return shards;
    }

    // @param hashcode an integer
    // @return the machine owning the first micro-shard clockwise from hashcode
    public int getMachineIdByHashCode(int hashcode) {
        int ret = hashcode % maxShards;
        // Walk clockwise (wrapping around) to the next occupied position.
        // Precondition: at least one machine was added, or this loops forever.
        while (hashToMachine[ret] == -1) {
            ret = (ret + 1) % maxShards;
        }
        return hashToMachine[ret];
    }

    // Draw k distinct ring positions that are not already taken.
    private List<Integer> randomNumber() {
        Set<Integer> dupCheck = new HashSet<Integer>();
        List<Integer> ret = new ArrayList<Integer>();
        Random r = new Random();
        while (ret.size() < shardsPerMachine) {
            int candidate = r.nextInt(maxShards);
            if (dupCheck.contains(candidate) || hashToMachine[candidate] != -1) {
                continue;
            }
            ret.add(candidate);
            dupCheck.add(candidate);
        }
        return ret;
    }
}
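A sketch of the example session above (the actual shard ids will differ, since they are drawn at random):

public class SolutionIIDemo {
    public static void main(String[] args) {
        Solution s = Solution.create(100, 3);
        System.out.println(s.addMachine(1));              // e.g. [3, 41, 90]
        System.out.println(s.getMachineIdByHashCode(4));  // always 1: only machine 1 exists so far
        System.out.println(s.addMachine(2));              // e.g. [11, 55, 83]
        System.out.println(s.getMachineIdByHashCode(61)); // 2 in the example session above
    }
}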
Please read full article from System Design for Big Data [Consistent Hashing]