侧边栏壁纸
  • 累计撰写 98 篇文章
  • 累计创建 20 个标签
  • 累计收到 3 条评论

AQS及其常用同步器原理学习分享

林贤钦
2021-07-21 / 0 评论 / 0 点赞 / 713 阅读 / 15,874 字
温馨提示:
本文最后更新于 2022-05-05,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

AQS及其常用同步器原理学习分享

AQS(AbstractQueuedSynchronizer)是一个用于构建锁和同步器的框架,许多的同步器都可以通过AQS容易并且高效的构造出来。常用的有ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock和FutureTask。

1、AQS解决了什么问题?

2、AQS是如何解决的?

1、AQS解决了什么问题?

AQS解决了在实现同步器时涉及的大量细节,通过原子状态的管理,等待线程采用FIFO队列操作顺序。主要有以下三个维度:

  • 状态的原子性管理
  • 队列的管理
  • 线程的阻塞与解除阻塞

2、AQS是如何解决的?

2.1、状态的原子性管理

AQS维护了一个volatile int state(代表共享资源),state的访问方式有三种

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  • ReentrantLock:state初始化为0,表示未锁定状态。A线程lock()时,state=1,重入锁过程累加,阻塞未拿到锁state=-1
  • CountDownLatch:任务分为N个子线程去执行,state也初始化为N,这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作

一般来说,自定义同步器要么是独占方法,要么是共享方式,但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

2.2、队列的管理

AQS维护了一个FIFO线程等待双向队列(CLH锁),当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒,使其再次尝试获取同步状态。

保证了在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,提高吞吐量。

image-20220505193507475

static final class Node {
    /** 标识当前节点在共享模式 */
    static final Node SHARED = new Node();
    /** 标识当前节点在独占模式 */
    static final Node EXCLUSIVE = null;
    
    /** 表示当前的线程被取消 */
    static final int CANCELLED =  1;
    /** 表示当前节点的后继节点包含的线程需要运行,也就是unpark*/
    static final int SIGNAL    = -1;
    /** 表示当前节点在等待condition,也就是在condition队列中*/
    static final int CONDITION = -2;
    /**值为-3,表示当前场景下后续的acquireShared能够得以执行*/
    static final int PROPAGATE = -3;

    /**表当前节点的状态值,取值为上面的四个常量*/
    volatile int waitStatus;

    /** 前驱节点*/
    volatile Node prev;

    /**后驱节点*/
    volatile Node next;

    /**节点线程*/
    volatile Thread thread;

