JUC并发—8.并发安全集合一

大纲

1.JDK 1.7的HashMap的死循环与数据丢失

2.ConcurrentHashMap的并发安全

3.ConcurrentHashMap的设计介绍

4.ConcurrentHashMap的put操作流程

5.ConcurrentHashMap的Node数组初始化

6.ConcurrentHashMap对Hash冲突的处理

7.ConcurrentHashMap的并发扩容机制

8.ConcurrentHashMap的分段锁统计元素数据

9.ConcurrentHashMap的查询操作是否涉及锁

10.ConcurrentHashMap中红黑树的使用

1.JDK 1.7的HashMap的死循环与数据丢失

(1)JDK 1.7的HashMap工作原理

(2)JDK 1.7的HashMap并发下导致的环形链表

(3)环形链表引发的死循环与数据丢失

(4)JDK 1.7和JDK 1.8的HashMap对比

(5)并发安全的集合

(1)JDK 1.7的HashMap工作原理

一.Hash算法

put(key, value) => 对key执行Hash算法 => 根据Hash值用类似取模的算法 => 定位数组的某一个元素 => 如果数组元素为空,则将value存放在该数组元素里。

二.Hash冲突

如果两个key的Hash值,经过取模算法定位到数组的同一个位置,此时就会用链表处理这种Hash冲突。

三.数组扩容

如果数组元素达到了:数组大小 * loadFactor(0.75),此时就会数组扩容。扩容时会按照倍数扩容,首先创建一个两倍大小的新数组。然后遍历原来的数组元素,对每个元素的key值进行Hash运算。接着将Hash运算后的Hash值对新数组大小进行取模,定位到新数组位置。

(2)JDK 1.7的HashMap并发下导致的环形链表

多线程并发操作HashMap时,可能会在扩容过程中形成一个环形链表。比如两个线程同时插入一个元素,而此时恰好两个线程同时触发了数组扩容。那么在数组扩容的过程中,就可能会形成一个环形链表。

下面是JDK1.7中HashMap扩容的核心源码。进行数组扩容时,会使用头插法来进行链表迁移。如果并发执行的两个线程同时使用头插法进行链表迁移,那么就有可能形成一个环形链表。

//JDK1.7的HashMap扩容的核心方法
void transfer(Entry[] newTable) {
    Entry[] src = table;//旧的数组
    int newCapacity = newTable.length;
    for (int j = 0; j < src.length; j++) {
        Entry<K,V> e = src[j];
        if (e != null) {
            src[j] = null;
            do {
                Entry<K,V> next = e.next;
                //线程1执行到这里,假设此时的链表为:newTable[i] = <k1,v1> -> <k2,v2>
                //那么可知:e = <k1,v1>,next = <k2,v2>
                //恰好此时CPU发生了上下文切换,于是切换到线程2去执行扩容
                //线程2扩容时处理完链表的这两个节点后,newTable[i]就变成了:<k2,v2> -> <k1,v1>
                //然后CPU又切换回线程1来执行,由于此时e = <k1,v1>,那么后续代码对e.next赋值后,e就成为环形链表了:
                //也就是e = <k1,v1> -> <k2,v2> -> <k1,v1>,最后e又赋值给newTable[i]
                int i = indexFor(e.hash, newCapacity);//在新数组的位置
                //头插法:刚开始newTable[i]为null,后来newTable[i]变为<k1,v1>;
                //然后当e=<k2,v2>时,这里e设为<k2,v2> -> <k1,v1>,并又赋值给newTable[i]
                //接着遍历链表当e=<k3,v3>时,这里e设为<k3,v3> -> <k2,v2> -> <k1,v1> ...
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            } while (e != null);
        }
    }
}

(3)环形链表引发的死循环与数据丢失

一.环形链表导致死循环

假如执行get(k3)时,k3的Hash取模算法定位到环形链表的位置。于是开始遍历环形链表,但由于环形链表里没有k3的值,所以会导致在环形链表中无法找到k3对应的值进行返回。这样就导致了一直在环形链表中进行死循环,无法退出遍历。最后导致CPU飙升,线上系统被这个get操作卡死。

二.环形链表导致丢失数据

