剑指JUC原理-16.读写锁

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

    • ReentrantReadWriteLock
    • 应用之缓存
      • 缓存更新策略
        • 先清缓存
      • 读写锁实现一致性缓存
    • 读写锁原理
      • t1 w.lock,t2 r.lock
      • t3 r.lock,t4 w.lock
      • t1 w.unlock
      • t2 r.unlock,t3 r.unlock
    • StampedLock

ReentrantReadWriteLock

其定义就是支持冲入的读写锁,本质上也就是基于 ReentrantLock 实现的

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。

类似于数据库中的 select … from … lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainer {
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    public Object read() {
        log.debug("获取读锁...");
        r.lock();
        try {
            log.debug("读取");
            sleep(1);
            return data;
        } finally {
            log.debug("释放读锁...");
            r.unlock();
        }
    }
    public void write() {
        log.debug("获取写锁...");
        w.lock();
        try {
            log.debug("写入");
            sleep(1);
        } finally {
            log.debug("释放写锁...");
            w.unlock();
        }
    }
}

测试 读锁-读锁 可以并发

		DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();
        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁... 
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁... 
14:05:15.386 c.DataContainer [t1] - 释放读锁... 

测试 读锁-写锁 相互阻塞

		DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();
        Thread.sleep(100);
        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();

输出结果

14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁... 
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁... 
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁... 

写锁-写锁 也是相互阻塞的,这里就不测试了

注意事项

  • 读锁不支持条件变量

ReentrantReadWriteLock 中的读锁不支持条件变量,主要是因为读锁在 ReentrantReadWriteLock 中是共享的,多个线程可以同时持有读锁来访问共享资源。条件变量通常用于在多线程环境下实现线程间的协调和通信,而读锁的共享特性可能导致条件变量的信号在多个线程之间产生歧义或不确定性。

  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
		r.lock();
        try {
            // ...
            w.lock();
            try {
                // ...
            } finally{
                w.unlock();
            }
        } finally{
            r.unlock();
        }
  • 重入时降级支持:即持有写锁的情况下去获取读锁
// 下面以ReentrantReadWriteLock 的 CachedData 类来说明,这段代码主要是使用读写锁来实现对缓存数据的并发访问,以提高并发读取操作的性能。


class CachedData {
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        // 先加读锁,判断缓存是否失效,如果没有失效,那么可以直接返回即可。使用完了将读锁解开即可
        rwl.readLock().lock();
        if (!cacheValid) {
            // 如果失效了,释放读锁,然后获得写锁,重新对其进行计算
            // 获取写锁前必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                
                /*
                在上述代码中,两次检查 cacheValid 的作用是为了在获取写锁之前和获取写锁后再次确认缓存的有效性。让我来详细解释一下:

第一次检查 cacheValid 发生在首次获取写锁之前。这是为了避免出现竞态条件(race condition)的情况,即在当前线程释放读锁后,有可能其他线程已经获取了写锁并更新了缓存。如果没有进行第一次检查,当前线程获取写锁后可能会重复更新缓存,造成不必要的计算和数据更新。

第二次检查 cacheValid 发生在获取写锁之后。虽然在第一次检查时缓存无效,但在当前线程获取写锁之前,可能有其他线程已经更新了缓存并将 cacheValid 设置为有效。因此,在获取写锁后再次检查 cacheValid 可以避免重复更新缓存,确保只有一个线程更新缓存数据。

通过这样的双重检查机制,可以有效地避免多个线程同时更新缓存数据,确保在并发环境下对缓存的更新操作是正确且高效的。此外,结合读写锁的降级操作,可以使得在缓存有效时多个线程能够同时读取数据,从而提高系统的并发性能。
                
                */
                
                
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                /*
                锁的降级,写锁释放开,但是我还想同时持有它的读锁,这是为了释放开的那个瞬间,其他线程的读取权限就ok了

				加读锁的目的也是为了你在读的的时候不受其他的写的线程的干扰
                */
                
                // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock();
            }
        }
        // 自己用完数据, 释放读锁 
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

