文章目录
- 1. 问题引入
- 2. CAS底层详解
- 1. Java中CAS实现
- 2. CAS源码分析
- 3. CAS操作存在的缺陷
- 4. ABA问题及其解决方案
1. 问题引入
见下面代码
public class Main {
private volatile static int sum=0;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread t1=new Thread(()->{
for (int j = 0; j < 10000; j++) {
sum++;
}
});
t1.start();
}
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(sum);
}
}
我们知道volatile可以保证可见性和有序性,但它不能保证原子性,所以sum的值才不会是10000。针对这个问题可以有如下解决方法:
- 使用Synchronized
public class Main {
private volatile static int sum=0;
static Object object="";
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread t1=new Thread(()->{
synchronized(object){
for (int j = 0; j < 10000; j++) {
sum++;
}
}
});
t1.start();
}
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(sum);
}
}
但我们知道Synchronized多线程情况下,只有一个线程会获取对象锁,其它线程会陷入阻塞,而且在线程切换时还涉及操作系统用户态向内核态的转换,这样性能是非常低的,所以不推荐使用Synchronized
- Lock
public class Main {
private volatile static int sum=0;
ReetrantLock lock=new ReetrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
lock.lock();
try{
Thread t1=new Thread(()->{
for (int j = 0; j < 10000; j++) {
sum++;
}
}finally{
lock.unlock();
}
});
t1.start();
}
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(sum);
}
}
ReetrantLock底层使用的是park和unpark原理实现的
- CAS
CAS(Compare And Swap,比较并交换),通常指的是这样一种原子操作:针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同就给它一个新值。CAS可以看作一个不可分割的原子操作,并且原子性是直接在硬件层面得到保障的。CAS可以看成一种乐观锁的一种实现,Java的原子类的递增就是使用CAS自旋实现的。CAS是一种无锁算法,在不使用锁的情况下实现多线程之间的变量同步。
2. CAS底层详解
1. Java中CAS实现
在Java中,CAS操作是由Unsafe类提供支持的,该类定义类三种针对不同类型变量的CAS操作:
public final native boolean compareAndSwapObject(Object var1,long var2 ,Object var4,Object var5)
public final native boolean compareAndSwapInt(Object var1,long var2 ,int var4,int var5)
public final native boolean compareAndSwapLong(Object var1,long var2 ,long var4,long var5)
它们都是 native 方法,由Java 虚拟机提供具体实现,这意味着不同的 Java 虚拟机对它们的实现 可能会略有不同。以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别 是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。
publicclassCASTest{ 2
public static void main(String[] args) {
Entity entity = new Entity();
Unsafe unsafe = UnsafeFactory.getUnsafe();
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x"); 9
boolean successful;
// 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段新值
successful = unsafe.compareAndSwapInt(entity, offset, 0, );
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, );
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, );
System.out.println(successful + "\t" + entity.x);
}
}
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafeFactory{
/**
* 获取 Unsafe 对象
* @return
*/
public static Unsafe getUnsafe() {
try{
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获取字段的内存偏移量
* @param unsafe
* @param clazz
* @param fieldName
* @return
*/
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
try{
return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
}
2. CAS源码分析
通过对cas使用的学习,我们知道CAS在操作系统底层已经帮我们保证了原子性,但操作系统并没有帮我保证原子性和可见性,其实这两个部分是JVM实现帮我们保证的,那么我们来看一下JVM是如何帮我们实现可见性的
上面方法汇最核心的是调用cmpxchg方法,addr是交换值的偏移地址,它通过对象的地址p加上偏移地址得到,然后调用cmpchg方法,x是原来的值,e是交换值,然后根据返回值是否与e相等(相等则交换成功,否则交换失败)来判断CAS操作是否成功,cmpxchg源码如下(汇编代码):
#atomic_linux_x86.inline.hpp inlinejintAtomic::cmpxchg(jintexchange_value,volatilejint*dest,jintco
pare_value) {
//判断当前执行环境是否为多处理器环境
int mp = os::is_MP();
//LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果
//cmpxchgl 指令是包含在 x86 架构及 IA‐64 架构中的一个原子条件指令,
//它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,
//如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值
给exchange_value。
//这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" //1 3代表参数
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
上面代码是X86架构下cmpchg方法的实现,不同架构实现不同。如果是多处理器架构(说明回事多线程环境),它会调用lock前缀指令来加入内存屏障以实现可见性和有序性以及原子性。
lock 指令是 x86 架构中用于实现原子操作的一种方式,它在汇编层面提供了一些特殊的处理,以确保对共享内存的操作是原子的。下面详细解释一下 lock 指令的实现和其特性:
原子性:
总线锁定(Bus Locking): lock 指令在执行期间会通过总线锁定机制,防止其他处理器同时访问相同的内存位置。这意味着在执行 lock 指令期间,其他处理器无法执行与该内存位置相关的操作。例如,在 CAS 操作中,lock 指令确保了读取、比较和写入这三个步骤的原子性。
可见性:
缓存一致性: 当一个处理器执行 lock 指令时,它会发出信号通知其他处理器将本地缓存中的数据失效。这迫使其他处理器重新从主内存中加载数据,确保所有处理器看到的是最新的数据。这种机制确保了在多处理器系统中的可见性。
有序性:
内存屏障(Memory Barrier): lock 指令会创建一个内存屏障,防止在 lock 指令前后的指令发生重排序。内存屏障确保了指令的执行顺序符合程序员的预期,从而提供了有序性。缺陷:
性能开销: 使用 lock 指令可能引入性能开销,特别是在高度并发的情况下。当多个处理器争夺同一内存位置的锁时,会导致总线的竞争,进而降低整体性能。
粒度较大: lock 指令通常是在较大的内存范围上操作,这可能导致锁的粒度较大。在高并发情况下,锁的争用可能变得更为激烈,影响性能。
不适用于所有场景: lock 指令适用于一些简单的原子操作,但对于复杂的操作,使用更高级别的同步工具可能更为合适,例如 java.util.concurrent 包中提供的锁、原子变量等。
Atomic::cmpxchg这个函数最终返回值是exchange_value("=a"说明是方法的返回值),也就是说,如果cmpxchgl执行 时compare_value和dest指针指向内存值相等则会使得dest指针指向内存值变成 exchange_value,最终eax存的compare_value赋值给了exchange_value变量,即函数最终返回的值是原先的compare_value。此时Unsafe_CompareAndSwapInt的返回值(jint) (Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl执行时 compare_value和(dest)不等则会把当前dest指针指向内存的值写入eax,最终输出时赋值 给exchange_value变量作为返回值,导致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到 false,表明CAS失败。所以这个层面就实现了可见性(它把值通过exchange_value返回来了) 。
现代处理器指令架构基本上都会提供CAS指令,CAS指令作为一种硬件原语,有着天原的原子性,这也正是CAS的价值所在。第一节的部分用CAS实现代码如下:
定义CAS自旋锁
public class CASLock{
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;
static{
try{
UNSAFE=UnsafeFactory.getUnsafe();
OFFSET=UnsafeFactory.getFieldOffset(UNSAFE,CASLock.class,"state");
}catch(Exception e){
throw new Error(e);
}
}
public boolean cas(){return UNSAFE.compareAndSwapInt(this,OFFSET,0,1);};
public int getState() {return state;}
public void setState(int state){this.state=state;}
}
使用CAS解决所提问题
public class Main {
private volatile static int sum=0;
private static CASLock casLock=new CASLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread t1=new Thread(()->{
for(;;){
if(casLock.getState()==0 && casLock.cas()){
try{
for (int j = 0; j < 10000; j++) {
sum++;
}
}finally {
casLock.setState(1);
}
}
}
});
t1.start();
}
try {
Thread.sleep(6000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(sum);
}
}
上面整个过程也介绍了Atomi原子操作的底层原理
可以发现得到了正确的结果。但上面代码还是存在问题,上面创建了10个线程,但是同一时间只有一个线程在执行sum++,其它线程都在反复执行cas检查,不断的在for循环这里空转,空转占用cpu资源,这会对CPU资源造成很大的浪费(这里sum++影响不大,但对于一些复杂的逻辑空转会持续很久),因此我们就可以总结出CAS操作存在的一些缺陷。
3. CAS操作存在的缺陷
CAS虽然高效的解决了原子操作,但是还是存在一些缺陷,主要表现在三个方面:
- 自旋CAS长时间不成功,则会给CPU带来非常大的开销
- 只能保证一个共享变量的原子操作
- ABA问题
4. ABA问题及其解决方案
ABA问题是CAS中常出现的问题之一,假设一种情况,现在线程2将i的值从10修改为了20,然后线程3将20又修改为10,现在线程1读到i得值为10,由于线程之间的隔离性,所以线程1并不知道其它线程对i进行了操作,所以它误认为i并没有被修改过。这就是所谓的ABA问题,具体来说,ABA问题指的是在多线程环境下,一个值从A变为B,然后再变回A,而在这个过程中,可能发生了其他操作,导致判断值是否相同的CAS操作出现误判。尽管共享变量的值发生了变化,但是如果使用CAS进行比较时,只会检查共享变量的值是否仍然为A,而不考虑中间的变化。这样,CAS操作可能会在不知道共享变量的中间状态的情况下成功,产生误判。ABA问题可能导致程序逻辑错误,尤其是在涉及到数据一致性和正确性的场景下。例如,在实现一种无锁数据结构时,如果不考虑ABA问题,可能会导致意外的行为。
为了解决ABA问题,通常采用版本号或时间戳等机制。每次对共享变量的修改都伴随着版本号的增加,而不仅仅是数值的变化。这样,在比较时,除了检查数值是否相同外,还要检查版本号,从而避免了对中间状态的误判。Java中的AtomicStampedReference类就是为了解决ABA问题而设计的,它使用了版本戳的概念,提供了一种更安全的CAS操作,可以防范ABA问题的发生。
public static void main(String[] args) {
// 初始值为 "A"
AtomicStampedReference<String> atomicRef = new AtomicStampedReference<>("A", 0);
// 模拟线程1
Thread thread1 = new Thread(() -> {
//获得版本戳
int stamp = atomicRef.getStamp();
//获得当前值
String oldValue = atomicRef.getReference();
System.out.println("Thread 1 - Current value: " + oldValue);
// 模拟一些处理时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程1将值从 "A" 改为 "B",递增版本戳
atomicRef.compareAndSet(oldValue, "B", stamp, stamp + 1);
});
// 模拟线程2
Thread thread2 = new Thread(() -> {
// 等待线程1执行完毕
try {
thread1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程2尝试将值从 "A" 改回 "A",但由于版本戳的存在,CAS操作失败
int[] stampHolder = new int[1];
String oldValue = atomicRef.getReference();
int stamp = Integer.parseInt(atomicRef.get(stampHolder));
atomicRef.compareAndSet(oldValue, "A", stamp, stampHolder[0] + 1);
System.out.println("Thread 2 - CAS Result: " + atomicRef.getReference());
});
// 启动线程
thread1.start();
thread2.start();
}