目录
介绍AQS:
直观方式解释加锁的流程:
Node是什么:它里面有什么属性呢
图解队列的排队过程:
源码分析三种加锁流程:
我们先讲解一下非公平锁的加锁流程:
Lock()方式加锁:
在源码里对于Lock()的解释:
源码详解枷锁的过程:
进入lock()方法后显示
进入sync.lock();方法:
展示非公平锁的实现lock():
展示公平锁实现的lock()
产看方法acquire(1);
查看方法tryAcquire(int arg):
非公平锁实现:
公平锁实现:
查看创建Node的方法addWaiter(Node.EXCLUSIVE)
查看方法acquireQueued();
查看方法shouldParkAfterFailedAcquire()
注:Node()状态
tryLock()方式加锁:
tryLock()方式:
tryLock(long time, TimeUnit unit)方式
tryAcquireNanos()方法
doAcquireNanos():
cancelAcquire()
lockInterruptibly()方式
acquireInterruptibly()方法
doAcquireInterruptibly()方法:
在我们打开源码时,可以看一下这个源码的作者:
/* * @since 1.5 * @author Doug Lea */
Doug Lea 是 Java 并发编程领域的权威人物,他在并发编程方面的贡献尤为突出。Doug Lea 是 SUNY Oswego 的计算机科学教授,并且是纽约先进技术中心软件工程实验室的联合主任。他的工作对 Java 并发编程产生了深远的影响。
Doug Lea 撰写了《JAVA并发编程实践》一书,这本书是 Java 领域的经典之作,全面深入地探讨了 Java 平台上的多线程和并发编程技术。这本书不仅帮助开发者理解和掌握如何在 Java 环境中编写高效、安全且可维护的并发代码,还提供了许多标准设计技术,用于解决常见的并发编程挑战。
Doug Lea 在 Java 并发编程中的另一个重要贡献是开发了 java.util.concurrent 包,这个包在 JDK 5.0 中被整合,极大地丰富了 Java 的并发编程工具。这个包包括了线程池、并发集合、原子变量和锁等工具,极大地简化了并发编程的复杂性。
介绍AQS:
全程为AbstractQueuedSynchronizer
AQS中文被称为队列同步器
AQS是一个抽象的接口。他是JUC包下的基类。JUC包下的很多功能都是基于AQS实现的。
AQS里面维护了一个由volitile修饰的state变量和内置的FIFO队列完成线程的排队和加锁操作。
volitile维护的state保证了可见性和有序性,在设置state是,里面提供了compareAndSetState()方法。CAS方式保证了原子性
直观方式解释加锁的流程:
为大家图解什么是FIFO队列,队列里的数据类型是什么。是如何连接的
-
FIFO队列是双向链表实现的。
-
里面每个节点的数据类型是Node
-
特殊的是,队列的头节点是一个伪节点,这个节点的线程信息是null
Node是什么:它里面有什么属性呢
Node是AQS的内部类。
-
为大家展示Node的结构:
-
里面记录着状态,前置的指针,后置的指针,还记录了队列的头节点和尾节点
图解队列的排队过程:
这个图片只是为了展现这个队列的状态,对于流程我们并没直接的展现,但是需要注意的是伪节点的Thread是null.
源码分析三种加锁流程:
我们先讲解一下非公平锁的加锁流程:
-
如果线程A执行CAS将state修改为1,则线程1得到所资源,执行业务代码。
-
这时线程B在去获取所资源是发现state不是0,并且自己并不是有锁的线程,那么B将自己封装为Node,进入双向队列。
-
但是在进入队列是,头节点必须是一个伪节点,若一开始没有伪节点,那就创建之后,将这个节点挂在伪节点后,将前置节点的状态修改为-1 之后就可以挂起线程
我们在创建ReentrantLock()时都是借助多态创建.在此为大家展示一下lock的API
Lock()方式加锁:
在源码里对于Lock()的解释:
Acquires the lock. If the lock is not available then the current thread becomes disabled for thread scheduling purposes and lies dormant until the lock has been acquired. Implementation Considerations A Lock implementation may be able to detect erroneous use of the lock, such as an invocation that would cause deadlock, and may throw an (unchecked) exception in such circumstances. The circumstances and the exception type must be documented by that Lock implementation. 获取锁。 如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。 实施注意事项 实现 Lock 可能能够检测到对锁的错误使用,例如会导致死锁的调用,并可能在这种情况下引发(未经检查的)异常。该实现必须记录 Lock 情况和异常类型。
源码详解枷锁的过程:
进入lock()方法后显示
public void lock() { sync.lock(); }
进入sync.lock();方法:
abstract void lock();//发现是抽象方法。
展示非公平锁的实现lock():
final void lock() { //首先进入之后直接使用CAS方式修改,如果成功,将持有锁的线程设置为当前线程,返回 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //失败进入acquire方法 acquire(1); }
展示公平锁实现的lock()
final void lock() { //发现公平锁下的方式就是 acquire(1); acquire(1); }
产看方法acquire(1);
public final void acquire(int arg) { //尝试获取线程,如果为获取 if (!tryAcquire(arg) && //开始方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //addWaiter(Node.EXCLUSIVE)开始封装Node对象 Node.EXCLUSIVE这个指的是现在的锁的类型时互斥锁 //封装后讲数据直接开始放置到队列里面 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //将自己终端,就是在队列里面排队,等待唤醒 selfInterrupt(); }
查看方法tryAcquire(int arg):
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
非公平锁实现:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }进入方法:nonfairTryAcquire(acquires)
final boolean nonfairTryAcquire(int acquires) { //设置线程为当前线程 final Thread current = Thread.currentThread(); //获取state int c = getState(); //c==0 没有线程持有锁,抢一把 if (c == 0) { //CAS方式枪锁成功,设置一下当前线程,直接返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //state不是0,但是线程时但该案当前线程,锁重入 state+1 直接返回获取锁成功 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //放置state超过int最大值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置state的值 setState(nextc); return true; } //未成功返回false 返回的是flase之后开始将这个线程放入到FIFO队列里 return false; }
公平锁实现:
protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state的值 int c = getState(); //如果是0的话,这里展现的是公平锁的君子风范: if (c == 0) { //如果里面没有Node或者是头节点之后没有Node等待,或者自己就是头节点之后的点那就可以尝试拿锁 if (!hasQueuedPredecessors() && //如果成功 返回true compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //state不是0,但是线程时但该案当前线程,锁重入 state+1 直接返回获取锁成功 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //放置state超过int最大值 if (nextc < 0) throw new Error("Maximum lock count exceeded"); //设置state的值 setState(nextc); return true; } //未成功返回false;返回的是flase之后开始将这个线程放入到FIFO队列里 return false; }//就是判断公平锁是否能够干活 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
查看创建Node的方法addWaiter(Node.EXCLUSIVE)
/** *就是简单的创建一个node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
查看方法acquireQueued();
//对于lock()方法里面的中断操作我们先忽略。 final boolean acquireQueued(final Node node, int arg) { //设置状态,当前未获取到锁资源 boolean failed = true; try { boolean interrupted = false; //死循环,必须完成一件事 for (;;) { //查找当前节点的前驱节点 final Node p = node.predecessor(); //开始判断,如果我的前置节点就是伪节点(注意是伪节点)直接在词使用tryAcquire()尝试获取锁资源 if (p == head && tryAcquire(arg)) { //如果成功,就设置一些头节点 setHead(node); p.next = null; // help GC //设置已经获取锁资源 failed = false; //返回的值是false. return interrupted; } //如果前置节点不是伪节点的话, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
查看方法shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //如果等于-1 证明前置节点可以唤醒当前节点 if (ws == Node.SIGNAL) return true; if (ws > 0) {//如果是>0的话就是当前前置节点无效,使用循环找到有效的节点,地将当前节点挂在有效节点的后面 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //那就是-2,-3使用ACS将状态设置为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
注:Node()状态
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
tryLock()方式加锁:
tryLock()方式:
public boolean tryLock() { //这就是非公平锁的TryAcquire()方法 return sync.nonfairTryAcquire(1); }final boolean nonfairTryAcquire(int acquires) { //设置线程为当前线程 final Thread current = Thread.currentThread(); //获取state int c = getState(); //c==0 没有线程持有锁,抢一把 if (c == 0) { //CAS方式枪锁成功,设置一下当前线程,直接返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //state不是0,但是线程时但该案当前线程,锁重入 state+1 直接返回获取锁成功 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //放置state超过int最大值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置state的值 setState(nextc); return true; } //未成功返回false 返回的是flase之后开始将这个线程放入到FIFO队列里 return false; }
tryLock(long time, TimeUnit unit)方式
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
tryAcquireNanos()方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //如果线程中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) //在抢一手,失败的话执行下面的方法 || doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos():
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//如果时间结束,直接返回失败
if (nanosTimeout <= 0L)
return false;
//设置结束时间
final long deadline = System.nanoTime() + nanosTimeout;
//将当前节点设置出来
final Node node = addWaiter(Node.EXCLUSIVE);
//设置时候失败获取锁
boolean failed = true;
try {
for (;;) { //这是死循环,我们注意查看代码的出口
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是伪节点,尝试拿锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//成功直接返回
return true;
}
//计算现在还剩多长时间
nanosTimeout = deadline - System.nanoTime();
//时间结束,直接返回false
if (nanosTimeout <= 0L)
return false;
//时间还是有的,修改前置节点的状态,准备挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//时间有,但是太少了,连挂起的时间都不足,也是结束
nanosTimeout > spinForTimeoutThreshold)
//如果时间足够,开始将节点挂起 继续死循环等待结束条件
LockSupport.parkNanos(this, nanosTimeout);
//如果当前线程被其他线程中断,直接甩出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
//甩出异常也是结束死循环,但是需要将节点取消
if (failed)
cancelAcquire(node);
}
}
cancelAcquire()
private void cancelAcquire(Node node) { //如果是null 直接返回 if (node == null) return; //将当前节点的线程设置为null node.thread = null; //找到前置线程 Node pred = node.prev; //找到状态正常的前置节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 将前置线程的next设置为当前线程的next Node predNext = pred.next; // 设置状态为1,代表节点取消 node.waitStatus = Node.CANCELLED; // 如果是尾节点的话,直接就是简单的将前置节点设知为尾节点 if (node == tail && compareAndSetTail(node, pred)) { //设置前置节点的next为空 compareAndSetNext(pred, predNext, null); } else {//当前的节点不是尾节点或者是CAS操作失败 //那就是;两种情况,一种是前置节点就是伪节点 第二种是前置接点不是伪节点 int ws; if (pred != head //前置节点不是伪节点 && ((ws = pred.waitStatus) == Node.SIGNAL //前置节点状态是否为-1 || //不是-1的话,或着是其他的<0的状态,设置为-1 这样的话可以将线程唤醒 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //并且前置的线程不是null pred.thread != null) { Node next = node.next; //判断当前线程有后继节点,将后继节点关在前置节点上 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //当前节点的前置节点是伪节点 直接将线程唤醒 这是释放锁的操作,在后续的章节开始讲解 unparkSuccessor(node); } node.next = node; // help GC } }
lockInterruptibly()方式
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
acquireInterruptibly()方法
public final void acquireInterruptibly(int arg) throws InterruptedException { //判断线程是否终端 if (Thread.interrupted()) throw new InterruptedException(); //如果尝试获取所资源失败 if (!tryAcquire(arg)) //进入方法 doAcquireInterruptibly(arg); }
doAcquireInterruptibly()方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); //当前获取资源失败 boolean failed = true; try { for (;;) //注意出口条件 //获取当前节点的前置节点 final Node p = node.predecessor(); //如果前置节点是伪节点,尝试拿锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; //成功直接返回 return true; } //获取锁资源失败 开始挂起资源,如果挂起之后被中断的方式挂起,抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { //因为异常也是出口条件,将节点取消 if (failed) cancelAcquire(node);//还是取消节点的方法,同上 } }