应用之缓存

缓存更新策略

更新时,是先清缓存还是先更新数据库

先清缓存

读操作的速度是大于写操作的

在这里插入图片描述

先更新数据库

在这里插入图片描述

补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

在这里插入图片描述

这种情况的出现几率非常小

读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存(核心:写操作 加 写锁;读操作 加 读锁

class GenericCachedDao<T> {
    // HashMap 作为缓存非线程安全, 需要保护
    HashMap<SqlPair, T> map = new HashMap<>();

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    GenericDao genericDao = new GenericDao();
    public int update(String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            int rows = genericDao.update(sql, params);
            map.clear();
            return rows;
        } finally {
            lock.writeLock().unlock();
        }
    }
    public T queryOne(Class<T> beanClass, String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加读锁, 防止其它线程对缓存更改
        lock.readLock().lock();
        try {
            T value = map.get(key);
            if (value != null) {
                return value;
            }
        } finally {
            lock.readLock().unlock();
        }
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
            // 为防止重复查询数据库, 再次验证
            T value = map.get(key);
            if (value == null) {
                // 如果没有, 查询数据库
                value = genericDao.queryOne(beanClass, sql, params);
                map.put(key, value);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }
    // 作为 key 保证其是不可变的
    class SqlPair {
        private String sql;
        private Object[] params;
        public SqlPair(String sql, Object[] params) {
            this.sql = sql;
            this.params = params;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return sql.equals(sqlPair.sql) &&
                    Arrays.equals(params, sqlPair.params);
        }
        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(params);
            return result;
        }
    }
}

注意

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:

  • 适合读多写少,如果写操作比较频繁,以上实现性能低
  • 没有考虑缓存容量
  • 没有考虑缓存过期
  • 只适合单机
  • 并发性还是低,目前只会用一把锁
  • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)

读写锁原理

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

t1 w.lock,t2 r.lock

在这里插入图片描述

其实该流程和 ReentrantLock 几乎是一样的,但是还是有一些区别的,比如state不太一样,因为state既要给读锁用,也要给写锁用,所以要将state分成两部分。

t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁
使用的是 state 的高 16 位。

其实t1肯定是能加上锁,接下来分析一下源码:

ctrl + f12 找到 writelock 里面的lock方法:

public void lock() {
   sync.acquire(1);
}

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
/*
	首先会调用tryAcquire 尝试加锁,如果成功了那么后续的代码就不执行了,如果加锁失败了,才会进入这个队列

*/
protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
    // 首先拿到整个state状态
            int c = getState();
            int w = exclusiveCount(c);
    // 如果不等于0,意味着既有可能其他线程加了读锁,也有可能是其他线程加了写锁
    // 因为高16位不等于0或者低16位不等于0都有可能导致 不等于0
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // w == 0 代表 加的读锁部分   而 往后执行代表着 可能加的写锁,但是这个写锁是不是自己加的呢?比如先加的写锁,发生了重入,又加了一次写锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 如果写锁部分 再加 1超过写锁的最大范围了 65535 2的16次方
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 可以理解为发生了可重入
                setState(c + acquires);
                return true;
            }
    // 如果能往下走,说明c是等于0的,代表别的线程都没有加锁,首先判断写锁是否需要阻塞,其实就意味着公平非公平,如果是非公平锁 就 总会返回false,公平锁会检查这个队列。然后就接着往后面走 是否能compareAndSetState
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
    
    // 将对应线程设置成 owner
            setExclusiveOwnerThread(current);
            return true;
        }

接下来看 加 读锁的lock方法

public void lock() {
            sync.acquireShared(1);
        }