上面例子就导致了从出发是无法找到的,因此这条数据就永久丢失了,甚至会被垃圾回收掉。

(4)JDK 1.7和JDK 1.8的HashMap对比

在JDK1.7中,HashMap采用数组 + 链表的数据结构来存储数据。在多个线程并发扩容时,可能会造成环形链表最终导致死循环和数据丢失。

在JDK1.8中,HashMap采用数组 + 链表 + 红黑树的数据结构来存储数据,并且优化了JDK1.7中的数组扩容方案,解决了死循环和数据丢失的问题。但是在并发场景下调用put()方法时,有可能会存在数据覆盖的问题。

(5)并发安全的集合

比如HashTable使用synchronized来保证线程的安全性,比如Collections.synchronizedMap可以把一个线程不安全的Map,通过synchronized的方式,将其变成安全的。

但是这些方法在线程竞争激烈的情况下,效率都比较低。因为它们都是在方法层面上使用了synchronized实现的锁机制,从而导致不管是put操作还是get操作都需要去竞争同一把锁。

ConcurrentHashMap既能保证并发安全,性能也好于HashTable等集合。

2.ConcurrentHashMap的并发安全

(1)如何理解ConcurrentHashMap的并发安全

(2)ConcurrentHashMap在复合操作中的安全问题

(3)ConcurrentMap可解决复合操作的安全问题

(4)ConcurrentMap支持lambda表达式操作

(1)如何理解ConcurrentHashMap的并发安全

只能保证多线程并发执行时,容器中的数据不会被破坏。无法保证涉及多个线程的复合操作的正确性,复合操作会有并发安全问题。

(2)ConcurrentHashMap在复合操作中的安全问题

假设需要通过一个ConcurrentHashMap来记录每个用户的访问次数。如果指定用户已经有访问次数的记录,则进行递增,否则添加新访问记录。

如下代码在多线程并发调用时,会存在并发安全问题。虽然ConcurrentHashMap对于数据操作本身是安全的,但这里是复合操作,也就是"读—修改—写",而这三个操作作为一个整体却不是原子的。所以当多个线程访问同一个用户时,很可能会覆盖相互操作的结果,从而造成该用户的访问记录次数少于实际访问次数。

public class Demo {
    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
    
		public static void main(String[] args) throws InterruptedException {
        Long accessCount = USER_ACCESS_COUNT.get("张三");
        if (accessCount == null) {
            USER_ACCESS_COUNT.put("张三", 1L);
        } else {
            USER_ACCESS_COUNT.put("张三", accessCount + 1);
        }
    }
}

(3)ConcurrentMap可解决复合操作的安全问题

虽然ConcurrentHashMap是并发安全的,但对于其复合操作需要特别关注。上述复合操作的安全问题的解决方案是,可以对复合操作加锁,也可以使用ConcurrentMap接口来解决复合操作的安全问题。

ConcurrentMap是一个支持并发访问的Map集合,相当于在原本的Map集合上新增了一些方法来扩展Map的功能。

ConcurrentMap接口定义的如下4个方法,都能满足原子性的,可以用在ConcurrentHashMap的复合操作场景中。

//A java.util.Map providing thread safety and atomicity guarantees.
public interface ConcurrentMap<K, V> extends Map<K, V> {
    ...
    //向ConcurrentHashMap集合插入数据
    //如果插入数据的key不存在于集合中,则保存当前数据并且返回null
    //如果key已经存在,则返回存在的key对应的value
    V putIfAbsent(K key, V value);
    
    //根据key和value来删除ConcurrentHashMap集合中的元素
    //该删除操作必须保证key和value完全匹配,删除成功则返回true,否则返回false
    boolean remove(Object key, Object value);
    
    //根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue
    //该替换操作必须保证key和oldValue玩去匹配,替换成功则返回true,否则返回false
    boolean replace(K key, V oldValue, V newValue);
    
    //和replace(key, oldValue, newValue)不同之处在于,少了对oldValue的判断
    //如果替换成功,则返回替换之前的value,否则返回null
    V replace(K key, V value);
    ...
}

