一、ReentrantReadWriteLock
让 读-读 操作可并发,写-写、写-读、读-写不可并发
(一)、基本使用
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start()
注意:
- 读锁不支持条件变量(写锁支持)
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待(读锁内不能有写锁)
- 重入时降级支持:即持有写锁的情况下去获取读锁(写锁内可以有读锁)
- 支持锁重入:即读锁内可以有读锁,写锁内可以有写锁
(二)、应用案例
向数据库中查询数据,为了提高性能,多次查询同一条信息时,只有第一次是从数据库中查询,剩下从缓存中查询,一旦修改数据库,缓存清空
1、缓存清空策略
(1)、先清缓存
A清空缓存,B查询数据库并将结果存放在缓存中,A更新数据库,以后查询缓存中的数据都是错误的
(2)、先更新数据库
A更新数据库,B从缓存中读出错误数据,A清空缓存,B只要在多做一次查询就能读出最新数据
(3)、补充
A查询数据库,B更新数据库,B清空缓存,A向缓存中存入错误数据
当然如果对操作进行加锁,可以避免以上问题
2、读写锁实现一致性缓存
class GenericCachedDao<T> {
// HashMap 作为缓存非线程安全, 需要保护
HashMap<SqlPair, T> map = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
GenericDao genericDao = new GenericDao();
public int update(String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
int rows = genericDao.update(sql, params);
map.clear();
return rows;
} finally {
lock.writeLock().unlock();
}
}
public T queryOne(Class<T> beanClass, String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
try {
T value = map.get(key);
if (value != null) {
return value;
}
} finally {
lock.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
// 为防止重复查询数据库, 二次检查
T value = map.get(key);
if (value == null) {
// 如果没有, 查询数据库
value = genericDao.queryOne(beanClass, sql, params);
map.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}
// 作为 key 保证其是不可变的
class SqlPair {
private String sql;
private Object[] params;
public SqlPair(String sql, Object[] params) {
this.sql = sql;
this.params = params;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return sql.equals(sqlPair.sql) &&
Arrays.equals(params, sqlPair.params);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(params);
return result;
}
}
}
(三)、原理
- t1线程加写锁
public void lock() { sync.acquire(1); }
- 进入acquire(1)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) selfInterrupt(); }
- 进入
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //w为c的低16位,即写锁状态 if (c != 0) { //整个if用作锁重入 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
- 先进入if(c != 0),现在没有加锁,c=0,进入 if (writerShouldBlock() ||!compareAndSetState(c, c + acquires)) writerShouldBlock()如果是非公平锁,返回false。!compareAndSetState(c, c + acquires))尝试改变锁的状态,如果成功,退出if,将拥有锁的线程改为当前线程,返回true
- 上面if(c != 0)中,
if (w == 0 || current != getExclusiveOwnerThread()) return false;
如果w == 0,则证明已经加了读锁,读写互斥,直接返回false。如果w !=0,则已经加了写锁,判断拥有锁的线程是否是当前线程,如果不是当前线程,进入if内返回false,如果是当前线程,执行下面代码
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true;
如果锁重入次数超过了16位,抛出异常。 否则锁的状态加1,返回true
- t2线程加写锁
public void lock() { sync.acquireShared(1); }
进入
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
进入tryAcquireShared(arg)
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
简单理解:
tryAcquireShared 返回值表示 -1 表示失败 0 表示成功,但后继节点不会继续唤醒 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
- 先进入if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current) return -1;现在t1已经加了写锁,exclusiveCount(c) != 0,拥有锁的线程不是当前线程,直接返回-1
- 返回
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
进入doAcquireShared
private void doAcquireShared(int arg) { final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final AbstractQueuedSynchronizer.Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
流程与ReentrantLock相似ReentrantLock 原理-CSDN博客https://blog.csdn.net/m0_71534259/article/details/137058374?spm=1001.2014.3001.5501,最终park阻塞,
- t1写锁 w.unluck 写锁释放
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { AbstractQueuedSynchronizer.Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
- 进入
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
锁状态-1,如果当前锁状态 !=0,表示进行了锁重入,直接返回false,如果锁状态==0,将当前拥有锁的线程设置为null,返回true。 回到
public final boolean release(int arg) { if (tryRelease(arg)) { AbstractQueuedSynchronizer.Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
假设解锁成功执行unparkSuccessor(h),唤醒头节点,头节点进入
private void doAcquireShared(int arg) { final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final AbstractQueuedSynchronizer.Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //从此处唤醒,再次进行循环 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
进入到int r = tryAcquireShared(arg)
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && //此时已经没有写锁了,exclusiveCount(c) == 0 退出if getExclusiveOwnerThread() != current) //锁已经加了写锁,且不是当前线程加的(无法锁重入),返回-1 return -1; int r = sharedCount(c); //r为c的高16位,是读锁状态位 if (!readerShouldBlock() && //读锁是否应该被阻塞,假设返回false r < MAX_COUNT && //读锁重入是否小于最大值 返回true compareAndSetState(c, c + SHARED_UNIT)) { //读锁状态+1(c + 65535) if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; //先跳过上面,返回1,表示加锁成功 } return fullTryAcquireShared(current); }
返回循环执行
private void doAcquireShared(int arg) { final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final AbstractQueuedSynchronizer.Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //从此处唤醒,再次进行循环 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- 进入setHeadAndPropagate(node, r)
private void setHeadAndPropagate(AbstractQueuedSynchronizer.Node node, int propagate) { AbstractQueuedSynchronizer.Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //先不看这些,假设进入if AbstractQueuedSynchronizer.Node s = node.next; s是位于第二位的节点(t3) if (s == null || s.isShared()) //s是shared的 doReleaseShared(); } }
进入方法doReleaseShared()
private void doReleaseShared() { for (;;) { AbstractQueuedSynchronizer.Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == AbstractQueuedSynchronizer.Node.SIGNAL) { if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0)) //将头节点设置为0,防止其他线程重复唤醒 continue; // loop to recheck cases unparkSuccessor(h); //唤醒后继节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
t3线程被唤醒,重复上述流程,t3执行到if (s == null || s.isShared())
doReleaseShared(); t3的后继节点不是shared,退出if
- t2释放锁 unlock 读锁释放
public void unlock() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
进入tryReleaseShared(int unused)
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; //直接看这部分,读锁状态-1(-65535) if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; //返回状态是否为0 } }
t2线程释放后返回false,释放锁成功,t3线程释放后,运行到此处返回true返回到
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
执行doReleaseShared()
private void doReleaseShared() { for (;;) { AbstractQueuedSynchronizer.Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == AbstractQueuedSynchronizer.Node.SIGNAL) { //头节点状态是否是-1 if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒下一个节点 t4节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
- t4线程唤醒
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final AbstractQueuedSynchronizer.Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
t4线程在次进入循环,重复获得写锁的过程
二、StampedLock
(一)、基本使用
(二)、应用案例
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
(三)、注意
- StampedLock 不支持条件变量
- StampedLock 不支持可重入