文章目录
- 前言
- 一、分段扩容
- 1、addCount
- 2、transfer
- 3、helpTransfer
- 二、查询
- 二、删除
- 总结
前言
主要记录ConcurrentHashMap(笔记中简称CHM)的查询,删除,以及扩容方法的关键源码分析。
一、分段扩容
1、addCount
扩容的逻辑主要在addCount
方法的后半段:
private final void addCount(long x, int check) {
//....前半段是进行桶数组中所有元素的累加计数
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//满足条件就会一直循环:桶数组中所有元素的累加计数>=扩容阈值 并且 桶数组不为空 并且 桶数组的长度<最大长度
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//通过对数组长度n进行计算得到的标记值
int rs = resizeStamp(n);
//sc为负数证明在哈希表正在扩容中(存在并发扩容的情况)
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//尝试将SIZECTL的值 + 1 代表有1个线程在协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//否则通过CAS的操作尝试将SIZECTL设置成一个负数标记
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//真正扩容的逻辑
transfer(tab, null);
//重新计算旧桶数组中所有元素的数量
s = sumCount();
}
}
}
其中resizeStamp(n)
,假设目前桶数组的length是16
,那么计算出的值是:32795
int rs = Integer.numberOfLeadingZeros(16) | (1 << (16 - 1));
System.out.println("rs = " + rs);
rs << RESIZE_STAMP_SHIFT) + 2
计算出的是-2145714174
。将SIZECTL
设置为负数说明已经有一个线程去执行了扩容。
2、transfer
transfer
是真正执行扩容的逻辑:
//tab:旧表 nextTab:新表
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n 旧表的长度 stride 每个线程迁移的步长
int n = tab.length, stride;
//计算步长,最小为16 即每个线程最低负责处理16个桶数据的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//新表还没有创建
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//创建新表,长度为旧表的2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
//将新表赋值给nextTab 变量(注意,此时还没有真正把新表指向CHM的nextTab 属性)
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//这一步才是将新表指向CHM的nextTable 属性
nextTable = nextTab;
//旧表的长度作为迁移的索引
transferIndex = n;
}
//nextn 新表的长度
int nextn = nextTab.length;
//将新表包装成ForwardingNode对象。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//前进标记
boolean advance = true;
//结束标记
boolean finishing = false; // to ensure sweep before committing nextTab
到上面为止,只是初始化一些变量,以及在新表为空的情况下,创建出了新表,并赋值给了CHM的nextTable
属性, 将旧表的长度赋值给CHM的transferIndex
属性,以及将新表包装成ForwardingNode
对象,ForwardingNode
的hash值是-1。
//无限循环
for (int i = 0, bound = 0;;) {
//f 是当前桶中的节点,fh 是节点的哈希值
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
//先将属性transferIndex赋值局部变量nextIndex。给如果 transferIndex 小于等于 0,表示没有任务可做,退出迁移过程。
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//通过CAS保证不会两个线程竞争到同一个区域
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
对于上面这一段的逻辑,可以画图表示,假设旧表的长度为4,每次迁移的步长为2:
在else if ((nextIndex = transferIndex) <= 0)
这一步,将nextIndex 赋值为了4:
在nextBound = (nextIndex > stride ? nextIndex - stride : 0))
这一步,计算出nextBound = 4 - 2 = 2,并且将TRANSFERINDEX
通过CAS改成了2
最后经过bound = nextBound;
和 i = nextIndex - 1;
计算出:
然后将advance = false
,跳出while循环。
上面的过程,假设又有另一个线程
进入了while循环,在else if ((nextIndex = transferIndex) <= 0)
这一步,将nextIndex 赋值为了2(第一个线程通过CAS修改的):
然后在nextBound = (nextIndex > stride ? nextIndex - stride : 0))
这一步,计算出nextBound = 0,并且将TRANSFERINDEX
通过CAS改成了0:
最后经过bound = nextBound;
和 i = nextIndex - 1;
计算出:
也就是线程一负责2,3桶的迁移,线程二负责0,1桶的迁移。当然在不存在多线程并发的情况下,某个线程在迁移完了2,3桶后,依旧会判断是否要前进进行0,1桶的迁移。
//i相比较于数组发生越界,当前线程已经将自己的区域转移完毕
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//结束标记为true时,这里不能仅仅根据当前线程判断结束标记,需要所有参与迁移的线程全部结束
if (finishing) {
//将CHM的临时新表指向null
nextTable = null;
//将CHM的table属性指向新表
table = nextTab;
//将CHM的sizeCtl 属性设置为 扩容阈值(计算是按照0.75的扩容因子进行的)
//n:32时 经过计算是 24
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//尝试CAS将SIZECTL - 1 标记当前线程已经完成了迁移。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//仍然有其他线程未释放标记
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有线程都迁移完成
finishing = advance = true;
i = n; // recheck before commit
}
}
当某个线程在CAS成功,第一次
进入transfer
时,SIZECTL会被更改成一个负数
,其他的线程需要帮助扩容,则会对SIZECTL + 1:
SIZECTL(sc)的初始值是 (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
:
而上面的代码片段中,if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
可以转换为 sc != resizeStamp(n) << RESIZE_STAMP_SHIFT) +2
,代表依旧存在其他线程没有完成迁移,否则sc会复位成最初的 (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
。
//某个桶的头节点为空
else if ((f = tabAt(tab, i)) == null)
//就尝试通过CAS将null值替换为fwd节点
advance = casTabAt(tab, i, null, fwd);
//某个桶的头节点的hash值为-1 证明已经转换成了fwd节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//对桶的头节点加锁,真正根据不同情况迁移的逻辑
synchronized (f) {
//再次判断某个桶的头节点有没有发生改变,链表的迁移
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//红黑树的迁移
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
上面的代码片段,主要是迁移元素的逻辑,也是分为链表和红黑树,并且某个桶在迁移完成后,会标记为为fwd节点
。其他线程在put时,发现当前桶的下标为fwd节点
,则会走帮助扩容的逻辑:
链表的迁移,主要做了以下四步,和HashMap根据高低位进行分组是同样的思路:
- 遍历链表中的节点,找出链表中哈希值相同的分组。
- 根据分组将节点分别插入到两个新的链表中。
- 将这两个新链表插入到新的哈希表的相应位置。
- 将原来的链表标记为已处理。
int runBit = fh & n;
是用当前节点的hash值去 & 旧表的长度,结果只有可能是0或者旧表的长度,例如:
System.out.println(2365651 & 4);
System.out.println(1926817 & 4);
System.out.println(1848602 & 4);
System.out.println(4731302 & 4);
3、helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//会再次判断传入的tab 是否为空 以及桶下标位置的头节点是否为fwd 以及 fwd对象中的nextTable 是否为空
//防止在帮助扩容的过程中,新表已经迁移完成并且赋值给了CHM的table属性
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//根据旧表的长度计算得到一个标记
int rs = resizeStamp(tab.length);
//sizeCtl < 0 代表正在有其他线程在扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//扩容已经结束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//尝试将SIZECTL + 1 标记当前线程正在参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
//返回临时的新表
return nextTab;
}
//返回新表
return table;
}
二、查询
这一段查询的逻辑,和HashMap类似,区别在于加入了对于fwd节点
的判断,如果当前的节点是fwd节点,就去临时的新表中查询。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//再次判断桶数组是否不为空,以及根据hash算出的桶的头节点是否不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//查找的节点是根据hash算出的桶的头节
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//直接返回值
return e.val;
}
//hash值小于0 是fwd节点 或者红黑树
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//处理链表的查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//没有找到就返回空值
return null;
}
二、删除
public V remove(Object key) {
return replaceNode(key, null, null);
}
删除的逻辑,也加入了判断,如果当前要删除的桶的头节点是MOVED
,则和新增元素一样,去走帮助扩容的逻辑,并且删除也是对单个桶去加锁的,其他的逻辑和HashMap大致相似,也是分为链表和红黑树,先遍历找出需要删除的元素,再执行删除逻辑:
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
总结
在CHM的新增/删除时,如果发现目标节点的状态是MOVED
,就说明旧表数据正在被迁移,于是当前线程会走帮助扩容的逻辑。CHM的扩容迁移,也是分段进行的,每个线程会计算步长
(最小为16),也就是至少负责对16个桶元素的迁移。
如果是单线程的情况下,当前线程在处理完16个桶后,会继续处理后续的16个桶。如果是多线程的场景下,通过CAS
保证不会有多个线程同时处理相同步长范围内的桶。并且链表/红黑树迁移的过程,也是和HashMap的类似,通过节点的hash值 & 旧表长度计算出高位
和低位
进行分组迁移
,当某个桶下标迁移完成后,会将当前位置标记为fwd节点
。 最后需要等到所有的线程都迁移完成后,才会真正将新表的引用指向CHM的table属性。
下一篇:集合类源码浅析のJDK1.8 HashMap中的红黑树