并发编程(4)共享模型之无锁

6 共享模型之无锁

本章内容

  • CAS 与 volatile
  • 原子整数
  • 原子引用
  • 原子累加器
  • Unsafe

6.1 问题提出

有如下需求,保证 account.withdraw 取款方法的线程安全

import java.util.ArrayList;
import java.util.List;

interface Account {
    
    // 获取余额
    Integer getBalance();
    
    // 取款
    void withdraw(Integer amount);
    
    /**
    * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
    * 如果初始余额为 10000 那么正确的结果应当是 0
    */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        ts.forEach(Thread::start);
        
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        
        System.out.println(account.getBalance() 
                           + " cost: " + (end-start)/1000_000 + " ms");
    }
}

原有实现并不是线程安全的

class AccountUnsafe implements Account {
    private Integer balance;
    
    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }
    
    @Override
    public Integer getBalance() {
        return balance;
    }
    
    @Override
    public void withdraw(Integer amount) {
        balance -= amount;
    }
}

执行测试代码

public static void main(String[] args) {
    Account.demo(new AccountUnsafe(10000));
}

某次的执行结果 330 cost: 306 ms

为什么不安全

withdraw 方法

public void withdraw(Integer amount) {
    balance -= amount; 
}

对应的字节码

img

多线程执行流程

img

img

  • 单核的指令交错
  • 多核的指令交错

解决思路-synchronized锁

首先想到的是给 Account 对象加锁

class AccountUnsafe implements Account {
    private Integer balance;
    
    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }
    
    @Override
    public synchronized Integer getBalance() {
        return balance;
    }
    
    @Override
    public synchronized void withdraw(Integer amount) {
        balance -= amount;
    }
    
}

结果为 0 cost: 399 ms

解决思路-无锁(AtomicInteger)

class AccountSafe implements Account {
    
    private AtomicInteger balance; //原子整数
    
    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }
    
    @Override
    public Integer getBalance() {
        return balance.get();
    }
    
    @Override
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
        // 可以简化为下面的方法
        // balance.addAndGet(-1 * amount);
    }
}

执行测试代码

public static void main(String[] args) {
    Account.demo(new AccountSafe(10000));
}

某次的执行结果

0 cost: 302 ms

6.2 CAS 与 volatile

前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

public void withdraw(Integer amount) {
    while(true) {
        // 需要不断尝试,直到成功为止
        while (true) {
            // 比如拿到了旧值 1000
            int prev = balance.get();
            // 在这个基础上 1000-10 = 990
            int next = prev - amount;
            /*
            compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
            - 不一致了,next 作废,返回 false 表示失败
            比如,别的线程已经做了减法,当前值已经被减成了 990
            那么本线程的这次 990 就作废了,进入 while 下次循环重试
            - 一致,以 next 设置为新值,返回 true 表示成功
            */
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}     

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

img

注意

其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。

  • 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

慢动作分析

@Slf4j
public class SlowMotion {
    
    public static void main(String[] args) {
        AtomicInteger balance = new AtomicInteger(10000);
        int mainPrev = balance.get();
        log.debug("try get {}", mainPrev);
        
        new Thread(() -> {
            sleep(1000);
            int prev = balance.get();
            balance.compareAndSet(prev, 9000);
            log.debug(balance.toString());
        }, "t1").start();
        
        sleep(2000);
        log.debug("try set 8000...");
        boolean isSuccess = balance.compareAndSet(mainPrev, 8000);
        log.debug("is success ? {}", isSuccess);
        if(!isSuccess){
            mainPrev = balance.get();
            log.debug("try set 8000...");
            isSuccess = balance.compareAndSet(mainPrev, 8000);
            log.debug("is success ? {}", isSuccess);
        }
        
    }
    
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

输出结果

2019-10-13 11:28:37.134 [main] try get 10000 
2019-10-13 11:28:38.154 [t1] 9000 
2019-10-13 11:28:39.154 [main] try set 8000... 
2019-10-13 11:28:39.154 [main] is success ? false 
2019-10-13 11:28:39.154 [main] try set 8000... 
2019-10-13 11:28:39.154 [main] is success ? true

volatile

获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

注意

volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)

CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

为什么无锁效率高

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
  • 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

img

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。

  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。

  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思

    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

6.3 原子整数

J.U.C 并发包提供了:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

以 AtomicInteger 为例

AtomicInteger i = new AtomicInteger(0);

// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());

// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());

// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());

// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());

// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));

// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));

// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));

// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));

// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

6.4 原子引用

为什么需要原子引用类型?

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

有如下方法

public interface DecimalAccount {
    // 获取余额
    BigDecimal getBalance();
    
    // 取款
    void withdraw(BigDecimal amount);
    
    /**
    * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
    * 如果初始余额为 10000 那么正确的结果应当是 0
    */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}

试着提供不同的 DecimalAccount 实现,实现安全的取款操作

不安全实现

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;
    
    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }
    
    @Override
    public BigDecimal getBalance() {
        return balance;
    }
    
    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
    
}

