文章目录
- 1.诞生背景
- 2.LongAdder核心思想
- 3.底层实现:
- 4.额外补充
1.诞生背景
LongAdder是JDK8新增的一个原子操作类,和AtomicLong扮演者同样的角色,由于采用AtomicLong 保证多线程数据同步,高并发场景下会导致大量线程同时竞争更新一个原子变量,容易造成大量线程竞争失败后,无线循环不断自旋尝试CAS,极大浪费CPU资源。
为了解决这个循环自旋尝试CAS极大占用CPU资源的问题,JDK大佬就创造了LongAdder类
2.LongAdder核心思想
将一个变量拆分成多个变量,高并发场景下让多个线程竞争获取多个资源,用以减少竞争资源冲突,从而提升性能。
本质就是一种
分段锁
思想,将一个变量分成多段,多线程并发下获取不同分段对象cell不会发生竞争,有效避免大量线程自旋竞争CAS
3.底层实现:
下面结合LongAdder的结构
,add()
和sum()
方法对类底层执行进行剖析。
LongAdder是继承于Striped64
。其中比较重要的四个参数在下图列出。
// 继承Striped64.
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
}
// Striped64 类中四个实例变量
/** Number of CPUS, to place bound on table size */
// 当前机器CPU数目
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
// 用于存储变量的数组,初始size为2,采用2倍扩容,因为length参与了线程获取cell对象时索引计算
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
// 变量基数
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
// 用于判断是否发生cell竞争状态标识,1表示存在获取cell线程。
transient volatile int cellsBusy;
cells数组是用来存储变量值的一部分的集合。Cell结构
如下:以JDK21
为例子
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return VALUE.weakCompareAndSetRelease(this, cmp, val);
}
final void reset() {
VALUE.setVolatile(this, 0L);
}
final void reset(long identity) {
VALUE.setVolatile(this, identity);
}
final long getAndSet(long val) {
return (long)VALUE.getAndSet(this, val);
}
// VarHandle mechanics
private static final VarHandle VALUE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(Cell.class, "value", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
@jdk.internal.vm.annotation.Contended
注解是用以字节填充,用来避免伪共享
。这个伪共享在上一篇剖析AQS源码中有讲。点击查看
这个Cell
结构很简单,就是使用value变量来存储值。
接着看看sum()
方法:
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot; invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
代码逻辑很简单,就是把base的值和cells数组里的值求和,这个就是LongAdder实际值
。
继续看add()
方法,这个是整个类最关键
的方法:
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
// b为基础值, v为存储当前线程被分配到具体某个cell的value。
/**
m 当前cell的长度-1 ,
由于cell的长度是2的幂数,因此结构必然是`...1111`结尾,
index & m 就是用来计算当前线程竞争获取对象cell在cells的位置
!(uncontended = c.cas(v = c.value, v + x)
cell对象cas失败走longAccumulate(x, null, uncontended, index)逻辑。
**/
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
int index = getProbe();
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[index & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended, index);
}
}
/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*/
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
1.介绍add方法前,先对getProbe()方法进行简要说明,这个可以理解为根据当前线程获取一个唯一id用来计算当前线程
参与竞争Cell对象
在cells数组中索引位置
。
2.假定一个线程就是一个用户,cells中是一个窗口服务列表,cells中的每个cell实例是一个窗口,相关运行流程图如下:
3.接着来分析下longAccumulate
这个方法:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended, int index) {
if (index == 0) {
ThreadLocalRandom.current(); // force initialization
index = getProbe();
wasUncontended = true;
}
// 默认冲突为false
for (boolean collide = false;;) { // True if last slot nonempty
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
// 当前cell对象为空
if ((c = cs[(n - 1) & index]) == null) {
// cellsBusy 为0 此时可以通过自旋竞争获取锁。
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// cas方式加锁。这个锁在创建对象和扩容时需要加锁。
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
// 二次检查。确保对象索引位置为null在执行赋值操作。
// 这个和懒加载单例模式DoubleCheck 思想一直。
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & index] == null) {
// 将新建的cell对象r赋值给指定位置。
rs[j] = r;
break;
}
} finally {
// 锁释放
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
// cas执行失败 设置true重新再执行
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 当前cell 存在 则执行CAS,如果方法fn为null则执行加法操作此时就是LongAdder
// 如果传了函数,则调用自定义函数fn。
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
// 如果当前Cell数组元素个数大于CPU数或者已经完成扩容,则冲突为false。
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
// 如果以上判断均不满足,则是存在冲突的。设置为true。
else if (!collide)
collide = true;
// 如果当前n没有到达cpu个数且存在冲突。
// 尝试扩容。cas机制扩容加锁,避免多个线程都进行扩容操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1); // 必须是2倍扩容
} finally {
// 锁释放
cellsBusy = 0;
}
// 重新设置冲突为false。
collide = false;
continue; // Retry with expanded table
}
index = advanceProbe(index);
}
// cell数组初始化判断逻辑。
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[index & 1] = new Cell(x);
cells = rs;
break;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
}
}
index ==0 说明 getProbe() 方法为0,(int)
THREAD_PROBE.get(Thread.currentThread())=0,
则说明当前线程threadLocalRandomProbe=0, 这个是通过反射
实现值获取。
再看Thread类中,关于threadLocalRandomProbe注释,说明ThreadLocalRandom 对象没有初始化,因此才需执行初始化操作
。也就是ThreadLocalRandom.current()这个方法。
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
// 初始化后不能为0。
int threadLocalRandomProbe;
由于longAccumulate 代码量太大,关于运行情况,注释都写在代码上。 相信大家都能看懂。
4.额外补充
1.LongAdder 和LongAccumulate 之间的关系,longAccumulate 是通用累计计算器,不仅可以实现累加,还可以根据用户自定义函数来实现累计功能,LongAdder 是其中一个特例,相当于就是一个longAccumulate 默认实现。
2.Cell数组的个数和CPU相等,此时性能能得到最大发挥。
3.Cell数组占用内存相对较大,一开始是null,只有在使用到Cell数组才会创建,惰性加载/懒加载方式。
4。应用场景:适用于高并发累计统计计数场景,不适用于单线程、以及多线程下实时获取精准数据的情况。
核心思想分段锁
,采用空间换时间
的策略,来提升高并发下统计计数的效率。大多数性能优化,都是空间换时间、时间换空间这两者之间做权衡
。