ReentrantReadWriteLock:
同时实现了共享锁和排它锁。内部有一个sync,同时实现了tryAcquire/tryReleases、tryAcquireShared/tryReleasesShared,一共四个函数,然后ReentrantReadWriteLock内部还实现了一个ReadLock和一个WriteLock,ReadLock和WriteLock都实现了lock和unlock函数,然后ReadLock和WriteLock是对同一个Sync对象的封装,不同之处在于ReadLock的lock函数调用的是Sync.tryAcquireShared,unlock调用的是tryReleasesShared,而WriteLock的lock函数调用的是Sync.tryAcquire/Sync.tryReleases。还有ReadLock重载了newCondition,直接抛异常,因为Condition会调用isHeldExclusive来判断当前线程是否拥有排它锁,而ReadLock是共享锁,这矛盾了,所以ReadLock重载了newCondition函数,然后直接抛异常,而WriteLock则可以正常newCondition,因为WriteLock是排它锁,所以可以支持condition,换句话说:只有排它锁才支持condition。
ReentrantReadWriteLock:
锁资源由两部分组成{state,Holder},state是一个int,用来记录已经获取读锁的线程数和已经获取写锁的线程数,state的32位字节分成了两部分,高16位表示已经获取读锁的线程数,低16位表示写者重入的次数,因为写锁是排它锁,也就是说只会有一个线程获取写锁,所以如果state低16位不为0就表示有人获得了写锁,然后这里就直接用低16位来记录写锁重入的个数,如果读者数不为0,则写者数必定为0,如果写者数为0则读者数必定为0,也就是说读写锁互斥。holder则是记录锁的获取信息,因为是reentrant即可冲入锁,也就是说可能出现这种情况:多个线程同时获取了读锁,然后多个线程又多次readLock.lock,所以就需要一个结构体来记录当前线程是否获得了读锁以及当前线程重入的次数,所以holder是一个threadlocal变量,每个线程都有一份,如果当前线程没有获得该锁,则删除该threadLocal,前面说了写锁重入次数直接用state低16位记录,并且用父类的owner来记录谁获得了写锁,所以写锁不用holder,holder只用于读锁。注意ReentrantLock不需要holder的原因是ReentrantLock是排它锁,最多只有一个线程能获得锁,而父类提供的state和owner变量就足够了,所以不需要holder变量。
一个线程已经获取写锁以后,可以继续获取读锁,但是反过来不行,一个线程获取了读锁后,是不允许再次获取写锁的,原因很简单,如果线程先获取了写锁,那么就能保证其他所有线程都不能获取读锁,所以线程可以安全获取读锁,因为只有他一个人能获取读写权限,而反过来如果线程先获取了读锁,那么就可能还有其他线程此时也获取了读锁,这样就不止一个线程获取了读锁,所以此时是不能获取写锁的。
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
//同时实现了读写锁
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
//读锁对象
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁对象
private final ReentrantReadWriteLock.WriteLock writerLock;
//读锁对象和写锁对象封装了同一个sync对象
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
//默认是非公平锁
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
//readLock和writeLock都是使用的同一个ReentrantReadWriteLock对象
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//读者数偏移,读者数=state>>SHARED_SHIFT
static final int SHARED_SHIFT = 16;
//读者数+1就直接加SHARED_UNIT就行
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//最大读者数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//写者重入次数数掩码,写者数=state&&EXCLUSIVE_MASK
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//定义的holder类,记录了当前线程id,以及加读锁的次数
//如果线程获得了读锁,就会拥有一个holdCounter对象
//如果线程没有获取读锁,就会删除holdCounter对象
//因为HoldCounter对象被用作ThreadLocal对象
static final class HoldCounter {
//记录读锁重入的次数
int count = 0;
//线程id
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
//这里解释一下readHolds/cachedHoldCounter/firstReader/firstReaderHoldCount
//这些变量都只有一个目的:如果当前线程持有读锁,则返回本线程重入次数
//readHolds是一个ThreadLocal变量,其他三个都不是ThreadLocal
//本来要返回本线程重入次数直接返回ThreadLocal内保存的count就行了
//但是ThreadLocal.get可能比较慢,所以为了优化,
//就用了两个变量firstReader/cachedHolderCount
//firstReader记录的是当没有任何获取读锁时,记录第一个获得读锁的线程的thread对象
//cachedHolderCount记录的是上次获取读锁的线程的id
//readHolds记录的是本线程的信息,是threadLocal
//然后如果有人要获取当前线程读锁的重入次数,那么获取逻辑是这样的:
//先看能不能从firstReader读,如果firstReader就是当前线程
//那么就直接返回firstReadHoldCount,就不用去读threadLocal了
//如果firstReader不是当前线程,则尝试去cachedHolderCount读
//如果上一次获取读锁的线程就是当前线程,那么ok,可以直接从cachedHolderCount读
//这样就不用去读ThreadLocal了,如果都失败了,那么就只能去读readHolds了
//而读threadLocal肯定是比较慢的。。。(花里胡哨的,不过文档说有助于提高并发效率。。。)
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState());
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
//释放写锁
protected final boolean tryRelease(int releases) {
//首先判断当前线程是否拥有锁:直接判断owner是不是当前线程,如果是则拥有
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//然后扣减写锁计数,写锁是低16位,所以可以直接减
int nextc = getState() - releases;
//判断写锁重入计数是否为0
boolean free = exclusiveCount(nextc) == 0;
if (free)
//如果是则设置owner为null
setExclusiveOwnerThread(null);
//然后设置state
//笔记:在释放写锁前,读者数必定为0
setState(nextc);
return free;
}
//获取读锁
protected final boolean tryAcquire(int acquires) {
//获取当前线程
Thread current = Thread.currentThread();
//获取锁资源状态
int c = getState();
//计算写锁重入次数个数
int w = exclusiveCount(c);
//如果c!=0,表示有人获得了读锁或者有人获得了写锁
if (c != 0) {
//c!=0,但是w=0,表明有人获得了读锁
//c!=0,并且w!=0,表示有人获得了写锁,所以还需要判断是不是自己获得了写锁
//所以这里就是如果有人获得了读锁或者获取写锁的线程不是自己,那么本次写锁获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果是自己获得了写锁,则判断可重入次数是否超过了(2^16-1)次
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果没有,则表示成功获取锁资源,返回true
//注意:此时是线程已经获得了锁,并且是自己,
// 也就是说只有本线程可以修改state,所以此时无需用cas操作
setState(c + acquires);
return true;
}
//如果c==0表示此时没有任何人获得锁
//因为ReentrantReadWrite支持公平或者非公平锁,所以writeShouldBlock是一个抽象方法
//公平方式:看aqs的sync list是否有节点等待,如果有则返回true,表示当前线程不能获取锁
//非公平方式:writeShouldBlock直接返回false,表示立即尝试获取锁
if (writerShouldBlock() ||
//如果不需要阻塞,则立即通过cas尝试获取锁
!compareAndSetState(c, c + acquires))
//如果cas获取失败就返回false表示本次获取锁失败
return false;
//获取成功就设置锁拥有者为自己
setExclusiveOwnerThread(current);
return true;
}
//释放读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//此处到下面for循环前面的都是为了更新当前线程的读锁重入次数
//如果当前线程就是自己,那么直接更新firstReaderHoldCount
//就不用去写threadLocal了
if (firstReader == current) {
//如果重入次数为1,那么-1之后就表示释放掉了
//所以直接把firstReader设置为null
//这样,firstReader是无效的,那么firstReaderHoldCount就肯定失效了
//所以此处只设置firstReader=null,而没有扣减firstReaderHoldCount
if (firstReaderHoldCount == 1)
firstReader = null;
else
//否则读锁重入次数大于1,释放一次后,线程还是拥有读锁
//所以firstReader不用变
firstReaderHoldCount--;
} else {
//如果自己不是firstReader,那么再尝试cachedHoldCounter
//即尝试自己是不是上一次访问的线程
HoldCounter rh = cachedHoldCounter;
//如果缓存的holder为null或者缓存的holder不是当前线程的
if (rh == null || rh.tid != getThreadId(current))
//那么就表示从cachedHoldCounter读取失败
//此时就只能去读threadLocal了
rh = readHolds.get();
//此时rh为cachedHoldCounter或者readHolds
int count = rh.count;
//如果count<=1,那么释放一次后就为0
//所以此时就要移除threadLocal
//免得内存泄漏,比如一个线程获取了读锁,释放后就永不再是用这个rw锁了
if (count <= 1) {
//移除threadLocal
readHolds.remove();
if (count <= 0)
//如果count<=0就表示自己没有持有读锁,却常是释放读锁
throw unmatchedUnlockException();
}
//当前线程重入次数-1
--rh.count;
}
//更新完读锁重入次数,那么下面就是通过cas更新state也就是更新读锁总数
for (;;) {
//获取读锁总次数
int c = getState();
//-1
int nextc = c - SHARED_UNIT;
//cas设置state
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
//尝试获取读锁
//!!!在持有写锁的状态下允许获取读锁,但是反过来持有读锁时是不允许再次获取写锁
//也就是说支持锁降级(当然,这里没有降级),但是不支持锁升级
protected final int tryAcquireShared(int unused) {
//获取当前线程对象
Thread current = Thread.currentThread();
//获取锁资源状态
int c = getState();
//判断是否有线程持有写锁,如果有,则判断是不是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
//如果不是,则返回-1表剩余读锁资源不足,读锁获取失败
return -1;
//走到此处,要么无人获取写锁,要么自己获取了写锁
//先获取读者计数
int r = sharedCount(c);
//判断本次获取读锁是否应该阻塞,分公平和非公平方式
//非公平方式:如果aqs队列中第一个等待的线程是要获取写锁,那么本次读者应该等待
//也就是避免写进程长时间获取不到读锁,也就是说如果第一个等待的线程是要获取读锁,
//那么本线程会立即获取读锁,不管后面的,也就是说对读不公平,但是对写带点公平
//公平方式:如果aqs队列中有等待的进程,则本次获取read需要阻塞
// 可能是因为达到总读锁上限了,因为state用于读者锁计数的只有16位
if (!readerShouldBlock() &&
//如果没有达到总读者锁重入计数
r < MAX_COUNT &&
//那么就尝试获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//下面就是更新线程对应的读锁重入次数了
//r==0表示本线程是第一个获得读锁的线程
if (r == 0) {
//那么就设置firstReader为本线程
firstReader = current;
//并且读锁重入次数为1
firstReaderHoldCount = 1;
//如果当前线程正好是第一个读者
} else if (firstReader == current) {
//那么直接更新firstReaderHoldCount就行
firstReaderHoldCount++;
} else {
//反之判断cachedHoldCounter是否为当前线程
HoldCounter rh = cachedHoldCounter;
//如果cached为null或者cached不是当前线程
if (rh == null || rh.tid != getThreadId(current))
//则更新cached为当前线程的readHold
cachedHoldCounter = rh = readHolds.get();
//如果cached是自己,如果count=0表示我们之前已经删除了
//那么直接设置readHolds等于我们缓存的,就不用重新创建了
else if (rh.count == 0)
readHolds.set(rh);
//更新本线程读锁重入次数
rh.count++;
}
//返回获取锁成功
return 1;
}
//如果获取锁失败或者本线程获取读锁需要阻塞,则走重逻辑获取锁
//重逻辑和上面的逻辑几乎一样
return fullTryAcquireShared(current);
}
//while cas获取读锁
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
//获取锁状态
int c = getState();
//判断是不是有人获取了写锁
if (exclusiveCount(c) != 0) {
//如果有,则再判断是不是自己
if (getExclusiveOwnerThread() != current)
//如果不是,则返回失败
return -1;
//如果本次读者需要阻塞:
//非公平方式:如果aqs队列中第一个等待的线程是要获取写锁,那么本次读者应该等待
//也就是避免写进程长时间获取不到读锁,也就是说如果第一个等待的线程是要获取读锁,
//那么本线程会立即获取读锁,不管后面的,也就是说对读不公平,但是对写带点公平
//公平方式:如果aqs队列中有等待的进程,则本次获取read需要阻塞
// 可能是因为达到总读锁上限了,因为state用于读者锁计数的只有16位
} else if (readerShouldBlock()) {
//这里又是一样的逻辑,用来更新count的,懒得写了
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//cas操作尝试获取锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
//下面又是更新count的逻辑了,和上面一样,略
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
//返回1表示读锁资源剩余1
//这里是固定返回1,就是说不管获取多少次读锁,剩余读锁资源总是1
return 1;
}
}
}
//和tryAcquire逻辑一模一样,唯一区别就是这里即使线程拥有写锁,也是通过cas更新state
//略
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
//和fullTryAcquireShared逻辑几乎一模一样,略
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
//判断当前线程是否拥有写锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
//写锁支持条件变量,writeLock的newCondition中调用这个函数
//注意:读锁是不支持的,readLock的newCondition函数则是直接跑UnSupport异常
final ConditionObject newCondition() {
return new ConditionObject();
}
//如果写锁已分配,获取持有写锁的线程对象
final Thread getOwner() {
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
//获取读锁总次数
final int getReadLockCount() {
return sharedCount(getState());
}
//判断是否有现成是否已经获得了写锁
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
//获取写锁重入次数,就是state低15位
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
//获取当前线程读锁重入次数
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
//先尝试从firstReader读取,
if (firstReader == current)
return firstReaderHoldCount;
//如果firstReader不是自己,则再尝试从cachedHolderCount读取
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
//如果cachedHolderCount也不是自己,那么最后才去读threadLocal,即读readHolder
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0);
}
final int getCount() { return getState(); }
}
//非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
//本次获取写锁是否该阻塞?答案是:用不阻塞
final boolean writerShouldBlock() {
return false;
}
//本次获取读锁是否应该阻塞?
//答案是如果是如果第一个等待的线程是写锁,则本次获取读锁需要阻塞
//反之如果是读锁,则不阻塞,对读锁不公平,对写锁稍显公平
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusives();
}
}
//公平方式
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
//本次获取写锁是否需要阻塞?如果等待队列不为空则需要阻塞
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
//本次获取读锁是否需要阻塞?如果等待队列不为空则需要阻塞
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
//下面就是ReadLock和WriteLock了,这两都是对上面的Sync的一个简单封装
//ReadLock和WriteLock的各种函数都是简单的转调用Sync的相关函数,
//一眼就能看明白,略
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
//!!!读锁不支持条件变量
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
//写锁支持条件变量,就是AQS的条件变量
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}