安全实现-使用锁

class DecimalAccountSafeLock implements DecimalAccount {
    
    private final Object lock = new Object();
    BigDecimal balance;
    
    public DecimalAccountSafeLock(BigDecimal balance) {
        this.balance = balance;
    }
    
    @Override
    public BigDecimal getBalance() {
        return balance;
    }
    
    @Override
    public void withdraw(BigDecimal amount) {
        synchronized (lock) {
            BigDecimal balance = this.getBalance();
            this.balance = balance.subtract(amount);
        }
    }
    
}

安全实现-使用 CAS

class DecimalAccountSafeCas implements DecimalAccount {
    AtomicReference<BigDecimal> ref;
    
    public DecimalAccountSafeCas(BigDecimal balance) {
        ref = new AtomicReference<>(balance);
    }
    
    @Override
    public BigDecimal getBalance() {
        return ref.get();
    }
    
    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = ref.get();
            BigDecimal next = prev.subtract(amount);
            if (ref.compareAndSet(prev, next)) {
                break;
            }
        }
    }
    
}

测试代码

DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));

运行结果

4310 cost: 425 ms 
0 cost: 285 ms 
0 cost: 274 ms

ABA 问题及解决

ABA 问题

static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    String prev = ref.get();
    
    other();
    
    sleep(1);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}

private static void other() {
    
    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();
    
    sleep(0.5);
    
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
    
}

输出

11:29:52.325 c.Test36 [main] - main start... 
11:29:52.379 c.Test36 [t1] - change A->B true 
11:29:52.879 c.Test36 [t2] - change B->A true 
11:29:53.880 c.Test36 [main] - change A->C true

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:

只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

AtomicStampedReference(维护版本号)

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    String prev = ref.getReference();
    // 获取版本号
    int stamp = ref.getStamp();
    log.debug("版本 {}", stamp);
    // 如果中间有其它线程干扰,发生了 ABA 现象
    other();
    sleep(1);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}

private static void other() {
    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", 
                                                      ref.getStamp(), ref.getStamp() + 1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t1").start();
    
    sleep(0.5);
    
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", 
                                                      ref.getStamp(), ref.getStamp() + 1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t2").start();
}

输出为

15:41:34.891 c.Test36 [main] - main start... 
15:41:34.894 c.Test36 [main] - 版本 0 
15:41:34.956 c.Test36 [t1] - change A->B true 
15:41:34.956 c.Test36 [t1] - 更新版本为 1 
15:41:35.457 c.Test36 [t2] - change B->A true 
15:41:35.457 c.Test36 [t2] - 更新版本为 2 
15:41:36.457 c.Test36 [main] - change A->C false

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:

A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。

但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference

img

AtomicMarkableReference(仅维护是否修改过)

