1.addCount
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private final void addCount(long x, int check) {
CounterCell[] as;
long b, s;
//1.counterCells不为null
//2.或者 x加到baseCount失败
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long v;
int m;
boolean uncontended = true;
//as为null
//as长度小于1
//as[ThreadLocalRandom.getProbe() & m]为null
//as[ThreadLocalRandom.getProbe() & m].value+x失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
//能走到这里,表明as不为null,as[ThreadLocalRandom.getProbe() & m].value+x成功
if (check <= 1){
return;
}
//计算个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt;
int n, sc;
//1.s>=sc (数组元素个数大于或者等于扩容阈值)
//2.table不为null
//3.table的length小于MAXIMUM_CAPACITY
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { //sc小于0,说明有线程正在扩容,那么会协助扩容
//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶
//位需要转移,结束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0){
break;
}
//扩容线程加1,成功后,进行协助扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)){
transfer(tab, nt);
}
} else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)){
//扩容
transfer(tab, null);
}
s = sumCount();
}
}
}
}
2.fullAddCount
初始化 counterCells,初始容量是2,将x放到要放入的位置;
如果没有冲突,放入该桶;
如果要放入的桶有冲突,重新生成hash,再看有没有冲突;
如果重新生成hash后,还是有冲突,将x增加到该桶的value;
如果有冲突,数组容量没有超过cpu核数,对数组进行扩容,新容量为老容量的2倍;
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {//用来获得随机数
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as;
CounterCell a;
int n;
long v;
//数组不为空,优先对数组中CouterCell的value累加
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {//应该放入的桶为null
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs;
int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;//放入桶
created = true;
}
} finally {
cellsBusy = 0;
}
if (created){ //放入成功
break;
}
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended){ // CAS already known to fail
wasUncontended = true; // Continue after rehash
}
//x添加到a.value
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)){
break;
} else if (counterCells != as || n >= NCPU){
collide = false; // At max size or stale
} else if (!collide){
collide = true;
} else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
//数组扩容
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i){
rs[i] = as[i];
}
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);//重新计算hash
}
//CounterCell数组为空,并且没有线程在创建数组,修改标记,并创建数组
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
//创建数组,容量为2
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);//这次的x保存至rs[h & 1]
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init){
break;
}
}
//数组为空,并且有别的线程在创建数组,那么尝试对baseCount做累加,
//成功就退出循环,失败就继续循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)){
break; // Fall back on using base
}
}
}
}