转 深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理
1777 | 0 | 0
Condition的实现原理
Lock lock = new ReentrantLock();
lock.lock();
try{
//临界区......
}finally{
lock.unlock();
}
public interface Lock {
//加锁
void lock();
//解锁
void unlock();
//可中断获取锁,与lock()不同之处在于可响应中断操作,即在获
//取锁的过程中可中断,注意synchronized在获取锁时是不可中断的
void lockInterruptibly() throws InterruptedException;
//尝试非阻塞获取锁,调用该方法后立即返回结果,如果能够获取则返回true,否则返回false
boolean tryLock();
//根据传入的时间段获取锁,在指定时间内没有获取锁则返回false,如果在指定时间内当前线程未被中并断获取到锁则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//获取等待通知组件,该组件与当前锁绑定,当前线程只有获得了锁
//才能调用该组件的wait()方法,而调用后,当前线程将释放锁。
Condition newCondition();
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable{
public static ReentrantLock lock=new ReentrantLock();
public static int i=0;
@Override
public void run() {
for(int j=0;j<10000000;j++){
lock.lock();
//支持重入锁
lock.lock();
try{
i++;
}finally{
//执行两次解锁
lock.unlock();
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl=new ReenterLock();
Thread t1=new Thread(tl);
Thread t2=new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
//输出结果:20000000
System.out.println(i);
}
}
//查询当前线程保持此锁的次数。 int getHoldCount() //返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 protected Thread getOwner(); //返回一个 collection,它包含可能正等待获取此锁的线程,其内部维持一个队列,这点稍后会分析。 protected Collection<Thread> getQueuedThreads(); //返回正等待获取此锁的线程估计数。 int getQueueLength(); // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 protected Collection<Thread> getWaitingThreads(Condition condition); //返回等待与此锁相关的给定条件的线程估计数。 int getWaitQueueLength(Condition condition); // 查询给定线程是否正在等待获取此锁。 boolean hasQueuedThread(Thread thread); //查询是否有些线程正在等待获取此锁。 boolean hasQueuedThreads(); //查询是否有些线程正在等待与此锁有关的给定条件。 boolean hasWaiters(Condition condition); //如果此锁的公平设置为 true,则返回 true。 boolean isFair() //查询当前线程是否保持此锁。 boolean isHeldByCurrentThread() //查询此锁是否由任意线程保持。 boolean isLocked()
/**
* AQS抽象类
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;
//指向同步的队尾
private transient volatile Node tail;
//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;
//省略其他代码......
}
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;
//标识线程已处于结束状态
static final int CANCELLED = 1;
//等待被唤醒状态
static final int SIGNAL = -1;
//条件状态,
static final int CONDITION = -2;
//在共享模式中使用表示获得的同步状态会被传播
static final int PROPAGATE = -3;
//等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
volatile int waitStatus;
//同步队列中前驱结点
volatile Node prev;
//同步队列中后继结点
volatile Node next;
//请求锁的线程
volatile Thread thread;
//等待队列中的后继结点,这个与Condition有关,稍后会分析
Node nextWaiter;
//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//.....
}
ReentrantLock内部存在3个实现类,分别是Sync、NonfairSync、FairSync,其中Sync继承自AQS实现了解锁tryRelease()方法,而NonfairSync(非公平锁)、 FairSync(公平锁)则继承自Sync,实现了获取锁的tryAcquire()方法,ReentrantLock的所有方法调用都通过间接调用AQS和Sync类及其子类来完成的。
从上述类图可以看出AQS是一个抽象类,但请注意其源码中并没一个抽象的方法,这是因为AQS只是作为一个基础组件,并不希望直接作为直接操作类对外输出,而更倾向于作为基础组件,为真正的实现类提供基础设施,如构建同步队列,控制同步状态等,事实上,从设计模式角度来看,AQS采用的模板模式的方式构建的,其内部除了提供并发操作核心方法以及同步队列操作外,还提供了一些模板方法让子类自己实现,如加锁操作以及解锁操作,为什么这么做?
这是因为AQS作为基础组件,封装的是核心并发操作,但是实现上分为两种模式,即共享模式与独占模式,而这两种模式的加锁与解锁实现方式是不一样的,但AQS只关注内部公共方法实现并不关心外部不同模式的实现,所以提供了模板方法给子类使用,也就是说实现独占锁,如ReentrantLock需要自己实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则需要实现tryAcquireShared()方法和tryReleaseShared()方法,这样做的好处是显而易见的,无论是共享模式还是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不同罢了,更重要的是如果我们需要自定义锁的话,也变得非常简单,只需要选择不同的模式实现不同的加锁和解锁的模板方法即可,AQS提供给独占模式和共享模式的模板方法如下
//AQS中提供的主要模板方法,由子类实现。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//独占模式下获取锁的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占模式下解锁的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下获取锁的方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下解锁的方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//判断是否为持有独占锁
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}
在了解AQS的原理概要后,下面我们就基于ReetrantLock进一步分析AQS的实现过程,这也是ReetrantLock的内部实现原理。
//默认构造,创建非公平锁NonfairSync
public ReentrantLock() {
sync = new NonfairSync();
}
//根据传入参数创建锁类型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//加锁操作
public void lock() {
sync.lock();
}
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
//加锁
final void lock() {
//执行CAS操作,获取同步状态
if (compareAndSetState(0, 1))
//成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则再次请求同步状态
acquire(1);
}
}
public final void acquire(int arg) {
//再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//NonfairSync类
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
//nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判断同步状态是否为0,并尝试再次获取同步状态
if (c == 0) {
//执行CAS操作
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
setState(nextc);
return true;
}
return false;
}
//省略其他代码
}
public final void acquire(int arg) {
//再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
//将请求同步状态失败的线程封装成结点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果是第一个结点加入肯定为空,跳过。
//如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS执行尾部结点替换,尝试在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次加入或者CAS操作没有成功执行enq入队操作
enq(node);
return node;
}
private Node enq(final Node node) {
//死循环
for (;;) {
Node t = tail;
//如果队列为null,即没有头结点
if (t == null) { // Must initialize
//创建并使用CAS设置头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {//队尾添加新结点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,死循环
for (;;) {
//获取前驱结点
final Node p = node.predecessor();
当且仅当p为头结点才尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//将node设置为头结点
setHead(node);
//清空原来头结点的引用便于GC
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//最终都没能获取同步状态,结束该线程的请求
cancelAcquire(node);
}
}
//设置为头结点
private void setHead(Node node) {
head = node;
//清空结点数据
node.thread = null;
node.prev = null;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前结点的等待状态
int ws = pred.waitStatus;
//如果为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
//如果ws>0 则说明是结束状态,
//遍历前驱结点直到找到没有结束状态的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws小于0又不是SIGNAL状态,
//则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//将当前线程挂起
LockSupport.park(this);
//获取线程中断状态,interrupted()是判断当前中断状态,
//并非中断线程,因此可能true也可能false,并返回
return Thread.interrupted();
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛异常,中断线程的同步状态请求
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛异常,中断线程的同步状态请求
throw new InterruptedException();
//ReentrantLock类的unlock
public void unlock() {
sync.release(1);
}
//AQS类的release()方法
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继结点的线程
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//判断状态是否为0,如果是则说明已释放同步状态
if (c == 0) {
free = true;
//设置Owner为null
setExclusiveOwnerThread(null);
}
//设置更新同步状态
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
从代码执行操作来看,这里主要作用是用unpark()唤醒同步队列中最前边未放弃线程(也就是状态为CANCELLED的线程结点s)。此时,回忆前面分析进入自旋的函数acquireQueued(),s结点的线程被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,如果p!=head也不会有影响,因为它会执行shouldParkAfterFailedAcquire(),由于s通过unparkSuccessor()操作后已是同步队列中最前边未放弃的线程结点,那么通过shouldParkAfterFailedAcquire()内部对结点状态的调整,s也必然会成为head的next结点,因此再次自旋时p==head就成立了,然后s把自己设置成head结点,表示自己已经获取到资源了,最终acquire()也返回了,这就是独占锁释放的过程。
//公平锁FairSync类中的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//注意!!这里先判断同步队列是否存在结点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法与nonfairTryAcquire(int acquires)方法唯一的不同是在使用CAS设置尝试设置state值前,调用了hasQueuedPredecessors()判断同步队列是否存在结点,如果存在必须先执行完同步队列中结点的线程,当前线程进入等待状态。这就是非公平锁与公平锁最大的区别,即公平锁在线程请求到来时先会判断同步队列是否存在结点,如果存在先执行同步队列中的结点线程,当前线程将封装成node加入同步队列等待。而非公平锁呢,当线程请求到来时,不管同步队列是否存在线程结点,直接尝试获取同步状态,获取成功直接访问共享资源,但请注意在绝大多数情况下,非公平锁才是我们理想的选择,毕竟从效率上来说非公平锁总是胜于公平锁。
public interface Condition {
/**
* 使当前线程进入等待状态直到被通知(signal)或中断
* 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
* 当其他线程调用interrupt()方法中断当前线程
* await()相当于synchronized等待唤醒机制中的wait()方法
*/
void await() throws InterruptedException;
//当前线程进入等待状态,直到被唤醒,该方法不响应中断要求
void awaitUninterruptibly();
//调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时
//其中nanosTimeout指的等待超时时间,单位纳秒
long awaitNanos(long nanosTimeout) throws InterruptedException;
//同awaitNanos,但可以指明时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException;
//调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
//间期限(deadline),如果没到指定时间就被唤醒,返回true,其他情况返回false
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notify()相同
void signal();
//唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notifyAll()相同
void signalAll();
}
这里我们通过一个卖烤鸭的案例来演示多生产多消费者的案例,该场景中存在两条生产线程t1和t2,用于生产烤鸭,也存在两条消费线程t3,t4用于消费烤鸭,4条线程同时执行,需要保证只有在生产线程产生烤鸭后,消费线程才能消费,否则只能等待,直到生产线程产生烤鸭后唤醒消费线程,注意烤鸭不能重复消费。ResourceByCondition类中定义product()和consume()两个方法,分别用于生产烤鸭和消费烤鸭,并且定义ReentrantLock锁,用于控制product()和consume()的并发,由于必须在烤鸭生成完成后消费线程才能消费烤鸭,否则只能等待,因此这里定义两组Condition对象,分别是producer_con和consumer_con,前者拥有控制生产线程,后者拥有控制消费线程,这里我们使用一个标志flag来控制是否有烤鸭,当flag为true时,代表烤鸭生成完毕,生产线程必须进入等待状态同时唤醒消费线程进行消费,消费线程消费完毕后将flag设置为false,代表烤鸭消费完成,进入等待状态,同时唤醒生产线程生产烤鸭,具体代码如下
package com.zejian.concurrencys;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文地址,请尊重原创]
*/
public class ResourceByCondition {
private String name;
private int count = 1;
private boolean flag = false;
//创建一个锁对象。
Lock lock = new ReentrantLock();
//通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition();
/**
* 生产
* @param name
*/
public void product(String name)
{
lock.lock();
try
{
while(flag){
try{producer_con.await();}catch(InterruptedException e){}
}
this.name = name + count;
count++;
System.out.println(Thread.currentThread().getName()+"...生产者5.0..."+this.name);
flag = true;
consumer_con.signal();//直接唤醒消费线程
}
finally
{
lock.unlock();
}
}
/**
* 消费
*/
public void consume()
{
lock.lock();
try
{
while(!flag){
try{consumer_con.await();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+"...消费者.5.0......."+this.name);//消费烤鸭1
flag = false;
producer_con.signal();//直接唤醒生产线程
}
finally
{
lock.unlock();
}
}
}
package com.zejian.concurrencys;
/**
* Created by zejian on 2017/7/22.
* Blog : http://blog.csdn.net/javazejian [原文地址,请尊重原创]
*/
public class Mutil_Producer_ConsumerByCondition {
public static void main(String[] args) {
ResourceByCondition r = new ResourceByCondition();
Mutil_Producer pro = new Mutil_Producer(r);
Mutil_Consumer con = new Mutil_Consumer(r);
//生产者线程
Thread t0 = new Thread(pro);
Thread t1 = new Thread(pro);
//消费者线程
Thread t2 = new Thread(con);
Thread t3 = new Thread(con);
//启动线程
t0.start();
t1.start();
t2.start();
t3.start();
}
}
/**
* @decrition 生产者线程
*/
class Mutil_Producer implements Runnable {
private ResourceByCondition r;
Mutil_Producer(ResourceByCondition r) {
this.r = r;
}
public void run() {
while (true) {
r.product("北京烤鸭");
}
}
}
/**
* @decrition 消费者线程
*/
class Mutil_Consumer implements Runnable {
private ResourceByCondition r;
Mutil_Consumer(ResourceByCondition r) {
this.r = r;
}
public void run() {
while (true) {
r.consume();
}
}
}
public class KaoYaResource {
private String name;
private int count = 1;//烤鸭的初始数量
private boolean flag = false;//判断是否有需要线程等待的标志
/**
* 生产烤鸭
*/
public synchronized void product(String name){
while(flag){
//此时有烤鸭,等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.name=name+count;//设置烤鸭的名称
count++;
System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);
flag=true;//有烤鸭后改变标志
notifyAll();//通知消费线程可以消费了
}
/**
* 消费烤鸭
*/
public synchronized void consume(){
while(!flag){//如果没有烤鸭就等待
try{this.wait();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1
flag = false;
notifyAll();//通知生产者生产烤鸭
}
}
public class ConditionObject implements Condition, java.io.Serializable {
//等待队列第一个等待结点
private transient Node firstWaiter;
//等待队列最后一个等待结点
private transient Node lastWaiter;
//省略其他代码.......
}
public final void await() throws InterruptedException {
//判断线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();
//创建新结点加入等待队列并返回
Node node = addConditionWaiter();
//释放当前线程锁即释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断结点是否同步队列(SyncQueue)中,即是否被唤醒
while (!isOnSyncQueue(node)) {
//挂起线程
LockSupport.park(this);
//判断是否被中断唤醒,如果是退出循环。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// clean up if cancelled
if (node.nextWaiter != null)
//清理等待队列中不为CONDITION状态的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判断是否为结束状态的结点并移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建新结点状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
public final void signal() {
//判断是否持有独占锁,如果不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//唤醒等待队列第一个结点的线程
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//移除条件等待队列中的第一个结点,
//如果后继结点为null,那么说没有其他结点将尾结点也设置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果被通知节点没有进入到同步队列并且条件等待队列还有不为空的节点,则继续循环通知后续结点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//transferForSignal方法
final boolean transferForSignal(Node node) {
//尝试设置唤醒结点的waitStatus为0,即初始化状态
//如果设置失败,说明当期结点node的waitStatus已不为
//CONDITION状态,那么只能是结束状态了,因此返回false
//返回doSignal()方法中继续唤醒其他结点的线程,注意这里并
//不涉及并发问题,所以CAS操作失败只可能是预期值不为CONDITION,
//而不是多线程设置导致预期值变化,毕竟操作该方法的线程是持有锁的。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//加入同步队列并返回前驱结点p
Node p = enq(node);
int ws = p.waitStatus;
//判断前驱结点是否为结束结点(CANCELLED=1)或者在设置
//前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒node结点的线程
LockSupport.unpark(node.thread);
return true;
}
注释说得很明白了,这里我们简单整体说明一下,doSignal(first)方法中做了两件事,从条件等待队列移除被唤醒的节点,然后重新维护条件等待队列的firstWaiter和lastWaiter的指向。二是将从等待队列移除的结点加入同步队列(在transferForSignal()方法中完成的),如果进入到同步队列失败并且条件等待队列还有不为空的节点,则继续循环唤醒后续其他结点的线程。
到此整个signal()的唤醒过程就很清晰了,即signal()被调用后,先判断当前线程是否持有独占锁,如果有,那么唤醒当前Condition对象中等待队列的第一个结点的线程,并从等待队列中移除该结点,移动到同步队列中,如果加入同步队列失败,那么继续循环唤醒等待队列中的其他结点的线程,如果成功加入同步队列,那么如果其前驱结点是否已结束或者设置前驱节点状态为Node.SIGNAL状态失败,则通过LockSupport.unpark()唤醒被通知节点代表的线程,到此signal()任务完成,注意被唤醒后的线程,将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,那么while (!isOnSyncQueue(node))将不在符合循环条件,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理,流程如下图所示(注意无论是同步队列还是等待队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)
ok~,本篇先到这,关于AQS中的另一种模式即共享模式,下篇再详聊,欢迎继续关注。
文章已获作者授权,原文链接:http://blog.csdn.net/javazejian/article/details/75043422
0

wsr
0人已关注
领课教育 32659
10417
update 47867
5226
领课教育 18533
husheng 21221
请更新代码 41915
凯哥Java 2498
凯哥Java 2927
凯哥Java 2203