class GarbageBag {
    String desc;
    
    public GarbageBag(String desc) {
        this.desc = desc;
    }
    
    public void setDesc(String desc) {
        this.desc = desc;
    }
    
    @Override
    public String toString() {
        return super.toString() + " " + desc;
    }
    
}
@Slf4j
public class TestABAAtomicMarkableReference {
    public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满了垃圾");
        // 参数2 mark 可以看作一个标记,表示垃圾袋满了
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
        
        log.debug("主线程 start...");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());
        
        new Thread(() -> {
            log.debug("打扫卫生的线程 start...");
            bag.setDesc("空垃圾袋");
            while (!ref.compareAndSet(bag, bag, true, false)) {}
            log.debug(bag.toString());
        }).start();
        
        Thread.sleep(1000);
        log.debug("主线程想换一只新垃圾袋?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?" + success);
        
        log.debug(ref.getReference().toString());
    }
}

输出

2019-10-13 15:30:09.264 [main] 主线程 start... 
2019-10-13 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
2019-10-13 15:30:09.293 [Thread-1] 打扫卫生的线程 start... 
2019-10-13 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
2019-10-13 15:30:10.294 [main] 主线程想换一只新垃圾袋?
2019-10-13 15:30:10.294 [main] 换了么?false 
2019-10-13 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋

可以注释掉打扫卫生线程代码,再观察输出

6.5 原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

有如下方法