    /**连接条件队列*/
    Node nextWaiter;
    
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**获取前驱节点 */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    /**通过addWaiter构造 */
    Node(Thread thread, Node mode) {    
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
	/**通过Condition构造 */
    Node(Thread thread, int waitStatus) { 
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2.3、线程的阻塞与解除阻塞

新的线程在获取同步状态失败后,会加入到CLH队列中,并且该节点会以自旋的方式不断的获取同步状态。

加入CLH队列源码

/**
* node:新加入的CLH的节点
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取前驱节点
            final Node p = node.predecessor();
            //如果前驱节点是head节点,就尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                setHead(node);// 获取成功,将当前节点设置为head节点
                p.next = null; // 原head节点出队,在某个时间点被GC回收
                failed = false;//获取成功
                return interrupted; //返回是否被中断过
            }
            //线程在获取同步状态失败后,并不是立马进入等待状态,而是需要判断当前线程是否需要被阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

需要判断当前线程是否需要被阻塞,规则:

  1. 如果当前节点的前驱节点的等待状态为SIGNAL,则返回true
  2. 如果当前节点的前驱节点的等待状态为CALCLE,则表示该线程的前驱节点已经被中断或者超时,需要从CHL中删除,直到回溯到ws <= 0,返回false
  3. 若果当前节点的前驱节点的等待状态为非SIGNAL,非CANCLE,则以CAS的方式设置其前驱节点为的状态为SIGNAL,返回false.
/**
* pred:前驱节点
* node:当前节点
* return:true 表示当前线程需要等待
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
         // 若果等待状态的值为SIGNAL,则返回true 表示当前线程需要等待
        return true;
    // 前驱节点状态为CANCELLED
    if (ws > 0) {
        // 前驱节点的状态>0,为CANCLE状态,表示该节点被中断或者超时,需要从CHL中移除。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //  前驱节点为 PROPAGATE或者CONDITION 将前驱节点的等待状态以CAS的方式更新为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

当 shouldParkAfterFailedAcquire(Node pred, Node node)方法返回true时,会执行 parkAndCheckInterrupt()方法,挂起当前线程

private final boolean parkAndCheckInterrupt() {
    //挂起当前线程
    LockSupport.park(this);
    return Thread.interrupted();
}

3、AQS的重要方法

列举一些比较重要的方法,后续在ReentrantLock及Semaphore源码中详细说明

  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
  • releaseShared(int arg):共享式释放同步状态;

4、AQS获取操作和释放操作的标准

首先同步器判断当前状态是否允许获得操作,如果是,则允许线程操作,否正获取操作将阻塞或失败。

/**获取操作*/
boolean acquice() throws InterruptedException{
    while (当前状态不允许获取操作){
        if(需要阻塞获取操作){
        	如果当前线程不在队列中,则将其插入队列
            阻塞当前线程
        }
        else 返回失败
    }
    可能更新同步器状态
    如果线程位于队列中,则将其移除队列
    返回成功
}
/**释放操作*/
void release(){
    更新同步器状态
    if(新的状态允许某个被阻塞的线程获取成功)
        解除队列中的某个或多个线程的阻塞状态
}

根据同步器的不同,获取操作可以时一种独占操作(ReentrantLock),也可以是一个非独占操作(Semaphore和CountDownLatch)。

ReentrantLock

ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。

重点:ReentrantLock如何利用AQS实现lock/unlock、公平锁/非公平锁、可重入锁。

ReentrantLock维护AQS的state,默认的状态是0,如果获取到锁,则变成1,表示加锁成功,如果获取锁失败,状态会变成负1,加入CHL队列等待,如果已拿到锁后继续请求加锁,状态会变成2、3、4......这就是可重入过程。非公平锁和公平锁,公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序,而非公平锁则允许线程“插队”,详细结合源码分析。

ReentrantLock提供了两个构造器,分别是

public ReentrantLock() {
    sync = new NonfairSync();
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认构造器初始化为NonfairSync对象,即非公平锁,而带参数的构造器可以指定使用公平锁和非公平锁。由lock()和unlock的源码可以看到,它们只是分别调用了sync对象的lock()和release(1)方法。

lock(NonfairSync)

final void lock() {
    //尝试获取状态
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

第一步:尝试去获取锁。如果尝试获取锁成功,方法直接返回。

首先用一个CAS操作,判断state是否是0(表示当前锁未被占用),如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。

非公平锁体现在这里,如果占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒时,新来的线程就直接抢占了该锁,那么就“插队”了

public final void acquire(int arg) {
    //tryAcquire再次尝试或者重入锁
    if (!tryAcquire(arg) &&
        //acquireQueued 加入等待CHL队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //中断
        selfInterrupt();
}

第二步:获取当前state的值,根据值判断是否再次获取锁和重入锁

//非公平锁tryAcquire(arg)的实现
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state变量值
    int c = getState();
    if (c == 0) { //没有线程占用锁
        if (compareAndSetState(0, acquires)) {
            //占用锁成功,设置独占线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
       
    }//当前线程已经占用该锁
    else if (current == getExclusiveOwnerThread()) { 
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state值为新的重入次数
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;
}

非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。

第三步:入队。如果线程获取不到锁,就挂到CHL队列中。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

先分析addWaiter(Node.EXCLUSIVE), arg)

/**
 * 将新节点和当前线程关联并且入队列
 * @param mode 独占/共享
 * @return 新节点
 */
private Node addWaiter(Node mode) {
    //初始化节点,设置关联线程和模式(独占 or 共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点引用
    Node pred = tail;
    // 尾节点不为空,说明队列已经初始化过
    if (pred != null) {
        node.prev = pred;
        // 设置新节点为尾节点
        if (compareAndSetTail(pred, node)) {
            //pred为新节点的前一个节点,此时没有关联,设置关联next
            pred.next = node;
            return node;
        }
    }
    // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
    enq(node);
    return node;
}

如果尾节点不为空,则将新节点以CAS设置为尾节点,如果尾节点为空,初始化队列,并入队新节点。

加入有多个线程走到了enq(node),看AQS如何解决并发问题

//初始化队列并且入队新节点
private Node enq(final Node node) {
    //开始自旋
    for (;;) {
        Node t = tail;
        if (t == null) { 
            //如果tail为空,设置head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //无论上一步成功和失败都会到这,将新节点挂到队列的最后一个
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

回归acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),在addWaiter(Node.EXCLUSIVE), arg)会返回新加入的节点。这个acquireQueued会自旋尝试获取锁

第四步:已经入队的线程,尝试获取锁,失败就挂起。

/**
 * 已经入队的线程尝试获取锁
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //标记是否成功获取锁
    try {
        boolean interrupted = false; //标记线程是否被中断过
        for (;;) {
            final Node p = node.predecessor(); //获取前驱节点
            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 获取成功,将当前节点设置为head节点
                p.next = null; // 原head节点出队,在某个时间点被GC回收
                failed = false; //获取成功
                return interrupted; //返回是否被中断过
            }
            // 判断获取失败后是否可以挂起,若可以则挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 线程若被中断,设置interrupted为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

判断获取失败后是否可以挂起,若可以则挂起

/**
 * 判断当前线程获取锁失败之后是否需要挂起.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驱节点的状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点状态为signal,返回true
        return true;
    // 前驱节点状态为CANCELLED
    if (ws > 0) {
        // 从队尾向前寻找第一个状态不为CANCELLED的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点的状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  
/**
 * 挂起当前线程,返回线程中断状态并重置
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

unlock()

尝试释放锁,如果成功判断等待节点是否需要唤醒

public void unlock() {
    sync.release(1);
}
  
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;

tryRelease的执行过程

/**
 * 释放当前线程占用的锁
 * @param releases
 * @return 是否释放成功
 */
protected final boolean tryRelease(int releases) {
    // 计算释放后state值
    int c = getState() - releases;
    // 如果不是当前线程占用锁,那么抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 锁被重入次数为0,表示释放成功
        free = true;
        // 清空独占线程
        setExclusiveOwnerThread(null);
    }
    // 更新state值
    setState(c);
    return free;
}

tryRelease的过程为:当前释放锁的线程若不持有锁,则抛出异常。若持有锁,计算释放后的state值是否为0,若为0表示锁已经被成功释放,并且则清空独占线程,最后更新state值,返回free

lock(FairSync)

公平锁和非公平锁不同之处在于,公平锁在获取锁的时候,不会先去检查state状态,而是直接执行aqcuire(1)

final void lock() {
    acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在tryAcquire中,如果获取到状态为0,公平锁会先判断是否存在CHL队列,如果存在会进入CHL中等待

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //公平锁会先判断是否存在CHL队列,如果存在会进入CHL中等待
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

判断是否存在CLH队列

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    //如果头尾不相同,判断哨兵节点的下个节点是不是当前线程
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

tryLock(超时机制)

ReetrantLock的tryLock(long timeout, TimeUnit unit) 提供了超时获取锁的功能。

在指定的时间内如果获取到锁就返回true,获取不到则返回false。这种机制避免了线程无限期的等待锁释放

以非公平锁为例.

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    //如果线程被中断了,那么直接抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果未中断,先尝试获取锁,获取成功就直接返回,获取失败则进入doAcquireNanos
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos 在有限的时间内去竞争锁

/**
 * 在有限的时间内去竞争锁
 * @return 是否获取成功
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 起始时间
    long lastTime = System.nanoTime();
    // 线程入队
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱是头节点并且占用锁成功,则将当前节点变成头结点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                return true;
            }
            // 如果已经超时,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超时时间未到,且需要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞当前线程直到超时时间到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相应中断
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

CountDownLatch

CountDownLatch 可以使一个或多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程

这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CountDownLatch 常用方法

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。

await();//阻塞当前线程,将当前线程加入阻塞队列。

await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,

countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

创建计时器

当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,初始化一个AQS的创建,state的默认值为4

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);//创建同步队列,并设置初始计数器值
}
Sync(int count) {
    setState(count);
}

阻塞线程

当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。

 public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
 }
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//如果计数器为0,返回1,否正返回-1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

构建阻塞队列的双向链表,挂起当前线程

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //新建节点加入阻塞队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获得当前节点pre节点
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//如果计数器为0,返回1,否正返回-1
                if (r >= 0) {//计数器不为0的时候
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //重组双向链表,清空无效节点,挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

计数器递减

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。

public void countDown() {
    //递减锁重入次数,当state=0时唤醒所有阻塞线程
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //递减锁的重入次数
    if (tryReleaseShared(arg)) {
        doReleaseShared();//唤醒队列所有阻塞的节点
        return true;
    }
    return false;
}

递减锁的重入次数

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

唤醒队列所有阻塞的节点

 private void doReleaseShared() {
     //唤醒所有阻塞队列里面的线程
     for (;;) {
         Node h = head;
         if (h != null && h != tail) {
             int ws = h.waitStatus;
             if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始
                     continue;
                 unparkSuccessor(h);//成功则唤醒线程
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                 continue;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break;
     }
 }

Semaphore

Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

Semaphore常用方法说明

//获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire()  
//获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquire(int permits)  
//获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
acquireUninterruptibly() 
//尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
tryAcquire()
//尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)
//释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
release()
//等待队列里是否还存在等待线程。
hasQueuedThreads()
//获取等待队列里阻塞的线程数。
getQueueLength()
//清空令牌把可用令牌数置为0,返回清空令牌的数量
drainPermits()
//返回可用的令牌数量。
availablePermits()

创建信号量

当我们调用new Semaphore(2) 方法时,默认会创建一个非公平的锁的同步阻塞队列,把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量

Semaphore semaphore=new Semaphore(2);

获取令牌

1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1。

2、 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。

3、当计算出来的state>=0,则代表获取令牌成功。

semaphore.acquire();
/**
 *  获取1个令牌
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

共享模式下获取令牌,获取成功则返回,失败则加入阻塞队列,挂起线程

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取令牌,arg为获取令牌个数,当可用令牌数减当前令牌数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

非公平tryAcquireShared的实现,抢占式获取状态

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

构建阻塞队列的双向链表,挂起当前线程doAcquireSharedInterruptibly

释放令牌

1、线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程

2、释放令牌成功之后,同时会唤醒同步队列中的一个线程。

3、被唤醒的节点会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。

semaphore.release();
 /**
  * 释放令牌
  */
public void release() {
    sync.releaseShared(1);
}

释放共享锁,同时会唤醒同步队列中的一个线程。

/**
 *释放共享锁,同时会唤醒同步队列中的一个线程。
 * @param arg
 * @return
 */
public final boolean releaseShared(int arg) {
    //释放共享锁
    if (tryReleaseShared(arg)) {
        //唤醒所有共享节点线程
        doReleaseShared();
        return true;
    }
    return false;
}

唤醒同步队列中的一个线程

/**
 * 唤醒同步队列中的一个线程
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//是否需要唤醒后继节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始0
                    continue;
                unparkSuccessor(h);//唤醒h.nex节点线程
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
0

评论区