本文已收录于:https://github.com/danmuking/all-in-one(持续更新)
前言
哈喽,大家好,我是 DanMu。这篇文章想和大家聊聊 ConcurrentHashMap 相关的知识点。严格来说,ConcurrentHashMap 属于java.lang.current
,并属于通常意义上的容器,但是考虑到面试官非常喜欢打出 HashMap 和 ConcurrentHashMap 这套丝滑小连招,因此我决定将它们放在一起,来加深大家的印象,发车!
数据结构
与 HashMap 一样,ConcurrentHashMap 的底层结构在JDK1.8的时候出现过一次比较大的变化,这篇文章将以JDK1.8的数据结构为主,对于1.8前的结构,大家只需要大致了解即可(JDK版本都已经更新到21了,让我看看哪个活化石面试官还问老版的数据结构)
JDK1.8之前
JDK1.8
在JDK1.8之前,ConcurrentHashMap 采用 **Segment 数组 + HashEntry 数组 + 链表 **的结构,因此其最大并发数会收到 Segment 数量的限制。在 JDK 1.8中,对 ConcurrentHashMap 主要进行了两点改进:
- 细化锁的粒度,从针对 Segment 加锁修改为针对 每个数组元素单独加锁,
- 与 HashMap 一样,将 链表 的结构修改为 链表/红黑树 的结构,提高了搜索链表的效率
对ConcurrentHashMap的数据结构有了一个基础的认识之后,接下来就准备深入底层,彻彻底底的搞懂 ConcurrentHashMap 的实现原理。
ConcurrentHashMap 的源码相对于 HashMap 复杂了很多,希望大家能静下心来慢慢看,我会尽力帮助大家简单的理解源码中的思想。
核心源码详解
重要的成员变量
老规矩,还是先来看看 ConcurrentHashMap 中维护了哪些重要的成员属性:
// ConcurrentHashMap 中存放元素的最大值
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认的初始化大小
private static final int DEFAULT_CAPACITY = 16;
// 从链表转换为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 从红黑树转换为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 开启红黑树转换的数组阈值
// 只有同时满足 TREEIFY_THRESHOLD 和 MIN_TREEIFY_CAPACITY 才会转换为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
// 接下来几个都是标志位,用来表示扩容过程中的状态,在后面会详细解释
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
// 实际存放元素的数组
transient volatile Node<K,V>[] table;
// 在扩容时会用到的数组
private transient volatile Node<K,V>[] nextTable;
/**
* 用来控制表初始化和扩容的
* 默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,先有个印象
* -1 说明正在初始化
-N 说明有 N-1 个线程正在进行扩容
0 默认状态,表明table没有初始化
>0 表示 table 扩容的阈值,如果 table 已经初始化。
*/
private transient volatile int sizeCtl;
// 扩容时使用,用来表示扩容进行到哪里
private transient volatile int transferIndex;
初始化
接下来,来看看 ConcurrentHashMap 在初始化的时候做了什么
// 默认的初始化函数
// 可以看到默认的初始化函数啥都没干,所以实际分配空间也是推迟到第一次添加元素的时候
// 这里也是采用的懒加载
public ConcurrentHashMap() {
}
// 指定初始化容量的初始化函数
public ConcurrentHashMap(int initialCapacity) {
// 这边先处理了一下两个边界情况
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// tableSizeFor 的作用是找到最接近 initialCapacity 的2的次幂
// 在扩容的时候也是扩容为原容量的2倍
// 因此 ConcurrentHashMap 的容量永远是2的幂次
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 因为 ConcurrentHashMap 使用了懒加载,因此实际分配空间将会推迟到第一次添加函数时
// 这里为了内容的连贯性,将它们放在一起
// initTable 初始化空间
private final Node<K,V>[] initTable() {
// 用来存放 table 成员变量
Node<K,V>[] tab;
// 用来存放 sizeCtl 成员变量
int sc;
// 判断空间初始化了没有
// 注意:这里可能有多个线程想进行初始化,因此这里用了一个循环
// 只有 cas 成功的线程才能进行初始化
// 其他 cas 失败的线程会进入自旋,找到 ConcurrentHashMap 初始化完成,然后退出。
while ((tab = table) == null || tab.length == 0) {
// cas 失败的线程会走到这个条件
// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
// 失败的线程释放 CPU,或者在 while 里自旋
if ((sc = sizeCtl) < 0)
// 让出 CPU 使用权
Thread.yield(); // lost initialization race; just spin
// cas 修改 sc 的值为 -1,只有一个线程能够成功,然后开始初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 上面说了,如果是带容量的初始化函数会将容量暂存在 sc 里
// 所以如果 sc > 0,就是走带容量的流程,否则就是默认初始化容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 给成员变量赋值
table = tab = nt;
// 给 sc 赋值,现在 sc 存放的是扩容阈值
// sc为0.75数组大小
sc = n - (n >>> 2);
}
} finally {
// 赋值给对应的成员变量
sizeCtl = sc;
}
break;
}
}
return tab;
}
添加元素
下面,来看看如何向 ConcurrentHashMap 中添加元素。
要向 ConcurrentHashMap 中添加元素可分为这几个步骤:
- 计算 key 对应的 hash 值,根据 hash 定位到对应的数组下标
- 先快速判断一下对应下标是否为空,如果为空,直接使用 cas 将元素写入
- 否则的话就需要遍历对应数组位置的链表/红黑树,为保证遍历过程中数据不发生变化,需要先给对应数组位置加锁。
- 如果在遍历过程中遇到相同的 key 就直接覆盖,否则就添加到尾部。
大致理解一下流程应该能帮助大家更好的理解源码,接下来直接上硬菜:
// 向 ConcurrentHashMap 中添加元素
public V put(K key, V value) {
return putVal(key, value, false);
}
// 添加元素中实际调用的函数
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 边界条件判断,ConcurrentHashMap 是不允许添加 null 的
if (key == null || value == null) throw new NullPointerException();
// 计算 hash
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
// 先把table的值存到tab里
// 这里加了一个循环,和上面初始化时候的循环作用类似
for (Node<K,V>[] tab = table;;) {
// f 用来存放 key 对应数组位置的第一个节点
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 上面已经说过了
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,这个可以先放着,涉及到后面扩容的知识
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
tab = helpTransfer(tab, f);
// 到这里就是说,hash 对应的数组位置已经有值了,需要遍历链表/红黑树
else {
V oldVal = null;
// 对该数组位置加锁
// 接下来就是遍历了
// 如果有相同的 key 就覆盖,否则添加到尾部
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头节点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树的遍历,这里可以不用看了,很复杂,面试也不大可能会考
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 具体源码我们就不看了,扩容部分后面说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
获取元素
获取元素的逻辑和添加元素差不多,这里就不在赘述了:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 hash 值
int h = spread(key.hashCode());
// 根据 hash 值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果搜索到的节点 key 与传入的 key 相同且不为 null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 这边涉及到一些后面扩容的知识,可以先不看,等看完后面扩容的知识在来看就懂了
// 如果 eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了
// 所以这个 find 方法是树节点的 find 方法或者是 fwd 节点的 find 方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
// 这是 ForwardingNode 节点的 find 方法
Node<K,V> find(int h, Object k) {
// 注意这里的nextTable 是扩容时传进来的
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// 没找到直接返回 null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
// 自旋
for (;;) {
int eh; K ek;
// 找到了就返回节点
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// 同样要判断是树节点还是 ForwardingNode 节点
if (eh < 0) {
// ForwardingNode 节点就继续往里找
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// 树节点 就调用数节点的 find 方法
return e.find(h, k);
}
// 没找到就返回n ull
if ((e = e.next) == null)
return null;
}
}
}
扩容
在 ConcurrentHashMap 中还有一个非常重要的知识点,就是 ConcurrentHashMap 的扩容,与 HashMap 的库容不同,由于 ConcurrentHashMap 需要再扩容的过程中不仅要有高性能,还要保证线程的安全,因此 ConcurrentHashMap 的扩容过程可以说是相当的复杂,希望大家能够耐下心来慢慢慢看,我也会尽力帮大家理解 ConcurrentHashMap 的扩容思想。
总体来说,ConcurrentHashMap 的**核心思想就是:分而治之。**在扩容过程中,ConcurrentHashMap 将数组划分成若干个不重合的任务包,并将任务包分配给多个线程,来达到提高性能的同时保证线程安全的目的。
但是体现在源码中,可不像说起来这么简单。来,上硬菜:
// tryPresize 会在将链表转换为红黑树的 treeifyBin 中调用
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
// 也就是说,如果当前 ConcurrentHashMap 的数组长度为8,那么 size 就是 16
private final void tryPresize(int size) {
// c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
// 注意,c 不表示扩容后的数组大小
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
// 这个分支负责处理一些边界情况
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 扩容戳,为一个负值
// 高16为为扩容标记
// 低16为为扩容线程数+1,从2开始是因为1已经作为初始化标记
// 这个函数会在后面详细分析
int rs = resizeStamp(n);
// 这个分支表示当前已经开始扩容,用来帮助扩容
if (sc < 0) {
Node<K,V>[] nt;
// 这个分支用来处理边界,只要满足一个条件就直接退出,不帮助扩容
// RESIZE_STAMP_SHIFT 表示左移16位,用来比较扩容标记是否相同
// 这个条件有问题,不会走到
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
// 最后一个扩容进程在执行最后一个任务
sc == rs + 1 ||
// 帮助扩容的线程数达到上线
sc == rs + MAX_RESIZERS ||
// 新数组还没有初始化完成
(nt = nextTable) == null ||
// 所有的任务包都已经分发完
transferIndex <= 0)
break;
// cas 将 sc+1 表示扩容进程数+1,然后开始帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 第一个开始扩容的线程,cas 将 sc 赋值为 rs+2 开始,开始扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
从上面可以看到,具体实现扩容的逻辑都通过调用 transfer 来实现,接下来就来看看 transfer 函数:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n 是原数组的长度
int n = tab.length, stride;
// 前面说了,扩容的核心思想就分而治之,将数组划分为多个任务包,stride就是每个任务包的大小
// stride 在单核下直接等于 n,就等于只有一个任务包
// 多核模式下为 (n>>>3)/NCPU,最小值是 16
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起扩容的线程调用此方法时,参数 nextTab 为 null
// 之后帮助扩容的线程调用此方法时,nextTab 不会为 null
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
// 初始化为 true
boolean advance = true;
// 迁移工作是否全部完成
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*
*/
// i 是位置索引,bound 是边界,注意是从后往前
// 这里可以理解为初始化,并不是要使用的值
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance 为 true 表示可以进行下一个位置的迁移了
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 第一次会走这个分支,将 transferIndex 赋值为 nextIndex-stride
// 第一个进程就负责处理 nextIndex-stride 到 nextIndex 这一个任务包
// 任务包会被从后往前处理
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
// 刚开始当前任务肯定没完成
advance = false;
}
}
// 所有任务完成
if (i < 0 ||
// 不应该发生
i >= n ||
// 不应该发生
i + n >= nextn) {
int sc;
// 扩容完成
if (finishing) {
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 下面几个分支就是进行迁移的代码了,比较简单
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头节点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
// 红黑树可以不看
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数小于等于6,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}
恭喜你,看到这里你已经对于扩容有了相当深入的理解!扩容的代码确实相当的硬核,需要大家有一点耐心,慢慢理解。
答疑
在源码中还有一些用一句两句话很难解释清楚的问题,就在这部分进行更详细的解读
扩容戳有什么用
先来填个前面留下的坑
resizeStamp 会生成一个扩容戳,要理解
要理解扩容戳,必须从二进制的角度来分析。resizeStamp 方法就一句话:
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
其中 RESIZE_STAMP_BITS
是一个默认值 16。这里面关键就是 Integer.numberOfLeadingZeros(n) 这个方法,这个方法源码就不贴出来了,实际上这个方法就是做一件事,那就是获取当前数据转成二进制后的最高位的 1 前的 0 的个数。这句话有点拗口,举个例子:
以 16 为准,16 转成二进制是10000
,最高非 0 位是在第 5 位,因为 int 类型是 32 位,所以他前面还有 27 位,而且都是 0,那么这个方法得到的结果就是 27(1 的前面还有 27 个 0)。
然后1 << (RESIZE_STAMP_BITS - 1)
在当前版本就是1<<15
,也就是得到一个二进制数1000 0000 0000 0000
,这里也是要做一件事,把这个 1 移动到第 16 位。最后这两个数通过|
操作一定得到的结果就是第 16 位是 1,因为 int 是 32 位,最多也就是 32 个 0,而且因为 n 的默认大小是 16(ConcurrentHashMap 默认大小),所以实际上最多也就是 27(11011)个 0,也就是说这个数最高位的 1 也只是在第五位,执行|
运算最多也就是影响低 5 位的结果。扩容戳在被使用的时候会先左移 16 位,然后赋值给 sc,所以这边计算出的扩容戳,其实是高 16 位。上面说的高 16 位代表当前扩容的标记,低 16 位代表了扩容的线程数的扩容戳实际上指的是 rs 赋值后的 sizeCtl。
注意:这里之所以要保证第 16 位为 1,是为了保证 sizeCtl 变量为负数,扩容戳在被使用时会先左移 16 位,这样最高位永远为 1 保证 sizeCtl 变量为负数。
首次扩容为什么计数要 +2 而不是 +1
我们上面已经说了扩容戳的作用:
- 高 16 位代表当前扩容的标记
- 低 16 位代表了扩容的线程数。
知道了这两个条件就好理解了,因为 rs 最终是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,所以应该 +1,但是这里是 +2,原因是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以如果直接 +1 是会和标志位发生冲突。为了避开标志位,所以初始化第一次记录扩容线程数的时候才需要 +2。
点关注,不迷路
好了,以上就是这篇文章的全部内容了,如果你能看到这里,非常感谢你的支持!
如果你觉得这篇文章写的还不错, 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!
白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !
最后推荐我的IM项目DiTing(https://github.com/danmuking/DiTing-Go),致力于成为一个初学者友好、易于上手的 IM 解决方案,希望能给你的学习、面试带来一点帮助,如果人才你喜欢,给个Star⭐叭!
参考资料
助力面试之ConcurrentHashMap面试灵魂拷问,你能扛多久
JUC集合: ConcurrentHashMap详解