目录
ConcurrentHashMap 一定是线程安全的吗
CAS 机制的注意事项
使用java 并行流 ,您要留意了
ConcurrentHashMap 在JDK1.8中ConcurrentHashMap 内部使用的是数组加链表加红黑树的结构,通过CAS+volatile或synchronized的方式来保证线程安全的,这些原理已毋庸置疑,一言不合上代码.
1. 模拟2个线程累计,通过ConcurrentHashMap 储存累计的结果。
/**
* @description: ConcurrentHashMap 真的安全吗
* @author: ppx
* @date: 2023/8/17 14:11
* @version: 1.0
*/
public class TestMap {
private static ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
private static String key = "hello";
/**
* @description: 测试2个线程 执行计算
* @param:
* @return: void
* @author: ppx
* @date: 2023/8/17 16:43
*/
private static void testRun() {
ExecutorService executor = new ThreadPoolExecutor(2, 5,
2L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 2; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
// 第一步读取
int value = concurrentHashMap.getOrDefault(key, 0);
// 第二步+1
value++;
// 第三补+ 回写map
concurrentHashMap.put(key, value);
}
});
}
executor.shutdown();
// 直到线程执行完成
while(!executor.isTerminated()){
}
System.out.println("执行结果:" + concurrentHashMap.get(key));
}
public static void main(String[] args) {
testRun();
}
}
2.出乎意料执行多次输出不同的结果:
3. 分析原理:ConcurrentHashMap 本身是线程安全的,但for 里面的获map取值、加加操作及回写map 这三步是非原子性。要保证操作的安全性,这三步实现原子性即可。
优化后代码:
private static void testRun() {
ExecutorService executor = new ThreadPoolExecutor(2, 5,
2L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 2; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
synchronized (TestMap.class) {
int value = concurrentHashMap.getOrDefault(key, 0);
value ++;
concurrentHashMap.put(key, value);
}
}
});
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("执行结果:" + concurrentHashMap.get(key));
}
CAS 机制的注意事项
某线程把数据A更新了B,随后又从B更新成A,恰好此时另一线程读取该数据,发现数据的值还是A没有变化,误认为还是原来的A,但此时A的一些属性或状态已经发生过变化。
CAS操作中将判断“V的值是否仍然为A?”,如果是的话将执行更新操作,在某些CAS操作中,如果V的值首先由A变为B,再由B变为A,那么CAS仍然将会操作成功。
ABA问题:
线程A 的操作,cas中的值由1变成99,再由99变成1,此次线程B 发现AtomicInteger 的值还是1,于是更新到50,产生ABA的问题。
private static AtomicInteger atomicInteger = new AtomicInteger(1);
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
atomicInteger.compareAndSet(1, 99);
atomicInteger.compareAndSet(99, 1);
System.out.println("线程A进行CAS后的值:"+atomicInteger.get());
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程A");
Thread threadB = new Thread(() -> {
try{
atomicInteger.compareAndSet(1, 50);
System.out.println("线程B进行CAS后的值:"+atomicInteger.get());
}catch (Exception e) {
e.printStackTrace();
}
}, "线程B");
threadA.start();
try {
threadA.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadB.start();
}
基于AtomicStampedReference类实现
AtomicStampedReference内部增加了版本号的概念,只有期待的值与版本号分别匹配后,才满足条件,更新最新的值。
案例:
线程 A 进行CAS 操作更新时,发布版本已发生变动,CAS更新 失败。线程B 进行CAS 操作更新时,匹配对应的版本,期待值,更新成功。
public static void main(String[] args) {
new Thread(() -> {
// 让线程B 获取最新版本号,成功 执行更新
try {
Thread.sleep(11);
} catch (InterruptedException e) {
e.printStackTrace();
}
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + ", 当前版本号为:" + stamp);
boolean firstCasFlag = atomicStampedReference.compareAndSet(100, 99, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println("当前版本号:"+atomicStampedReference.getStamp()+", 线程A进行CAS后的值:" + atomicInteger.get() + ",第1次操作是否修改成功: " + firstCasFlag);
}, "线程A").start();
new Thread(() -> {
try {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + ", 版本号为:" + atomicStampedReference.getStamp());
boolean flag = atomicStampedReference.compareAndSet(100, 888, stamp, atomicStampedReference.getStamp() + 1);
System.out.println("线程B进行CAS后的值:" + atomicStampedReference.getReference() + ", 此次操作是否修改成功: " + flag);
} catch (Exception e) {
e.printStackTrace();
}
}, "线程B").start();
}
执行结果:
线程B, 版本号为:1
线程B进行CAS后的值:888, 此次操作是否修改成功: true
线程A, 当前版本号为:2
当前版本号:2, 线程A进行CAS后的值:1,第1次操作是否修改成功: false