因此,可以基于ConcurrentMap提供的接口对上述Demo进行改造。将原来ConcurrentHashMap第一次的put()方法替换为putIfAbsent()方法,将原来ConcurrentHashMap修改用的put()方法替换为replace()方法。由于putIfAbsent()方法和replace()方法都能保证原子性,所以并发安全了。同时增加一个while(true)方法以实现一个类似自旋的操作,确保操作成功。

public class KeyUtil {
    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Long accessCount = USER_ACCESS_COUNT.get("张三");
            if (accessCount == null) {
                if (USER_ACCESS_COUNT.putIfAbsent("张三", 1L) == null) {
                    break;
                }
            } else {
                if (USER_ACCESS_COUNT.replace("张三", accessCount, accessCount + 1)) {
                    break;
                }
            }
        }
    }
}

(4)ConcurrentMap支持lambda表达式操作

一.computeIfAbsent()方法

该方法通过判断传入的key是否存在来对ConcurrentMap进行数据初始化。如果key存在,则不做任何处理。如果key不存在,则调用mappingFunction计算出value值添加到Map。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);

二.computeIfPresent()方法

该方法对已经存在的key对应的value值进行修改。如果key不存在,则返回null。如果key存在,则调用mappingFunction计算出value值修改Map。

//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);

三.compute()方法

compute()方法是computeIfAbsent()方法和computeIfPresent()方法的结合体。不管key是否存在,都会调用mappingFunction计算出value值。如果key存在,则对value进行修改。如果key不存在,则进行初始化处理。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);

//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);

//如果张三这个用户存在,则对其value加1,否则初始化其值为1
USER_ACCESS_COUNT.compute("张三", (k,v) -> (v == null) ? 1L : v + 1);

3.ConcurrentHashMap的设计介绍

(1)JDK1.8相比于JDK1.7的改进

(2)ConcurrentHashMap的设计思想

(3)ConcurrentHashMap的数据结构定义

(1)JDK1.8相比于JDK1.7的改进

一.取消了segment分段设计,直接使用Node数组来保存数据

采用Node数组元素作为锁的粒度,进一步减少并发冲突的范围和概率。

二.引入红黑树设计

红黑树降低了极端情况下查询某个结点数据的时间复杂度,从O(n)降低到了O(logn),提升了查找性能。

(2)ConcurrentHashMap的设计思想

一.通过对数组元素加锁来降低锁的粒度

二.多线程进行并发扩容

三.高低位迁移方法

四.链表转红黑树及红黑树转链表

五.分段锁来实现数据统计

(3)ConcurrentHashMap的数据结构定义

ConcurrentHashMap采用Node数组来存储数据,数组长度默认为16。Node表示数组中的一个具体的数据结点,并且实现了Map.Entry接口。Node的key和val属性,表示实际存储的key和value。Node的hash属性,表示当前key对应的hash值。Node的next属性,表示如果是链表结构,则指向下一个Node结点。

当链表长度大于等于8 + Node数组长度大于64时,链表会转为红黑树,红黑树的存储使用TreeNode来实现。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The default initial table capacity.
    //Must be a power of 2 (i.e., at least 1) and at most MAXIMUM_CAPACITY.
    private static final int DEFAULT_CAPACITY = 16;
    
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;//用来存储ConcurrentHashMap数据的Node数组
    
    //Key-value entry.  
    //This class is never exported out as a user-mutable Map.Entry 
    //(i.e., one supporting setValue; see MapEntry below), 
    //but can be used for read-only traversals used in bulk tasks.
    //Subclasses of Node with a negative hash field are special, 
    //and contain null keys and values (but are never exported).  
    //Otherwise, keys and vals are never null.
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;//当前key对应的hash值
        final K key;//实际存储的key
        volatile V val;//实际存储的value
        volatile Node<K,V> next;//如果是链表结构,则指向下一个Node结点
     
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ...
    }
    
    //Nodes for use in TreeBins
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;//red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;//needed to unlink next upon deletion
        boolean red;


        TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        ...
    }
    ...
}

4.ConcurrentHashMap的put操作流程

(1)ConcurrentHashMap的put操作流程

(2)ConcurrentHashMap和HashMap的put操作

