ConCurrentHashMap在JDK7之前是ReentrantLock+Segment+HashEntry,在JDK8及之后是synchronized+CAS+Node+红黑树。
JDK7之前
对于JDK7的版本实现,ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个Segment结构,一个Segment其实就是一个类Hash Table ,Segment内部维护一个链表数组。
我们用下面一幅图看ConcurrentHashMap的内部结构从下面就可以看到ConcurrentHashMap定位一个元素需要进行两次Hash操作,第一次Hash定位到Segment,第二次定位到元素所在的链表的头部,因此这一种结构带来的副作用就是Hash的过程要比HashMap长。好处是操作的时候只需要对Segment进行操作即可,不会影响其他的Segment,这样在最理想的情况下ConcurrentHashMap可以最高同时支持Segment数量大小的写操作。所以,通过这一种结构,ConcurrentHashMap 的并发能力可以大大的提高。我们用下面这一幅图来看下ConcurrentHashMap的内部结构详情图,
为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使concurrentHashmap。
JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后采用CAS无锁算法,这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化.
JDK8之后
结构:数组+链表+红黑树。
重要属性和内部类
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}
重要方法
// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
构造器分析
可以看出来实现类懒惰性加载,在构造方法中仅仅计算了table的大小,以后在第一次使用时才会真正的创建
/**
*loadFactor 负载因子0.75
*concurrencyLevel 并发线程数
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ...
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
Put操作
- 如果没有初始化就先调用initTable()方法来进行初始化过程
- 如果没有hash冲突就直接CAS插入
- 如果还在进行扩容操作就先进行扩容
- 如果存在hash冲突,就加synchronized锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
- 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
- 如果添加成功就调用addCount方法统计size,并且检查是否需要扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap不支持键或值null
if (key == null || value == null) throw new NullPointerException();
//将key的hash值设置为正数,负数有别的作用
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果tab数组不存在,或长度为0,就初始化table数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果数组的某个槽位上的节点为null,就new一个节点然后cas替换
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果槽位的头节点的hash值是Move是,就说明这个table在扩容,就帮助它扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//锁住头节点,对该节点下进行添加
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果binCount>= 一定的值就开始转换为tree
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
Get操作
注意:链表长度大于8&&table的容量大于64就转换为红黑树
- 计算hash值,定位到该table索引位置,如果是首节点符合就返回
- 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
- 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
保证key的hash码是一个正整数,负数有特殊用途
int h = spread(key.hashCode());
//tab不为null,长度不等于0,那个节点槽位不为null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果头节点==h,说明就是头节点符合
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果头节点hash是负数表示该bin在扩容forwordNode,或是treebin,这时候调用find方法查找
//如果是forNode就去新的table中去寻找,treebin就进行红黑树的遍历
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//继续在链表的情况下查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
思考
想一想我们可以使用CocurrentHashMap来代替Hashtable吗?
因为HashTable是Synchronized的,但是ConcurrentHashMap同步性能更好,因为它仅仅根据同步级别对map的一部分进行上锁,ConcurrentHashMap当然可以代替HashTable,但是HashTable提供更强的线程安全性,他们都可以用于多线程的环境,但是HashTable增加到一定的时候,性能就会急剧下降因为迭代时需要被锁定很长的时间。
ConcurrentHashMap引入了分割(segmentation),不论它变得多么大,仅仅需要锁定map的某个部分,而其它的线程不需要等到迭代完成才能访问map。简而言之,在迭代的过程中,ConcurrentHashMap仅仅锁定map的某个部分,而Hashtable则会锁定整个map。
ConcurrentHashMap有什么缺陷吗?
ConcurrentHashMap是非阻塞的,更新的时候会锁住局部部分数据。但不会把整个表都锁住。同步操作则是完全非阻塞的,好处是在保证合理的同步前提下,效率很高。
坏处是严格来书读取操作不能反映最近的更新。弱一致性
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历,
但ConcurrentHashMap不会失败,所以读取数据不能反映最新的更新。