目录
一. 前言
二. 一致性哈希算法
三. Redis Cluster 的一致性哈希算法
四. Java 实现的一致性哈希
五. 分库分表中一致性哈希实践
5.1. 基于 hash 环一致性哈希算法的分库分表
5.2. 美团一致性哈希算法
5.3. 平均分布方案
一. 前言
普通的 hash 算法(hashcode % size),如果 size 发生变化,几乎所有的历史数据都需要重新hash、移动,代价非常大,常见的 Java 中的 HashMap 就是如此。
那如果在 hash 表扩容或者收缩的时候 size 能够保持不变,即历史数据在 hash 表中的位置不变,这样就解决了 hash 表扩缩时的大量数据移动问题。
一致性哈希可以理解为,就是 hash 函数(hashcode % size)的 size 保持不变,从而保证了hash 函数的前后一致性。
二. 一致性哈希算法
一致性哈希算法主要应用在分布式缓存系统中,在增加或者删除服务器节点时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系,也就是系统中的大多数历史缓存的存储服务器节点可以不变,解决了普通 hash 算法带来的动态伸缩性问题。
如上图一致性哈希定义了一个 0 ~ -1 的 hash 环,hash 函数并不是按照服务器节点的数量取模,而是按照 取模(hashcode % ),这样请求的数据就会落在环上某个固定的位置。
服务器节点按照 IP 或域名进行 hash,分配到 hash 环上,如图分配了 4 个服务器节点,分别在hash 环的 10000、20000、30000、40000 位置(为了方便演示)。
请求数据先通过 hash 函数(hashcode % )确定了在环上的位置,再沿着环顺时针查找,遇到的第一个节点就是命中的服务器节点。
新增、删除节点
新增节点 E(25000),按照一致性哈希算法,只有 B ~ E 之间的历史数据会受到影响,(之前是路由到 C 的,现在路由到 E ),即只有 C 的一部分数据需要迁移到 E。
删除节点 B,那么 A~ B 之间的历史数据丢失,并且新增数据会被插入到 C,其他的节点都不会受到影响。
可以看到一致性哈希算法,节点的增删都只会影响系统中的一小部分数据,容错性非常好。
但是上面的模型还是有个问题:如果服务器节点太少或者出现热点数据,就会导致服务器节点上之间的数据分布不均匀,并且还可能出现缓存雪崩的问题。
缓存雪崩:
如果每个服务器在环上只有一个节点,那么当服务器宕机,它原本所负责的缓存数据将全部交由顺时针方向的下一个服务器节点处理。例如,当 B 退出时,它原本所负责的缓存将全部交给 C 处理。这就意味着 C 的访问压力会瞬间增大。设想一下,如果 C 因为压力过大而崩溃,那么更大的压力又会向 D 压过去,最终服务压力就像滚雪球一样越滚越大,最终导致缓存雪崩。
虚拟节点:
一致性哈希通过引入虚拟节点解决了这个问题,每个实际节点映射多个虚拟节点,数据按照规则找到虚拟节点后,再储存到映射的实际节点上;因为虚拟节点可以在 hash 环上均匀分布,这意味着当一个真实节点失效退出后,它原来所承载的压力将会均匀地分散到其他节点上去,解决缓存雪崩问题。
三. Redis Cluster 的一致性哈希算法
Redis Cluster 没有采用基于 hash 环的一致性算法,而是引入了哈希槽(slots)的概念,所有的数据都是存在 slot 中,Redis Cluster 一共有 (16384)个 slot,每个 Master 节点负责一部分 slot。
节点路由:
当客户端连接向某个 Master 节点发送请求时,接收到命令的节点首先会通过 CRC-16(key) % 16384 计算出当前 key 所属的 slot,如果该 slot 由自己负责,直接处理响应客户端的请求,如果不是,则向客户端返回 MOVED 重定向请求,并将该 slot 对应的服务器节点的 ip 和 port 一并返回,客户端拿着这些数据重新访问。
集群扩容:
集群新增 Master 节点后,需要通过 reshard 来将 slot 重新分配,假设我们需要向集群中加入一个D 节点,而此时集群内已经有 A、B、C 三个节点了。此时 redis-trib 会向 A、B、C 三个节点发送迁移出 slot 的请求,同时向 D 节点发送准备导入 slot 的请求,做好准备之后 A、B、C 这三个源节点就开始执行迁移,将对应的 slot 所对应的键值对迁移至目标节点 D。
节点宕机:
针对 A 节点,某一个节点认为 A 宕机了,那么此时是主观宕机。而如果集群内超过半数的节点认为 A 挂了, 那么此时 A 就会被标记为客观宕机。
一旦节点 A 被标记为了客观宕机,集群就会开始执行故障转移。其余正常运行的 Master 节点会进行投票选举,从 A 节点的 Slave 节点中选举出一个,将其切换成新的 Master 对外提供服务。当某个 Slave 获得了超过半数的 Master 节点投票,就会成功当选,成功之后停止复制 A 节点,使自己成为 Master。然后将 A 节点所负责处理的 slot,全部转移给自己,然后就会向集群发 PONG 消息来广播自己的最新状态。
可以看到对于 Redis Cluster,CRC-16(key) % 16384 保证了某个 key 只会落固定的一个 slot 上,并不需要关心它最终要去到哪个服务器节点。
四. Java 实现的一致性哈希
package com.lm.hash;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public class HashFunction {
private MessageDigest md5 = null;
public long hash(String key) {
if (null == md5) {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("no md5 algrithm found");
}
}
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
// 具体的哈希函数实现细节 ---- 每个字节 & 0xFF 再移位
long result = ((long) (bKey[3] & 0xFF) << 24)
| ((long) (bKey[2] & 0xFF) << 16 | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF));
return result & 0xffffffffL;
}
}
package com.lm.hash;
import java.util.*;
public class ConsistentHash<T> {
private final HashFunction hashFunction;
// 节点的复制因子,实际节点个数 * numberOfReplicas = 虚拟节点个数
private final int numberOfReplicas;
// 存储虚拟节点的hash值到真实节点的映射
private final SortedMap<Long, T> circle = new TreeMap<>();
public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes) {
this.hashFunction = hashFunction;
this.numberOfReplicas = numberOfReplicas;
for (T node : nodes) {
add(node);
}
}
public void add(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
// 对于一个实际节点 node,对应 numberOfReplicas 个虚拟节点
/**
* 不同的虚拟节点(i不同)有不同的hash值,但都对应同一个实际机器node
* 虚拟node一般是均衡分布在环上的,数据存储在顺时针方向的虚拟node上
*/
circle.put(hashFunction.hash(node.toString() + i), node);
}
}
public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.remove(hashFunction.hash(node.toString() + i));
}
}
/**
* 获得一个最近的顺时针节点,根据给定的key 取Hash
* 然后再取得顺时针方向上最近的一个虚拟节点对应的实际节点
* 再从实际节点中取得数据
*
* @param key
* @return
*/
public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
// node 用String来表示,获得node在哈希环中的hashCode
long hash = hashFunction.hash((String) key);
if (!circle.containsKey(hash)) { // 数据映射在两台虚拟机器所在环之间,就需要按顺时针方向寻找机器
SortedMap<Long, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
public long getSize() {
return circle.size();
}
/**
* 查看MD5算法生成的hashCode值---表示整个哈希环中各个虚拟节点位置
*/
public void testBalance() {
// 获得TreeMap中所有的key
Set<Long> sets = circle.keySet();
// 将获得的key集合排序
SortedSet<Long> sortedSets = new TreeSet<>(sets);
for (Long hashCode : sortedSets) {
System.out.println(hashCode);
}
System.out.println("---- each location 's distance are follows: ----");
/**
* 查看用MD5算法生成的 long hashCode 相邻两个 hashCode 的差值
*/
Iterator<Long> it = sortedSets.iterator();
Iterator<Long> it2 = sortedSets.iterator();
if (it2.hasNext()) {
it2.next();
}
long keyPre, keyAfter;
while (it.hasNext() && it2.hasNext()) {
keyPre = it.next();
keyAfter = it2.next();
System.out.println(keyAfter - keyPre);
}
}
public static void main(String[] args) {
Set<String> nodes = new HashSet<>();
nodes.add("A");
nodes.add("B");
nodes.add("C");
ConsistentHash<String> consistentHash = new ConsistentHash<>(new HashFunction(), 2, nodes);
consistentHash.add("D");
System.out.println("hash circle size: " + consistentHash.getSize());
System.out.println("location of each node are follows: ");
consistentHash.testBalance();
}
}
// 运行结果:
hash circle size: 8
location of each node are follows:
748451404
769404186
1696944585
1830063320
3372629518
3766042698
3862426151
3864615324
---- each location 's distance are follows: ----
20952782
927540399
133118735
1542566198
393413180
96383453
2189173
五. 分库分表中一致性哈希实践
5.1. 基于 hash 环一致性哈希算法的分库分表
一个简单的没有虚拟节点的一致性哈希算法:
public class ConsistentHashAlgorithm {
private SortedMap<Long, String> virtualNodes = new TreeMap<>();
public ConsistentHashAlgorithm(Collection<String> tableNodes) {
initNodesToHashLoop(tableNodes);
}
public void initNodesToHashLoop(Collection<String> tableNodes) {
SortedMap<Long, String> virtualTableNodes = new TreeMap<>();
for (String node : tableNodes) {
long hash = getHash(node);
virtualTableNodes.put(hash, node);
}
for (Map.Entry<Long, String> entry : virtualTableNodes.entrySet()) {
log.info("节点[" + entry.getValue() + "]被添加, hash值为" + entry.getKey());
}
this.virtualNodes = virtualTableNodes;
}
public String getTableNode(String key) {
SortedMap<Long, String> subMap = virtualNodes.tailMap(getHash(key));
if (subMap.isEmpty()) {
return virtualNodes.get(virtualNodes.firstKey());
}
return subMap.get(subMap.firstKey());
}
/**
* 使用FNV1_32_HASH算法计算key的Hash值
*
* @param key
* @return
*/
public long getHash(String key) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < key.length(); i++) {
hash = (hash ^ key.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
}
sharding-jdbc 自定义分片策略:
public class ConsistentShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
private ConsistentHashAlgorithm consistentHashAlgorithm;
@Override
public String doSharding(Collection<String> collection, PreciseShardingValue<Long> preciseShardingValue) {
if (consistentHashAlgorithm == null) {
consistentHashAlgorithm = new ConsistentHashAlgorithm(collection);
}
return consistentHashAlgorithm.getTableNode(String.valueOf(preciseShardingValue.getValue()));
}
}
sharding-jdbc 配置:
# 一致性hash 分表测试
pick_task:
actual-data-nodes: ds0.pick_task_$->{0..4}
tableStrategy:
standard:
shardingColumn: place_id
precise-algorithm-class-name: com.simplezero.coding.sharding.repository.sharding.ConsistentShardingAlgorithm
可以看到:
节点[pick_task_1]被添加, hash值为1045769218
节点[pick_task_3]被添加, hash值为1077989284
节点[pick_task_0]被添加, hash值为1225972537
节点[pick_task_2]被添加, hash值为1999305687
如果增加节点 pick_task_4,此时根据 一致性哈希算法,只有 pick_task_2 表数据需要迁移:
节点[pick_task_1]被添加, hash值为1045769218
节点[pick_task_3]被添加, hash值为1077989284
节点[pick_task_0]被添加, hash值为1225972537
节点[pick_task_4]被添加, hash值为1959358845
节点[pick_task_2]被添加, hash值为1999305687
5.2. 美团一致性哈希算法
直接分32库32表,通过 userId 后四位 % 32 分库 userId 后四位 / 32 %32 分表,共计分为1024张表。线上部署情况为 8 个集群(主从),每个集群 4 个库。
场景一:数据库性能达到瓶颈
将逻辑数据库升级到 --> 物理数据库 --> 数据库集群,分库分表规则不变,最多可以直接扩展到 32个数据库集群。
如果 32 个集群也无法满足需求,那么将分库分表规则调整为 (32*)*(32/),可以达到最多1024 个集群,即每张表一个集群。
场景二:单表容量达到瓶颈(或者 1024 已经无法满足你)
如果 1024 张表都不能满足你了,这时,可以保持分库规则不变,单库里的表再进行裂变:
tb_(userId后四位 div 32 mod 32)_(userId后四位 div 32 mod 32 mod 8) ==> db_4.tb_3_7
userId 后四位 div 32 mod 32 在目前订单这种规则下(用 userId 后四位 mod)还是有极限的,因为只有四位,所以最多拆 8192 个表。不过这个时候就不能一劳永逸,需要进行库内的表数据迁移了。
5.3. 平均分布方案
直接 32 库 32 表 1024 张表,固然可以一步到位,但是对于小公司来说前期根本用不上,浪费机器且增加系统复杂度,所以还是循序渐进,按照一致性哈希算法的思想,先确定总的节点数为32。
32 = 节点count * 数据库数量
database_index = key % 32 / count * count
//分2库 count = 32 / 2 = 16
database_index = key % 32 / 16 * 16 ==> db_0(0-15) db_16(16-31)
//分4库 count = 32 / 4 = 8
database_index = key % 32 / 8 * 8 ==> db_0(0-7) db_8(8-15) db_16(16-23) db_24(24-31)
可以看到如果从 2 库扩展到 4 库,需要从 db_0 移动一半的数据到 db_8,db_16 也同样要移动。
db_0(0-15)==> db_0(0-7)+ db_8(8-15)
可以看到当需要进行扩容一倍时需要迁移一半的数据量,如果量比较大,影响还是比较大。
这个方案还可以进行优化,一致性哈希的算法不变,每次增加 1 个节点,而不是每次增加 个节点,这样可以尽量平滑的进行扩容。
// 分2库 count = 32 / 2 = 16
database_index = key % 32 / 16 * 16 ==> db_0 db_16
// 分3库 count = 32 / 4 = 8
database_index = key % 32 / 8 * 8 ==> db_0(0-7) db_8(8-15) db_16(16-31)
此时就需要改分片路由的规则了:
Integer dbValue = userId % 32 / 8 * 8;
if (dbValue >= 16) {
return 16;
} else {
return dbValue;
}