(3)为什么ConcurrentHashMap是并发安全的

(1)ConcurrentHashMap的put操作流程

首先通过key的hashCode的高低16位的位与操作来计算key的hash值,让32位的hashCode都参与运算以降低数组大小小于32时哈希冲突的概率。

然后判断Node数组是否为空或者Node数组的长度是否为0。如果为空或者为0,则调用initTable()方法进行初始化。如果不为空,则通过hash & (n - 1)计算当前key在Node数组中的下标位置。并通过tabAt()方法获取该位置的值f,然后判断该位置的值f是否为空。

如果该位置的值f为空,则把当前的key和value封装成Node对象。然后尝试通过casTabAt()方法使用CAS设置该位置的值f为封装好的Node对象。如果CAS设置成功,则退出for循环,否则继续进行下一次for循环。

如果该位置的值f不为空,则判断Node数组是否正处于扩容中。如果是,那么当前线程就调用helpTransfer()方法进行并发扩容。如果不是,那么说明当前的key在Node数组中出现了Hash冲突。于是通过synchronized关键字,对该位置的值f进行Hash冲突处理。其实JUC还可以继续优化,比如先用CAS尝试修改哈希冲突下的链表或红黑树。如果CAS修改失败,那么再通过使用synchronized对该数组元素加锁来进行处理。

最后,会调用addCount()方法统计Node数组中的元素个数。

public class ConcurrentHashMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();
        map.put("k1", "v1");
    }
}

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;
    
    //Creates a new, empty map with the default initial table size (16).
    public ConcurrentHashMap() {
        
    }
    
    //Creates a new, empty map with an initial table size 
    //accommodating the specified number of elements without the need to dynamically resize.
    //@param initialCapacity The implementation performs internal sizing to accommodate this many elements.
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0) {
            throw new IllegalArgumentException();
        }
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    //Returns a power of two table size for the given desired capacity.
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
    
    static final int spread(int h) {
        //通过hashCode的高低16位的异或运算来计算hash值,以降低数组大小比32小的时候的哈希冲突概率
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
    
    //Maps the specified key to the specified value in this table.
    //Neither the key nor the value can be null.
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性
    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    //CAS设置Node数组的元素为某个Node对象
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
      
        //通过key的hashCode的高低16位的位与操作来计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
       
        //这是一个没有结束条件的for循环,用来自旋
        //其中Node数组的引用赋值给了tab变量
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) {
                //调用initTable()方法初始化Node数组
                tab = initTable();
            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
                    break;// no lock when adding to empty bin
                }
            } else if ((fh = f.hash) == MOVED) {
                //如果发现Node数组正处于扩容中,那么就进行并发扩容
                tab = helpTransfer(tab, f);
            } else {
                V oldVal = null;
                //处理Hash冲突
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {//如果是链表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) {
                                        e.val = value;
                                    }
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {//如果是红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //如果链表的元素大于等于8
                    if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8
                        treeifyBin(tab, i);//链表转红黑树
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //调用addCount()方法统计Node数组元素的个数
        addCount(1L, binCount);
        return null;
    }
    ...
}

(2)ConcurrentHashMap和HashMap的put操作

都是通过key的hashCode的高低16位的异或运算,来降低Hash冲突概率。

都是通过Hash值与数组大小-1的位与运算(取模),来定位key在数组的位置。

但ConcurrentHashMap使用了自旋 + CAS + synchronized来处理put操作,从而保证了多个线程对数组里某个key进行赋值时的效率 + 并发安全性。

public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {
    static final int TREEIFY_THRESHOLD = 8;//链表转红黑树的阈值
    ...
    
    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }
  
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0) {
            n = (tab = resize()).length;
        }
        if ((p = tab[i = (n - 1) & hash]) == null) {
            //如果通过哈希寻址算法定位到的下标为i的数组元素为空(即tab[i]为空)
            //那么就可以直接将一个新创建的Node对象放到数组的tab[i]这个位置;
            tab[i] = newNode(hash, key, value, null);
        } else {
            Node<K,V> e; K k;
            if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {
                //通过哈希寻址算法定位到的数组位置已有Node元素
                //那么判断是否为相同的key,如果是相同的key则进行value覆盖
                e = p;
            } else if (p instanceof TreeNode) {
                //通过哈希寻址算法定位到的数组位置已有Node元素,而且不是相同的key
                //那么通过"p instanceof TreeNode)",判断数组的tab[i]元素是否是一颗红黑树
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            } else {
                ...
            }
            ...
        }
        ++modCount;
        //判断数组大小size,是否已经达到了扩容阈值threshold大小
        if (++size > threshold) {
            resize();
        }
        afterNodeInsertion(evict);
        return null;
    }
    ...
}