public final void acquireShared(int arg) {
    // 尝试去获取这个读锁
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写
锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒(0或者1会在后面的信号量章节介绍)
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 检查写锁部分是否不为0 此时t1已经将其变为1了,去检查加写锁的是不是当前线程呢?
    // 这种情况其实就是 t2 已经加了写锁,然后又加了读锁,这里是应该成功的,因为这是锁降级的过程
    // 最终我们当前的情况来看,t2 其实就是返回-1了。
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在这里插入图片描述

返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

在这里插入图片描述

private void doAcquireShared(int arg) {
    // 唤醒的时候,判断的逻辑稍有不同
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 死循环,去找t2 有没有前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是 head,那么说明其是 第二个节点,是有资格争抢锁的
                if (p == head) {
                    // 调用tryAcquireShared  返回-1表示失败,返回0或者1表示成功
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果返回-1,说明并没有释放锁,那么就会走到这个逻辑,和ReentrantLock逻辑一致,走park	shouldParkAfterFailedAcquire将其前驱节点设置成-1,然后 重新for循环,设置为park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在这里插入图片描述

t3 r.lock,t4 w.lock

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

在这里插入图片描述

t2 t3都是读锁,所以状态都是 SHARED,而t4是写锁,所以状态是EXCLUSIVE

t1 w.unlock

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

public void unlock() {
            sync.release(1);
        }
        
public final boolean release(int arg) {
    // 如果return true呢,就会执行后续的逻辑
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
    // 首先在原来的基础上减1
            int nextc = getState() - releases;
    // 然后去查看写锁部分是不是减成0了
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
    // 如果没有减成0 代表着这事一次锁的重入
            setState(nextc);
            return free;
        }

在这里插入图片描述

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内
parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        
                        // 此时进入到这个条件中
                        
                        // 替换头结点
                        
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时这里唤醒了,就可以继续运行了,然后再循环
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


// 返回-1表示失败,返回0或者1表示成功
protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                
                // 如果都符合,代表后续读锁加锁成功了
                
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在这里插入图片描述

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

在这里插入图片描述

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用
doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

private void doReleaseShared() {
        
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 在这里会将节点的状态从 -1 改成 0

                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 对头结点的后继结点又要去唤醒,这就接上了 此时就唤醒t3的park
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在这里插入图片描述

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 又再次来到了 tryAcquireShared方法,和前面的流程是一样的
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时t3 被唤醒了,和之前的流程是一样的
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                
                // 如果都符合,代表后续读锁加锁成功了
                
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

在这里插入图片描述

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

在这里插入图片描述

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

t2 r.unlock,t3 r.unlock

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

在这里插入图片描述

public void unlock() {
            sync.releaseShared(1);
        }

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                // 获取状态,减去高位的1
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入
doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

在这里插入图片描述

private void doReleaseShared() {
        
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 本质上唤醒t4
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他
竞争,tryAcquire(1) 成功,修改头结点,流程结束

在这里插入图片描述

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

这里对优化性能部分做了专门的解读:

之前学读写锁,其实已经看到了读读可以并发,已经很快了,但是还不够快,读读并发的时候,底层还是用cas的方式去修改它的状态,读锁的高16位去修改它的状态,它还是性能上比不上不加锁,如果希望读取的性能达到这种极致,那么就可以使用StampedLock。

加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

// 这里面没有加任何的锁
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
 	// 锁升级
}

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();
    public DataContainerStamped(int data) {
        this.data = data;
    }
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}

测试 读-读 可以优化

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.read(0);
        }, "t2").start();
    }

输出结果,可以看到实际没有加读锁

15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.write(100);
        }, "t2").start();
    }

输出结果

15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513 

注意

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123158.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JavaEE进阶3

传递数组: 当我们请求中,同一个参数有多个时,浏览器就会帮我们封装成一个数组 用逗号进行分割也是可以的(有的浏览器不能直接使用逗号,需要我们去转码) 传递集合: HTTP 状态码(不是后端自定义的) 2XX:成功 3XX:重定向 4XX:客户端错误 5XX:服务器错误 业务状态码:HTTP响应…

