一,概述
多线程问题本质是多个线程共同访问了同一块内存,导致该内存状态不确定而产生了一系列问题。concurrent包中提供的Lock类本质是对线程对象进行监督、排队,调度,确保lock只能有一个线程或共享线程成功返回,否则阻塞或取消或超时取消的策略。笔者将在本文对Lock类设计作一个详细的解读。
在解读过程中,首先会对Lock类实现进行代码阅读,并记录至标题二中,最后尝试从实现中抽象出架构设计,并绘制类图,时序图,活动图等,记录于标题三中。
二,实现解读
1,AbstractOwnableSynchronizer
其中的继承关系如下:
可以看到,此类是所有同步器的抽象父类,锁的粒度是线程对象,其中虽说是抽象类,但没有抽象方法,且提供的独占线程set或get方法均被final修饰,子类无法修改。
2,AbstractQueueSynchronizer
抽象同步队列,从注释可能看出,这是所有同步器类的基础实现,子类只能实现tryAcquire、tryRelease、tryAcquireShared、tryReleaseShare、isHeldExclusively方法,这些具体的实现稍后再谈。该基类中,维护了一个等待队列,可用来实现FIFO的阻塞锁或相关同步器,也提供了单个原子int来表示状态,而所谓的状态state,即是当前是否有线程占用,通过getState,setState,compareAndSetState方法来原子更新int值,其值本质是当前资源被锁所成功获取的次数,为0代表没有被线程独占,否则需根据策略去获取资源,可能是自旋或其他方式,仍稍后再谈。最后,核心提供了acquire方法,此方法签名如下:
具体的实现稍后再谈。
(1)CLH Nodes
等待队列是“CLH”(Craig、Landin和Hagersten)锁定队列的变体。CLH锁通常用于自旋锁。相反,我们通过包括显式(“prev”和“next”)链接和一个“status”字段来使用它们来阻止同步器,该字段允许节点在释放锁时向后续节点发送信号,并处理由于中断和超时而导致的取消。状态字段包括跟踪线程是否需要信号的位(使用LockSupport.unpark)。尽管有这些添加,我们仍保留大多数CLH局部属性。
如果要进入CLH lock,需要原子性地设置为新的tail节点。而对于排出CLH,则需要将head设置为下一个符合条件的waiter node,使其变为first。即同步队列采用双链表的形式实现,具体地,比如插入,或已经取消的node,则会调用cleanQueue方法显示清除。CLH的head节点是一个伪节点,因为大多数情况是不存在抢占的,因此设置head被认为是徒劳。直到发生抢占,便会构造出一个头指针和尾指针。接下来,我们看一下队列的维护实现。
双链表节点,提供next、prev、waiter、status属性,并且status、next、prev等都通过unsafe原子式地写入。其有三个内部子类实现,如下,可以理解为独占节点、共享节点以及条件节点。
以上是同步队列数据结构,接下来看下如何维护。
(2)acquire
通过对acquire方法详细阅读,总结一下:
当前线程尝试去acquire时,需判断有无node(这个可以理解为node条件,如果没有则通过share参数创建当前节点,如独占节点或共享节点),真正能让当前线程阻塞的是同步队列是否为空,如head节点是否存在。如果存在head节点,且当前是first节点,则去尝试获取一次,如果成功则将当前firs节点设置为head节点,否则自旋等待,自旋次数是等待状态下循环次数,如果没有设置等待状态且自旋次数未计算,则需计算然后进入阻塞,等待unpark唤醒此节点,重新上述流程。
(3)release
如(2)所述,WAITTING状态下的node需通过park方式进入阻塞,那么在哪被唤醒呢?答案就在release中。
很简单,由子类决定是否释放,如果是否则通过头节点去通知下一个first节点,
从头节点向后遍历,遇到合法的node后,将状态原子式的设置为非等待,然后唤醒等待节点的thread,即waiter所指。
(4)总结
由此可见,通过AQS的同步队列维护,可以实现大部分锁的基本操作,只需合理的重写AQS提供的抽象方法,即可创建出诸如condition、可重入锁、可重入读写锁等业务逻辑锁。下面,通过举例实现,如ReentrantLock的实现,来进一步熟悉AQS。
3,ReentrantLock
通过参数fair决定创建公平sync或非公平sync,重点关注Sync,我们看一下
(1)Sync
Sync直接继承AQS,并且提供两个子类,
tryLock
tryLock是Sync的直接简单实现,尝试去获取锁,如果无法获取,直接返回false,否则返回true,非阻塞。实现如下,首先获取当前AQS的state,如果是0则未被占有,然后原子式的去设置state为1,成功后设置当前独占线程,否则返回false;如果state不为0,判断是否是当前线程独占,是则判断下是否重入溢出,然后返回true。实现很简单,适用于并发极其少的情况。
这和tryRelease是配套的,看下源码如下。很简单,读者可自行理解。
lock
Sync提供了initialTryLock抽象方法,具体由子类实现,如果返回false,则调用acquire方法,参数为1,
此时来到AQS中acquire,只有子类tryAcquire返回false,进入acquire核心方法,参数
node null、shared 非共享、interruptible 中断不取消、timed 非超时方式、time OL永远阻塞等待。
在此,先往回看initialTryLock定义。
即,当返回false,代表不需要阻塞地去抢占锁,否则抢占。
而如何释放呢?看下unlock。
unlock
直接调用AQS的release方法,不再赘述。
接下来,我们看Sync的子类如何实现initialTryLock方法和tryAcquire方法来实现公平与否的吧。
(2)FairSync
尝试获取锁的实现与tryLock一致,关键在于acquire中tryAcquire的实现差异,
getState==0,Key去抢占锁,但需通过hasQueuePredecessors方法判断是否存在等待时间长于此线程的线程,如果有则不抢占,此处是实现公平锁的关键,否则仍通过cas操作去简单设置state为1,设置当前线程为独占线程结束,返回true;否则返回false,代表进入同步队列。
hasQueuePredecessors实现如下,
只需判断存在firs节点指向的first线程,且first节点线程非当前线程。由此实现公平抢占(如果同步队列存在first节点,那就就不抢占吧)
(3)NonfairSync
非公平锁实现与FairSync大同小异,唯一的区别是tryAcquire,
读到这读者应该明白了,只比公平锁少了hasQueuePredecessors判断。
接下来,我们看下基于AQS的condition如何实现的。
4,Condition
(1)介绍
condition类似于Object提供的wait和notify方法,用于阻塞/唤醒。condition接口提供了await/signal类方法。
(2)实现
实现只有ConditionObject,我们来看下。
通过AQS.newCondition方法创建condition,其是AQS的内部类,持有AQS引用,condition节点是conditionNode,我们看下定义。
提供了两个方法,block是如果未释放,则park,
isReleasable当status小于等于1或当前线程被中断,返回true。
接下来我们直接看核心方法的实现,其他方法读者可自行理解。
await
先步步分解,
1)首先判断是否中断,抛出异常
2)创建Condition节点,调用enableWait方法进行设置,我们看下,
可以看到,只有独占锁才支持condition,随后的操作是将当前线程设置到waiter中,原子地设置COND|WAITING状态,并且ConditionNode内部又维持了firstWaiter和lastWaiter指针,同样赋值,即可以多个线程Condition.await。接下来,获取到当前state,并且释放掉当前state,这样做的目的就是释放当前持有的锁,并且后续从await状态退出时,重新加锁。,即让AQS中下一个node唤醒,而await线程开始等待。
我们继续,从enableWait返回了state,进入如下while循环。canReacquire是简单判断node是否处在AQS队列中,而显然,新创建的ConditionNode不再AQS队列,进入while循环体。
判断是否中断,如果中断就取消condition状态,break;
如果处在condition状态,加入到ForkJoinPool池中,是一种阻塞实现,通过内部调用block方法和isRelease方法,判断是否返回。而每次不能返回时,显示调用block方法,即ConditionNode的block实现,即park方式。
如果不出在condition状态,通过自旋方式,直到退出while循环。看到这,猜测signal核心就是将当前ConditioNode加入AQS队列中,这里canReacquire就返回true,进而退出while体了。
接下来的部分就是调用acquire加入AQS队列中,复用node节点,此时已经清除了status。
signal
实现如上,现判断是否加独占锁,随后如果有firstWaiter,则调用doSignal方法,注意参数all为false,
正如上述笔者猜测,将first将入到AQS队列中,即实现了唤醒。但在加入之前,首先清除了COND状态,然后调用enqueue方法,我们看下实现。
可以看到,将node插入AQS队列尾部,并且当status<0是,unpark等待线程。
问题来了,status在什么时候<0呢?此处大概率不为0而直接break,在unlock时,会去清除掉加入队列的ConditionNode,这个时候,通过signalNext方法去unpark ConditionNode.Waiter,从而实现await/signal功能。
具体使用方法参考对应注释。
5,CountDownLatch
再来介绍一个简单的AQS队列的实现,倒计时。这主要是共享锁的实现例子。
(1)Sync
Sync实现如下,
可以看到,通过Sync构造方法传入count,设置到state中。
重写了tryReleaseShared方法,主要是对count--,当count==0时返回true,这时releaseShared方法中就会调用signalNext,将阻塞的Node释放出。
为什么这么用呢?核心得看下CountDownLatch实现。
(2)await
直接跟进
很简单,通过acquire加入同步队列并等待。只有当count>=0是返回,这点可以通过Sync发现。
而只有tryAcquireShared返回大于0的值时,才会unpark等待线程,如下所示。
因此,CountDownLatch需要显示调用countDown方法,才能唤醒等待线程,
内部实现releaseShare,很简单,如Sync中重新所示,当count为0时采取释放节点。