/**
    参数1,提供数组、可以是线程不安全数组或线程安全数组
    参数2,获取数组长度的方法
    参数3,自增方法,回传 array, index
    参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
    Supplier<T> arraySupplier,
    Function<T, Integer> lengthFun,
    BiConsumer<T, Integer> putConsumer,
    Consumer<T> printConsumer ) {
    
    List<Thread> ts = new ArrayList<>();
    T array = arraySupplier.get();
    int length = lengthFun.apply(array);
    for (int i = 0; i < length; i++) {
        // 每个线程对数组作 10000 次操作
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                putConsumer.accept(array, j%length);
            }
        }));
    }
    ts.forEach(t -> t.start()); // 启动所有线程
    
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }); // 等所有线程结束
    printConsumer.accept(array);
}

不安全的数组

demo(
    ()->new int[10],
    (array)->array.length,
    (array, index) -> array[index]++,
    array-> System.out.println(Arrays.toString(array))
);

结果 [9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]

安全的数组AtomicIntegerArray

demo(
    ()-> new AtomicIntegerArray(10),
    (array) -> array.length(),
    (array, index) -> array.getAndIncrement(index),
    array -> System.out.println(array)
);

结果 [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

6.6 字段原子更新器

  • AtomicReferenceFieldUpdater // 域字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,

否则会出现异常 Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

public class Test5 {
    private volatile int field;
    
    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
        
        Test5 test5 = new Test5();
        fieldUpdater.compareAndSet(test5, 0, 10);
        // 修改成功 field = 10
        System.out.println(test5.field);
        // 修改成功 field = 20
        fieldUpdater.compareAndSet(test5, 10, 20);
        System.out.println(test5.field);
        // 修改失败 field = 20
        fieldUpdater.compareAndSet(test5, 10, 30);
        System.out.println(test5.field);
    }
}

输出

10 
20 
20

6.7 原子累加器

累加器性能比较

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    T adder = adderSupplier.get();
    
    long start = System.nanoTime();
    
    List<Thread> ts = new ArrayList<>();
    // 4 个线程,每人累加 50 万
    for (int i = 0; i < 40; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 500000; j++) {
                action.accept(adder);
            }
        }));
    }
    ts.forEach(t -> t.start());
    
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println(adder + " cost:" + (end - start)/1000_000);
}

比较 AtomicLong 与 LongAdder

for (int i = 0; i < 5; i++) {
    demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
    demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}

输出

1000000 cost:43 
1000000 cost:9 
1000000 cost:7 
1000000 cost:7 
1000000 cost:7 
    
1000000 cost:31 
1000000 cost:27 
1000000 cost:28 
1000000 cost:24 
1000000 cost:22

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。

这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

源码之 LongAdder

LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;

// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;

// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

cas 锁

// 不要用于实践!!!
public class LockCas {
    private AtomicInteger state = new AtomicInteger(0);
    
    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }
    
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }
}

测试

LockCas lock = new LockCas();

new Thread(() -> {
    log.debug("begin...");
    lock.lock();
    try {
        log.debug("lock...");
        sleep(1);
    } finally {
        lock.unlock();
    }
}).start();

new Thread(() -> {
    log.debug("begin...");
    lock.lock();
    try {
        log.debug("lock...");
    } finally {
        lock.unlock();
    }
}).start();

输出

18:27:07.198 c.Test42 [Thread-0] - begin... 
18:27:07.202 c.Test42 [Thread-0] - lock... 
18:27:07.198 c.Test42 [Thread-1] - begin... 
18:27:08.204 c.Test42 [Thread-0] - unlock... 
18:27:08.204 c.Test42 [Thread-1] - lock... 
18:27:08.204 c.Test42 [Thread-1] - unlock...

原理之伪共享

Cell需要防止防止缓存行伪共享问题

其中 Cell 即为累加单元

// 防止缓存行伪共享  
// Contended: v.(尤指在争论中)声称,主张,认为;竞争;争夺 ,contend的过去分词和过去式
@sun.misc.Contended
static final class Cell {

    volatile long value;
    
    Cell(long x) { 
        value = x; 
    }

    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
// 省略不重要代码
}

关于缓存行伪共享问题

得从缓存说起

缓存与内存的速度比较

img

img

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。

而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)

缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中

CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效

img

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。

这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,
比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效 ;

同理 Core-1修改Cell[1]也会让 Core-0 的缓存行失效.

解决方法: @sun.misc.Contended

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

img

add源码

累加主要调用下面的方法

public void add(long x) {
    // as 为累加单元数组
    // b 为基础值
    // x 为累加值
    Cell[] as; long b, v; int m; Cell a;
    
    // 进入 if 的两个条件
    // 1. as 有值, 表示已经发生过竞争, 进入 if
    // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // uncontended 表示 cell 没有竞争
        boolean uncontended = true;
        if (
            // as 还没有创建
            as == null || (m = as.length - 1) < 0 ||
            // 当前线程对应的 cell 还没有
            (a = as[getProbe() & m]) == null ||
            // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
            !(uncontended = a.cas(v = a.value, v + x))
        ) {
            // 进入 cell 数组创建、cell 创建的流程
            longAccumulate(x, null, uncontended);
        }
    }
}

add 流程图

img

longAccumulate源码

final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {
    int h;
    // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
    if ((h = getProbe()) == 0) {
        // 初始化 probe
        ThreadLocalRandom.current();
        // h 对应新的 probe 值, 用来对应 cell
        h = getProbe();
        wasUncontended = true;
    }
    // collide 为 true 表示需要扩容
    boolean collide = false; 
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 已经有了 cells
        if ((as = cells) != null && (n = as.length) > 0) {
            // 还没有 cell
            if ((a = as[(n - 1) & h]) == null) {
                // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
                // 成功则 break, 否则继续 continue 循环
            }
            // 有竞争, 改变线程对应的 cell 来重试 cas
            else if (!wasUncontended)
                wasUncontended = true;
            // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
            else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
            // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
            else if (n >= NCPU || cells != as)
                collide = false;
            // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
            else if (!collide)
                collide = true;
            // 加锁
            else if (cellsBusy == 0 && casCellsBusy()) {
                // 加锁成功, 扩容
                continue;
            }
            // 改变线程对应的 cell
            h = advanceProbe(h);
        }
        // 还没有 cells, 尝试给 cellsBusy 加锁
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
            // 成功则 break;
        }
        // 上两种情况失败, 尝试给 base 累加
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
            break;
    }
}

longAccumulate 流程图

img

img

每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)

img

sum源码

获取最终结果通过 sum 方法

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum; 
}

6.8 Unsafe

取名Unsafe并不是说该类不安全,而是因为该类直接操作内存,比较复杂,意在告诉程序员使用该类有较大风险

概述

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

public class UnsafeAccessor {
    static Unsafe unsafe;
    static {
        try { 
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
        return unsafe;
    }
}

Unsafe CAS 操作

unsafe.compareAndSwapXXX

import lombok.Data;
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestUnsafeCAS {

    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {

//        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
//        theUnsafe.setAccessible(true);
//        Unsafe unsafe = (Unsafe) theUnsafe.get(null);
        Unsafe unsafe = UnsafeAccessor.getUnsafe();
        System.out.println(unsafe);

        // 1. 获取域的偏移地址
        long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
        long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));

        Teacher t = new Teacher();
        System.out.println(t);

        // 2. 执行 cas 操作
        unsafe.compareAndSwapInt(t, idOffset, 0, 1);
        unsafe.compareAndSwapObject(t, nameOffset, null, "张三");

        // 3. 验证
        System.out.println(t);
    }
}

@Data
class Teacher {
    volatile int id;
    volatile String name;
}

输出

sun.misc.Unsafe@77556fd
Teacher(id=0, name=null)
Teacher(id=1, name=张三)

模拟实现原子整数

使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现

class AtomicData {
    private volatile int data;
    static final Unsafe unsafe;
    static final long DATA_OFFSET;
    
    static {
        unsafe = UnsafeAccessor.getUnsafe();
        try {
            // data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
            DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }
    
    public AtomicData(int data) {
        this.data = data;
    }
    
    public void decrease(int amount) {
        int oldValue;
        while(true) {
            // 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
            oldValue = data;
            // cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
            if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
                return;
            }
        }
    }
    
    public int getData() {
        return data;
    }
}

Account 实现

Account.demo(new Account() {
    AtomicData atomicData = new AtomicData(10000);
    
    @Override
    public Integer getBalance() {
        return atomicData.getData();
    }
    
    @Override
    public void withdraw(Integer amount) {
        atomicData.decrease(amount);
    }
    
});

6.9 本章小结

  • CAS 与 volatile

  • API

    • 原子整数
    • 原子引用
    • 原子数组
    • 字段更新器
    • 原子累加器
  • Unsafe

  • 原理方面

    • LongAdder 源码
      性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
      DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField(“data”));
      } catch (NoSuchFieldException e) {
      throw new Error(e);
      }
      }

      public AtomicData(int data) {
      this.data = data;
      }

      public void decrease(int amount) {
      int oldValue;
      while(true) {
      // 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
      oldValue = data;
      // cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
      if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
      return;
      }
      }
      }

      public int getData() {
      return data;
      }
      }


Account 实现

```java
Account.demo(new Account() {
    AtomicData atomicData = new AtomicData(10000);
    
    @Override
    public Integer getBalance() {
        return atomicData.getData();
    }
    
    @Override
    public void withdraw(Integer amount) {
        atomicData.decrease(amount);
    }
    
});

6.9 本章小结

  • CAS 与 volatile

  • API

    • 原子整数
    • 原子引用
    • 原子数组
    • 字段更新器
    • 原子累加器
  • Unsafe

  • 原理方面

    • LongAdder 源码
    • 伪共享

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/407103.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[已解决]npm淘宝镜像最新官方指引(2023.08.31)

最新的配置淘宝镜像的淘宝官方提供的方法 npm config set registry https://registry.npmmirror.com原来的 registry.npm.taobao.org 已替换为 registry.npmmirror.com &#xff0c;当点击 registry.npm.taobao.org 会默认跳转到 registry.npmmirror.com 如果你想将npm的下载…

【接口加密】接口加密的未来发展与应用场景

目录 3.1 接口加密与区块链技术的结合 3.1.1 区块链技术的安全特性与优势 3.1.2 接口加密在区块链中的应用案例 3.2 接口加密与物联网安全 3.2.1 物联网安全的挑战与需求 3.2.2 接口加密在物联网领域的实际应用 3.3 接口加密在金融与电子商务领域的应用 随着信息技术的不…

【前端素材】推荐优质后台管理系统Airmin平台模板(附源码)

一、需求分析 系统定义 后台管理系统是一种用于管理和监控网站、应用程序或系统的在线工具。它通常是通过网页界面进行访问和操作&#xff0c;用于管理网站内容、用户权限、数据分析等。后台管理系统是网站或应用程序的控制中心&#xff0c;管理员可以通过后台系统进行各种管…

一些不得不知道的概念!吴恩达deeplearning.ai:人工智能的导论

文章目录 强人工智能 AGI人工智能的分类深度学习AGI可能实现的一些证据一种学习算法的假设具体的例子 为什么人工智能如此高效 以下内容有任何不理解可以翻看我之前的博客哦 强人工智能 AGI 强人工智能也叫做通用人工智能&#xff0c;是人工智能学科发展的一个重要目标。 人…

spring框架介绍

spring 1.优点 1&#xff09;针对接口编程&#xff0c;解耦合 2&#xff09;aop&#xff1a;变向切面编程&#xff0c;动态增加功能 3&#xff09;方便集成框架&#xff0c;mybatis,hibernate,strust等 4&#xff09;降低j2ee接口的使用难度 2.spring是干什么的 管理bean及bean…

SPI总线结构和原理

一、概述 SPI&#xff08;Serial Peripheral Interface&#xff09;是一种同步串行通信接口标准&#xff0c;被广泛应用于各种微控制器和外设之间的通信。SPI总线结构简单、易于扩展&#xff0c;并且支持多主设备同时操作。 二、信号线 SCK&#xff08;Serial Clock&#xf…

【C++】模板初阶 | 泛型编程 | 函数模板 | 类模板

目录 1. 泛型编程 2. 函数模板 2.1 函数模板概念 2.2 函数模板格式 2.3 函数模板的原理 2.4 函数模板的实例化 2.5 模板参数的匹配原则 3. 类模板 3.1 类模板的定义格式 3.2 类模板的实例化 【本节目标】 1. 泛型编程 2. 函数模板 3. 类模板 1. 泛型编程 如何实现一…

C语言-指针详解速成

1.指针是什么 C语言指针是一种特殊的变量&#xff0c;用于存储内存地址。它可以指向其他变量或者其他数据结构&#xff0c;通过指针可以直接访问或修改存储在指定地址的值。指针可以帮助我们在程序中动态地分配和释放内存&#xff0c;以及进行复杂的数据操作。在C语言中&#…

三分钟快速搭建家纺行业小程序商城:轻松实现电子商务梦想

随着互联网的普及和移动设备的广泛使用&#xff0c;越来越多的商业活动正在向数字化转型。在这个过程中&#xff0c;小程序商城作为一种新型的电子商务模式&#xff0c;正逐渐受到商家的青睐。本文将通过具体步骤&#xff0c;指导读者如何开发一个纺织辅料小程序商城。 一、选择…

专注力训练游戏-第15届蓝桥第4次STEMA测评Scratch真题精选

[导读]&#xff1a;超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成&#xff0c;后续会不定期解读蓝桥杯真题&#xff0c;这是Scratch蓝桥杯真题解析第171讲。 第15届蓝桥杯第4次STEMA测评已于2024年1月28日落下帷幕&#xff0c;编程题一共有6题&#xff0c;分别如下&a…

渗透工具——kali中cewl简介

一、什么是cewl cewl是一个ruby应用&#xff0c;爬行指定url的指定深度。也可以跟一个外部链接&#xff0c;结果会返回一个单词列表&#xff0c;这个列表可以扔到wpscan等密码爆破工具里进行密码破解。 cewl工具爬取目标网站信息&#xff0c;生成相对应的字典 二、cewl的简单使…

鼠标右键助手专业版 MouseBoost PRO for Mac v3.3.6中文破解

MouseBoost Pro mac版是一款简单实用的鼠标右键助手专业版&#xff0c;MouseBoost Pro for Mac只要轻点你的鼠标右键&#xff0c;就可以激活你想要的各种功能&#xff0c;让你的工作效率大幅度提高&#xff0c;非常好用。 软件下载&#xff1a;MouseBoost PRO for Mac v3.3.6中…

2024年度中国5G随身WiFi品牌排行榜

【中国品牌网中国3C质量评测中心权威榜单联合发布】 第一名&#xff1a;格行 优势&#xff1a;作为随身WiFi行业的佼佼者&#xff0c;格行凭借其15年的物联网行业经验&#xff0c;在技术研发、产品创新及客户服务上均享有盛誉。其5G随身WiFi产品网络稳定&#xff0c;客户满意度…

vue2+element医院安全(不良)事件报告管理系统源代码

目录 安全不良事件类型 源码技术栈 医院安全&#xff08;不良&#xff09;事件报告管理系统采用无责的、自愿的填报不良事件方式&#xff0c;有效地减轻医护人员的思想压力&#xff0c;实现以事件为主要对象&#xff0c;可以自动、及时、实际地反应医院的安全、不良、近失事件…

redis的搭建 RabbitMq搭建 Elasticsearch 搭建

官网 Download | Redis wget https://github.com/redis/redis/archive/7.2.4.tar.gz 编译安装 yum install gcc g tar -zxvf redis-7.2.4.tar.gz -C /usr/localcd /usr/local/redis make && make install 常见报错 zmalloc.h:50:10: fatal error: jemalloc/jemal…

300分钟吃透分布式缓存-13讲:如何完整学习MC协议及优化client访问?

协议分析 异常错误响应 接下来&#xff0c;我们来完整学习 Mc 协议。在学习 Mc 协议之前&#xff0c;首先来看看 Mc 处理协议指令&#xff0c;如果发现异常&#xff0c;如何进行异常错误响应的。Mc 在处理所有 client 端指令时&#xff0c;如果遇到错误&#xff0c;就会返回 …

探索无限:Sora与AI视频模型的技术革命 - 开创未来视觉艺术的新篇章

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua&#xff0c;在这里我会分享我的知识和经验。&#x…

【C++初阶】--类和对象(下)

目录 一.const成员 1.权限放大问题 2.权限的缩小 二.再谈构造函数 1.构造函数体赋值 2.初始化列表 (1)概念 (2)使用 ①在对象实例化过程中&#xff0c;成员变量先依次进行初始化 ②再进行函数体内二次赋值 3.explicit关键字 (1)C为什么要存在自动隐式类型转换…

泰山众筹:掀起一场全民参与的购物狂潮!

随着互联网的快速发展&#xff0c;传统的商业模式已经无法满足消费者的多元化需求。在这个数字化时代&#xff0c;泰山众筹模式以其独特的魅力&#xff0c;正迅速成为新零售市场的热门话题。它不仅为消费者带来了前所未有的购物体验&#xff0c;还为企业的发展注入了新的活力。…

Visual Sudio 2022 引入第三方库(MySQL.H)

参考博客 Visual Studio 2022 C配置第三方库(libsndfile)、 fatal error LNK1107: 文件无效或损坏: 无法在 0x2C8 处读取 &#x1f33b;&#x1f33b;&#x1f33b;感谢两位博主在配置第三方库时给我提供的帮助&#x1f33b;&#x1f33b;&#x1f33b; 目录 一、准备好第三方库…