【NLP】特征提取: 广泛指南和 3 个操作教程 [Python、CNN、BERT]

什么是机器学习中的特征提取&#xff1f; 特征提取是数据分析和机器学习中的基本概念&#xff0c;是将原始数据转换为更适合分析或建模的格式过程中的关键步骤。特征&#xff0c;也称为变量或属性&#xff0c;是我们用来进行预测、对对象进行分类或从数据中获取见解的数据点的…

2.3.3 交换机的RSTP技术

实验2.3.3 交换机的RSTP技术 一、任务描述二、任务分析三、具体要求四、实验拓扑五、任务实施1.交换机的基本配置。2.开启交换机的STP。3.配置SW3A和SW3B上STP的优先级。将SW3A配置为根交换机&#xff0c;SW3B配置为备用根交换机。4.配置SW2A和SW2B的边缘接口 六、任务验收七、…

Azure 机器学习 - 使用Python SDK训练模型

目录 一、环境准备二、工作区限制三、什么是计算目标&#xff1f;四、本地计算机五、远程虚拟机六、Apache Spark 池七、Azure HDInsight八、Azure Batch九、Azure Databricks十、Azure Data Lake Analytics十一、Azure 容器实例十二、Kubernetes 了解如何用 SDK v1 将 Azure 计…

11-Docker-Redis集群部署

00-前言 在工作环境中&#xff0c;我们常常被要求配置多种集群&#xff0c;Redis 集群是最常见的入门需要掌握的集群配置方法&#xff0c;在之前的学习中&#xff0c;我们学习掌握了分布式存储的算法&#xff0c;本质上集群的部署就是通过分布式存储算法将数据分发部署好的不同…

windows上 Nexus 批量上传 maven依赖npm依赖

windows上 Nexus 批量上传 maven依赖/npm依赖 前言&#xff1a;windows系统上要有git环境&#xff0c;不然sh文件执行不了 1.批量上传maven依赖 设置脚本&#xff0c;把脚本放在依赖包的根目录执行&#xff0c;脚本名为upload.sh #!/bin/bash# 定义变量 while getopts &quo…

零信任安全:构建无懈可击的网络防护体系

随着网络技术的飞速发展&#xff0c;信息安全问题日益凸显&#xff0c;传统的安全防护手段已经无法满足复杂多变的安全需求。在此背景下&#xff0c;零信任安全模型逐渐受到广泛关注。本文将探讨零信任安全的概念、优势以及如何构建无懈可击的网络防护体系。 一、零信任安全概念…

Flink(三)【运行时架构】

前言 今天学习 Flink 的一些原理性的东西&#xff0c;比较偏概念&#xff0c;但是十分重要。有人觉得上来框框敲代码才能学到东西&#xff0c;那是狗屁不通的道理&#xff08;虽然我以前也这么认为&#xff09;。个人认为&#xff0c;学习 JavaEE那些框架&#xff0c;你上来就敲…

图的基本操作

基本操作 Adjacent(G,x,y):判断图G是否存在边<x,y>或(x,y)Neighbors(G,x):列出图G中与结点x邻接的边InsertVertex(G,x):在图G中插入顶点xDeleteVertex(G,x):从图G中删除顶点xAddEdge(G,x,y):若无向边(x,y)或有向边<x,y>不存在&#xff0c;则向图G中添加该边Remove…

安卓学习记录