(3)为什么ConcurrentHashMap是并发安全的

首先在初始化Node数组时,会通过自旋 + CAS去设置sizeCtl的值来获得锁。然后在put()操作时,也会通过自旋 + CAS去设置数组某个位置的值。当出现Hash冲突时,则使用synchronized关键字来修改数组某个位置的值。

5.ConcurrentHashMap的Node数组初始化

(1)调用put()方法时才初始化Node数组

(2)initTable()方法的初始化逻辑

(3)sizeCtl的状态流转

(1)调用put()方法时才初始化Node数组

Node数组的初始化过程是被动的,当调用ConcurrentHashMap.put()方法时,如果发现Node数组还没有被初始化,才会调用initTable()方法完成初始化。

(2)initTable()方法的初始化逻辑

initTable()方法和一般的初始化方法不同,因为需要考虑多线程并发情形。

首先while循环的退出条件是Node数据即table初始化成功,否则一直循环。这其实就使用到了自旋的机制,因为多个线程调用initTable()必然会竞争。而在竞争的情况下如果不采用独占锁机制,就只能通过自旋来不断重试。

然后通过sizeCtl是否小于0来判断当前是否有其他线程正在进行初始化。如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源。如果没有,则通过CAS修改sizeCtl变量的值为-1。

如果CAS修改sizeCtl成功,则表示当前线程获取初始化Node数组的锁成功了;

如果CAS修改sizeCtl失败,则表示当前线程获取初始化Node数组的锁失败了;

对于获取锁失败的线程,会继续进入下一次while循环进行重试,这样设计是为了避免出现多个线程同时初始化Node数组。

对于获取锁成功的线程,首先会判断Node数组是否已经初始化完成。如果Node数组已经初始化完成,则退出while循环。如果Node数组还是空,则创建一个Node数组,然后赋值给table变量。并且计算下次扩容的阈值(0.75倍当前数组容量),然后赋值给sizeCtl。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //sizeCtl = -1,表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组
    //sizeCtl < -1,用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量
    //sizeCtl = 0,表示Node数组未初始化,并且在ConcurrentHashMap构造方法中没有指定初始容量
    //sizeCtl > 0,如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75),如果未初始化,则表示数组的初始容量
    private transient volatile int sizeCtl;
    private static final long SIZECTL;
  
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();//获取UnSafe对象
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));//获取sizeCtl变量的偏移量
            ...
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    ...
    
    //初始化Node数组
    //Initializes table, using the size recorded in sizeCtl.
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //退出while循环的条件是Node数组即table初始化成功
        while ((tab = table) == null || tab.length == 0) {
            //判断当前是否有其他线程正在进行初始化
            if ((sc = sizeCtl) < 0) {
                //如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源
                Thread.yield();//lost initialization race; just spin
            } else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                //如果没有线程正在进行初始化,则通过CAS修改sizeCtl变量的值为-1
                //如果CAS修改成功,则表示当前线程获得了初始化数组的锁
                //如果CAS修改失败,则表示当前线程获取初始化数组的锁失败
                try {
                    //再次判断Node数组是否为空,即Node数组是否已经初始化完成
                    //因为执行Thread.yield()让出CPU资源的线程必然会再次执行到这里
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //初始化大小为n的Node数组,然后赋值给tab变量和table变量
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        //赋值给ConcurrentHashMap的全局Node数组table
                        table = tab = nt;
                        //计算下次扩容的阈值,阈值的计算是当前数组容量的0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //最后将扩容的阈值赋值给sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    ...
}

(3)sizeCtl的状态流转

一.sizeCtl = -1

表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组。

二.sizeCtl < -1

用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量。

三.sizeCtl = 0

表示Node数组未初始化,且创建ConcurrentHashMap时没有指定初始容量。

四.sizeCtl > 0

如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75)。如果Node数组未初始化,则表示数组的初始容量。