文章目录 Text_View基础属性字体阴影跑马灯**activity_main.xml**MyTextview button Text_View ctrl右键可以看到属性 代码整理采用的快捷键是ctrlaltL(电脑qq锁定快捷键&#xff0c;退一下qq就行了) 基础属性 字体阴影 <?xml version"1.0" encoding"utf-…

网络安全深入学习第八课——代理与端口转发

文章目录 一、什么是代理二、正向代理三、反向代理四、正向和反向代理模拟复现 一、什么是代理 代理服务器英文全称是Proxy Server&#xff0c;其功能就是代理网络用户去取得网络信息。 形象的说&#xff1a;它是网络信息的中转站。在一般情况下&#xff0c;我们使用网络浏览…

PyCharm 无法登陆 Codeium 的解决方法

PyCharm 登陆 Codeium PyCharm 无法登陆 Codeium 的问题描述PyCharm 使用 token 登陆 Codeium PyCharm 无法登陆 Codeium 的问题描述 使用 PyCharm 登录 Codeium 时&#xff0c;单击 Login 无反应&#xff0c;单击侧边栏的 Codeium 图标也一直显示连接失败。 PyCharm 使用 to…

RHCE8 资料整理(五)

RHCE8 资料整理 第五篇 系统管理第18章 进程管理18.1 进程介绍18.2 查看进程18.3 向进程发送信号18.4 进程优先级 第19章 日志19.1 rsyslog的配置19.2 查看日志 第20章 网络时间服务器20.1 时间同步必要性20.2 配置时间服务器20.3 配置客户端 第21章 计划任务21.1 at21.2 cront…

基于Pytorch框架的LSTM算法(一)——单维度单步滚动预测(2)

#项目说明&#xff1a; 说明&#xff1a;1time_steps滚动预测代码 y_norm scaler.fit_transform(y.reshape(-1, 1)) y_norm torch.FloatTensor(y_norm).view(-1)# 重新预测 window_size 12 future 12 L len(y)首先对模型进行训练&#xff1b; 然后选择所有数据的后wind…

如何做游戏软件开发?怎么和软件开发公司合作?

随着科技的发展和普及&#xff0c;游戏软件已经成为人们日常生活和工作中不可或缺的一部分&#xff0c;许多企业也开始涉足游戏软件开发领域&#xff0c;希望能够打造出一款受欢迎的游戏&#xff0c;那么&#xff0c;如何做游戏软件开发?怎么和软件开发公司合作呢?下面我们来…

cmake vs2022编译opencv4.5.2 x86 版本

cmake 编译opencv 452 x86 版本 where is the source code 选项放置你的源文件 where to build the binaries 放置你生成结果的文件夹 点击按钮Configure,弹出提示&#xff0c;选择Win32&#xff0c;finishi 等待生成 列表中出现红色的选项&#xff0c;根据你的需要取消或…

Django框架简介

文章目录 Django框架介绍MVC与MVT模型MVCMTV 版本问题运行django注意事项 Django的下载与基本命令下载Django方式一&#xff1a;在命令界面使用pip安装方式二&#xff1a;使用pycharm安装 Django的基础命令命令行操作pycharm操作 Django项目命令行操作与Pycharm操作的区别应用D…

关于变电站综合自动化系统的案例应用分析-安科瑞 蒋静

摘 要&#xff1a;变电站综合自动化系统是将变电站内的二次设备经过功能的组合和优化设计&#xff0c;利用先进的计算机技术、通信技术、信号处理技术&#xff0c;实现对全变电站的主要设备和输、配电线路的自动监视、测量、控制、保护、并与上级调度通信的综合性自动化功能。 …

浏览器加get请求导致参数+变空格

场景: 参数有号。取值也取的正确数值&#xff0c;但是传到后台 号就变成了空格 在传参的时候在参数外面包一层encodeURIComponent即可 encodeURIComponent(this.chooseRow.offerId)

SDL2 简单介绍以及Windows开发环境搭建

1.简介 SDL&#xff08;Simple DirectMedia Layer&#xff09;是一个跨平台的多媒体开发库&#xff0c;使用C语言写成&#xff0c;用于实现音频、图形、输入以及窗口功能的处理。它提供了一个简单、易用的API&#xff0c;可以帮助开发者快速构建跨平台的多媒体应用程序。 SDL…