图片

6.ConcurrentHashMap对Hash冲突的处理

(1)Hash冲突的几个解决方案

(2)ConcurrentHashMap对Hash冲突的处理

(3)链表长度大于8时是扩容还是转化为红黑树

(1)Hash冲突的几个解决方案

一.开放寻址法

如果位置i被占用,那么就探查i+1、i+2、i+3的位置。ThreadLocal采用的就是开放寻址法。

二.链式寻址法

Hash表的每个位置都连接一个链表。当发生Hash冲突时,冲突的元素会被加入到这个位置的链表中。ConcurrentHashMap就是基于链式寻址法解决Hash冲突的。

三.再Hash法

提供多个不同的Hash函数,当发生Hash冲突时,使用第二个、第三个等。

(2)ConcurrentHashMap对Hash冲突的处理

首先使用synchronized对当前位置的Node对象f进行加锁。由于这种锁控制在数组的单个数据元素上,所以长度为16的数组理论上就可以支持16个线程并发写入数据。

然后判断当前位置的Node对象f是链表还是红黑树。如果是链表,那么就把当前的key/value封装成Node对象插入到链表的尾部。如果是红黑树,那么就调用TreeBin的putTreeVal()方法往红黑树插入结点。

最后判断链表的长度是否大于等于8,如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是将链表转化为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;
    
    //Maps the specified key to the specified value in this table.
    //Neither the key nor the value can be null.
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    //获取Node数组在位置i的元素
    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    //CAS设置Node数组的元素为某个Node对象
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
      
        //通过key的hashCode的高低16位的位与操作来计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
       
        //这是一个没有结束条件的for循环,用来自旋
        //其中Node数组的引用赋值给了tab变量
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) {
                //调用initTable()方法初始化Node数组
                tab = initTable();
            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
                    break;// no lock when adding to empty bin
                }
            } else if ((fh = f.hash) == MOVED) {
                //发现Node数组正处于扩容中,那么就进行并发扩容
                tab = helpTransfer(tab, f);
            } else {
                V oldVal = null;
                //处理Hash冲突
                synchronized (f) {//使用synchronized对当前数组位置的Node对象f进行加锁
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {//如果是链表
                            binCount = 1;//binCount用来记录链表的长度
                            //从链表的头结点开始遍历每个结点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果存在相同的key,则修改该key的value
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) {
                                        e.val = value;
                                    }
                                    break;
                                }
                                Node<K,V> pred = e;
                                //找到链表的最后一个结点
                                if ((e = e.next) == null) {
                                    //把当前的key/value封装成Node对象插入到链表的尾部
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {//如果是红黑树
                            Node<K,V> p;
                            binCount = 2;
                            //调用TreeBin的putTreeVal()方法往红黑树插入结点
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                //最后判断链表的长度是否大于等于8
                if (binCount != 0) {
                    //如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是转化为红黑树
                    if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8
                        treeifyBin(tab, i);//是扩容数组还是转化为红黑树
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //调用addCount()方法统计Node数组元素的个数
        addCount(1L, binCount);
        return null;
    }
    ...
}

(3)链表长度大于8时是扩容还是转化为红黑树

当链表长度 >= 8时ConcurrentHashMap会对链表采用两种方式进行优化。

方式一:对数组进行扩容

当数组长度 <= 64,且链表长度 >= 8时,优先选择对数组进行扩容。

方式二:把链表转化为红黑树

当数组长度 > 64,且链表长度 >= 8时,会将链表转化为红黑树。

treeifyBin()方法的作用是根据相关阈值来决定是扩容还是把链表转为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    static final int MIN_TREEIFY_CAPACITY = 64;
    ...
    //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //如果当前数组的长度小于64,则调用tryPresize()方法进行数组扩容
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
                tryPresize(n << 1);//数组扩容
            } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            //构建一个TreeNode并插入红黑树中
                            TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                            if ((p.prev = tl) == null) {
                                hd = p;
                            } else {
                                tl.next = p;
                            }
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/973000.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java中的常用类 --String

学习目标 掌握String常用方法掌握StringBuilder、StringBuffer了解正则 1.String ● String是JDK中提前定义好的类型 其所在的包是java.lang ,String翻译过来表示字符串类型&#xff0c;也就是说String类中已经提前定义好了很多方法都是用来处理字符串的&#xff0c;所以Str…

wps中的js开发

严格区分大小写 /*** learn_js Macro*/ function test() {Range(D7).Value2Selection.Value2; // Selection.formula "100" }function Workbook_SheetSelectionChange(Sh, Target) {if(Sh.Name Sheet1) {test();}}function test2() {// 把I4单元格及其周边有数的单…

QT事件循环

文章目录 主事件循环事件循环事件调度器事件处理投递事件发送事件 事件循环的嵌套线程的事件循环deleteLater与事件循环QEventLoop类QEventLoop应用等待一段时间同步操作模拟模态对话框 参考 本文主要对QT中的事件循环做简单介绍和使用 Qt作为一个跨平台的UI框架&#xff0c;其…

3-知识图谱-知识图谱的存储与查询

基于关系型数据库的知识图谱存储 基于原生图的知识图谱存储 关系型数据库的局限性 因为关系数据库&#xff0c;不善于处理“关系” 图数据库&#xff1a; Relations Are First-class citizens 在关系数据库中&#xff0c;关系是隐藏表达的。通过外键关联实体&#xff0c;表达…

【HarmonyOS Next】鸿蒙监听手机按键

【HarmonyOS Next】鸿蒙监听手机按键 一、前言 应用开发中我们会遇到监听用户实体按键&#xff0c;或者扩展按键的需求。亦或者是在某些场景下&#xff0c;禁止用户按下某些按键的业务需求。 这两种需求&#xff0c;鸿蒙都提供了对应的监听事件进行处理。 onKeyEvent 默认的…

SpringCloud-Eureka初步使用

什么是REST是一组用于规范资源在网络中转移的表现形式软件架构设计风格.简单来说就是客户端和服务器之间的一种交互形式 什么是RESTful,满足了REST风格的接口或者程序,RESTful API是其中的接口,spring中提供了RestTemplate这个类,他强制执行了REST的规范,包括使用HTTP协议的状…

SpringBoot+uniApp日历备忘录小程序系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.日历渲染代码&#xff1a;2.保存备忘录代码&#xff1a;3.删除备忘录代码&#xff1a; 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootuniApp框架开…

推荐给 Easysearch 新用户的几个 Elasticsearch 可视化工具

Easysearch 作为国产化的 Elasticsearch&#xff08;ES&#xff09;替代方案&#xff0c;兼容 Elasticsearch 生态系统中的多种工具。本文将介绍几款适合 Easysearch 用户的可视化工具&#xff0c;帮助您更高效地管理和查询数据。 1. Elasticsearch Head 插件 在ES培训经常提…

PHP+Apache+MySQL安装(Windows)

一、安装教程 参考链接1 参考链接2 二、问题描述 PHP安装目录下找不到php8apache2_4.dll PHP安装包下载错误 Apache Service Monitor: request operation has failed! 定位问题&#xff1a; 查看【事件查看器】 解决问题 安装或更新与PHP版本相对应的Visual C Redistribu…

捷米特 JM - RTU - TCP 网关应用 F - net 协议转 Modbus TCP 实现电脑控制流量计

一、项目背景 在某工业生产园区的供水系统中&#xff0c;为了精确监测和控制各个生产环节的用水流量&#xff0c;需要对分布在不同区域的多个流量计进行集中管理。这些流量计原本采用 F - net 协议进行数据传输&#xff0c;但园区的监控系统基于 Modbus TCP 协议进行数据交互&…

【Mysql】我在广州学Mysql 系列—— 有关日志管理的示例

ℹ️大家好&#xff0c;我是练小杰&#xff0c;今天星期四了&#xff0c;明天周五&#xff0c;美好的周末又要到了&#xff01;&#xff01;&#x1f606; 本文是对MySQL日志管理内容进行练习&#xff0c;后续将添加更多相关知识噢&#xff0c;谢谢各位的支持&#x1f64f; 复习…

JUC并发—8.并发安全集合二

大纲 1.JDK 1.7的HashMap的死循环与数据丢失 2.ConcurrentHashMap的并发安全 3.ConcurrentHashMap的设计介绍 4.ConcurrentHashMap的put操作流程 5.ConcurrentHashMap的Node数组初始化 6.ConcurrentHashMap对Hash冲突的处理 7.ConcurrentHashMap的并发扩容机制 8.Concu…

金融时间序列【量化理论】

业界常用的技术分析指标都与价格本身有关&#xff0c;而时间序列分析由于对数据平稳性的要求常常是基于收益率这样更加偏稳定的数据&#xff08;收益率由于会涨停和跌停每天最多10%&#xff09; 平稳性&#xff1a; 强平稳性&#xff1a;随时间变化&#xff0c;各个统计特征都…

nvm安装、管理node多版本以及配置环境变量【保姆级教程】

引言 不同的项目运行时可能需要不同的node版本才可以运行&#xff0c;由于来回进行卸载不同版本的node比较麻烦&#xff1b;所以需要使用node工程多版本管理。 本人在配置时&#xff0c;通过网络搜索教程&#xff0c;由于文章时间过老&#xff0c;或者文章的互相拷贝导致配置时…

8 SpringBootWeb案例(上): 查询【分页功能(分页插件)】、删除、新增、修改

文章目录 前言:SpringBootWeb案例1. 准备工作1.1 需求&环境搭建1.1.1 需求说明1.1.2 环境搭建1.2 开发规范1.2.1 开发规范-REST(不强求非要这种风格,传统风格有时候更方便)1.2.2 开发规范-统一响应结果和异常处理1.2.3 开发流程2. 部门管理2.1 查询部门2.1.1 原型和需求…

新手小白如何挖掘cnvd通用漏洞之存储xss漏洞(利用xss钓鱼)

视频教程和更多福利在我主页简介或专栏里 &#xff08;不懂都可以来问我 专栏找我哦&#xff09; 如果对你有帮助你可以来专栏找我&#xff0c;我可以无偿分享给你对你更有帮助的一些经验和资料哦 目录&#xff1a; 一、XSS的三种类型&#xff1a; 二、XSS攻击的危害&#x…

用《软件方法》引导AI全流程高效开发

和“敏捷”的拍脑袋“试错”不同&#xff0c;《软件方法》一直强调严谨地思考、推导和建模。 如何尽量借助现有AI的力量&#xff0c;降低建模人员A→B→C→D的推导工作量&#xff0c;是一个非常有价值的课题。我们将用一个实例来分享和展示《软件方法》作者潘加宇的建议实践。…

全面收集中间件Exporter适配:从Redis到ActiveMQ,掌握监控数据采集的最佳实践

#作者&#xff1a;任少近 文章目录 说明&#xff1a;一 Redis的适配exporter版1.1 Redis的exporter源码版本1.2 Redis的exporter的releases版1.3 Redis_exporter版本选择理由1.4 Redis_exporter docer镜像 二 Zookeeper的适配exporter版2.1 Zookeeper的exporter源码版本2.2 Zo…

npm在install时提示要安装python问题处理

使用npm\yarn\pnpm下载以来的时候&#xff0c;一直提示python异常&#xff0c;有的项目安装了python之后&#xff0c;下载依赖还是异常 而且旧版本项目使用python2,新的使用Python3…很烦 解决方案1&#xff1a;cnpm 使用cnpm 安装教程&#xff1a; npm安装cnpm&#xff0c;解…

浅谈网络 | 容器网络之Cilium

目录 Cilium介绍Cilium是什么Cilium 主要功能特性为什么用Cilium&#xff1f; 功能概述组件概况BPF 与 XDPeBPF (Extended Berkeley Packet Filter)XDP (eXpress Data Path) Cilium介绍 Cilium是什么 Cilium 是一个开源网络和安全项目&#xff0c;专为 Kubernetes、Docker 和…