• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

从源码解析AQS

互联网 diligentman 6天前 13次浏览

详解AQS(AbstractQueuedSynchronizer)

一, 是什么,有什么作用?

概念: 抽象队列同步器,是Java一系列锁以及同步器的底层实现框架

作用: 实现像ReentrantLock,CountDownLatch,Semaphore这样的工具

二, 类的架构以及实现逻辑图

类架构图

从源码解析AQS

类逻辑图

从源码解析AQS

类中基本属性

    /**
     * 头节点
     */
    private transient volatile Node head;

    /**
     * 尾节点
     */
    private transient volatile Node tail;

    /**
     * 用户自定义线程状态,该状态用于各种同步器的实现,例如ReentrantLock的state就代表是否获取到资源和已重入       * 次数
     */
    private volatile int state;
    /**
     * 阻塞队列节点对象,可以看出AQS是一个FIFO的双向队列
     */
    static final class Node {
        /**
         * 标记该线程是获取共享资源时被阻塞放入AQS队列的
         */
        static final Node SHARED = new Node();
        /**
         * 标记该线程时获取独占资源时被阻塞放入AQS队列的
         */
        static final Node EXCLUSIVE = null;

        /**
         * 线程被取消了
         */
        static final int CANCELLED = 1;

        /**
         * 线程需要被唤醒
         */
        static final int SIGNAL = -1;

        /**
         * 线程在条件队列里面等待
         */
        static final int CONDITION = -2;

        /**
         * 释放共享资源时需要通知其他结点
         */
        static final int PROPAGATE = -3;

        /**
         * 记录当前线程等待状态:
         * 1, CANCELLED: 线程被取消了
         * -1, SIGNAL: 线程需要被唤醒
         * -2, CONDITION: 线程在条件队列里面等待
         * -3, PROPAGATE: 释放共享资源时需要通知其他结点
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继结点
         */
        volatile Node next;

        /**
         * 用来存放进入AQS里面的线程
         */
        volatile Thread thread;

        /**
         * 下一个等待者
         */
        Node nextWaiter;

        /**
         * 当前线程是否是共享线程
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 获取前一个节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {
        }

        /**
         * 设置线程类型:
         * 1, shared
         * 2, exclusive
         */
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        /**
         * 设置线程状态:
         * 1, cancelled
         * -1, signel
         * -2, condition
         * -3, propagate
         */
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

三,共享操作和独占操作

在AQS中,对于资源的获取可以分为共享式和独占式

共享式: 该资源可以同时被多个线程所持有,但是如果超出所定义的范围就会被放入到阻塞队列中

独占式: 该资源每次只能有一个线程所持有,其他请求该资源的线程都会被加入到阻塞队列中

1, 独占式操作

获取资源: 由子类实现获取资源的逻辑,如果获取成功,直接修改state值

​ 获取失败,首先将当前线程包装为一个独占式的线程节点,然后插入到队列尾部,再使用CLH算法,不断轮询获取资源,如果成功则返回,在过程中如果线程被中断是不会响应的(除了使用Condition将当前线程放到条件队列中,然后取消当前线程的状态)

public final void acquire(int arg) {
    /**
      * 先试图获取资源,如果获取不到,则将其放入队列,并且使用LockSupport的park阻塞当前线程
      */
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //尾节点不为空,将当前节点插入到队列尾部
    if (pred != null) {
        node.prev = pred;
        //CAS将当前线程插入到队列尾部,如果成功直接返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    /**
     * 尾节点为空或者插入到尾节点失败,则使用死循环一直尝试往队列中插入节点,直至成功
     */
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (; ; ) {
        Node t = tail;
        //1, 尾节点为空
        if (t == null) {
            //2, 设置头节点(CAS),并且将头节点和尾节点置为一致
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //3, 连接当前节点和尾节点
            node.prev = t;
            //4, 将尾节点设置为t,并且再连接t和尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
 * 该方法主要用来已独占不间断的方式获取队列中已存在的线程.
 * 使线程在队列中获取资源,一直获取到资源后再返回,如果在过程中被中断,则返回true,返回到上一级被阻塞,否则返回false
 * CLH锁: 是一个基于单链表的高性能,公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (; ; ) {
            /**
              * 获取当前节点的前一个节点,
              * CLH关键,一直获取当前节点的前驱节点并处于自旋状态
              */
            final Node p = node.predecessor();
            /**
              * 如果p为头节点,然后当前节点尝试去获取资源,
              * 如果获取成功,然后设置当前节点为头节点
              */
            if (p == head && tryAcquire(arg)) {
                //头节点为当前节点
                setHead(node);
                //出队
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
              * 根据当前线程的状态值判断当前线程是否应该被阻塞,
              * 如果为true则将当前线程阻塞并且将当前线程的终端标记位擦除
              * 再到条件块内设置中断标记(并非本线程)
              */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        /**
          * 当前线程被中断,设置当前线程的状态为被取消
          */
        if (failed)
            cancelAcquire(node);
    }
}
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors,跳过被取消的线程
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 当前节点的后一个节点
    Node predNext = pred.next;
    //设置当前线程的状态为被取消
    node.waitStatus = Node.CANCELLED;
    // 如果当前节点为尾节点,删除掉当前节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        /**
         * 当前节点的前一个节点不为头节点并且前一个节点的线程不为空,
         * 并且前一个节点的状态为等待唤醒或者前一个节点状态不为被取消并且设置为等待唤醒成功
         */
        int ws;
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
                             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            //当前节点的下一个节点不为空并且下一个节点的状态为等待唤醒状态,删除当前节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        /**
         * 找到一个当前节点之前的状态不为被取消了的线程
         * 可以为需要被唤醒,在条件队列中等待,释放资源是需要通知其他节点
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /**
         * 如果当前线程的状态不为等待唤醒,将其设置为等待唤醒
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

释放资源: 调用子类实现的方法释放资源,释放成功修改state,然后唤醒距离头节点最近的一个阻塞节点

public final boolean release(int arg) {
    /**
      * 在这一步就释放资源,并且释放后唤醒队列的第一个节点
      * 使用LockSupport的unpark()方法释放当前线程
      */
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //找出与头节点最近的一个阻塞节点,并释放
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    /**
      * 线程处于阻塞或需要被唤醒的状态
      */
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //获取当前节点的下一个节点
    Node s = node.next;
    //如果当前节点为空或者其不在阻塞队列中
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            //找到距离尾节点最远的一个阻塞的线程(距离头节点最近)
            if (t.waitStatus <= 0)
                s = t;
    }
    //找到距离头节点最近的一个阻塞节点,将其与LockSupport关联,进入等待状态
    if (s != null)
        LockSupport.unpark(s.thread);
}

2, 共享式操作

获取资源: 和独占式相似,不同之处在于,共享式会对修改后的state值进行判断,如果大于0表示还可以继续获取(即共享也不是所有人都可以获取,有一定的限量)

释放资源: 和独占式的区别在于,只有头节点处于待唤醒状态时才会释放

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    //将当前线程加入到队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (; ; ) {
            //获取前驱节点
            final Node p = node.predecessor();
            //如果前驱节点为头节点,则继续尝试获取资源
            if (p == head) {
                int r = tryAcquireShared(arg);
                //获取资源成功,共享所在
                if (r >= 0) {
                    //设置头节点,并且唤醒其他节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //当前线程在获取资源的过程中被中断,则设置当前线程中断标志
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //前驱节点不为头节点或者获取资源失败,需要判断当前线程是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //获取资源失败则将当前线程置为取消状态waitStatus=CANCELLED
        if (failed)
            cancelAcquire(node);
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /**
     * 传播变量大于0或者头节点为空或者头节点处于被唤醒状态
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        /**
         * 当前节点的后继节点为空或者其是共享式节点,则
         */
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
private void doReleaseShared() {
    for (; ; ) {
        Node h = head;
        if (h != null && h != tail) {
            //获取头节点等待状态
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //设置头节点的状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后继节点(从后继节点中找出一个状态值小于0的节点)
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

四,条件变量

1,是什么?

类似于Object类的wait和notify操作,但是更加的灵活,可以指定唤醒具体的线程

2,类结构

从源码解析AQS

3,主要方法

挂起线程: await,将当前线程节点的状态修改为CONDITION然后加入到条件队列中

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //创建新的节点,并插入到条件队列尾部
    Node node = addConditionWaiter();
    //释放当前线程获取到的锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果当前线程的状态为CONDITION则将其阻塞
    while (!isOnSyncQueue(node)) {
        //将当前线程阻塞
        LockSupport.park(this);
        /**
          * 如果当前线程没有被中断,则尝试着将node节点的状态修改为取消,并且放入到AQS队列中
          * 放入成功: 抛出异常
          * 放入失败: 尝试获取资源,获取成功执行中断
          */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //清除掉已经取消了的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //尾节点不为空并且不在条件队列中
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        //重新定位尾节点
        t = lastWaiter;
    }
    //创建一个节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //向链表尾部插入元素
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
/**
 * 尝试释放锁,释放成功,返回之前的状态
 * 如果释放失败将当前node线程的状态修改为已取消(1)并抛出异常
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
final boolean isOnSyncQueue(Node node) {
    //当前节点的状态的CONDITION并且其为头节点则一定不在AQS队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果当前节点的后继节点不为空则一定在AQS队列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //从后向前遍历当前节点是否在队列中
    return findNodeFromTail(node);
}

/**
 * 从后向前寻找该节点
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (; ; ) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

/**
 * 删除所有节点状态不为CONDITION的节点
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            //help GC
            t.nextWaiter = null;
            if (trail == null)
                //获取头节点
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        } else
            trail = t;
        //指针向后移动
        t = next;
    }
}

唤醒线程: 每次唤醒条件队列的队首元素

public final void signal() {
    //如果当前线程没有获取到锁,直接抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //将条件队列的头元素移动到AQS队列
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        //判断头节点是否为最后一个节点
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //help GC
        first.nextWaiter = null;
        //将头节点的状态修改为0并且插入到AQS队列中
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
 * 将节点从条件队列放入到等待队列中
 */
final boolean transferForSignal(Node node) {
    //设置node节点状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //设置成功插入到AQS队列尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果node线程被取消(只有CANCELLED大于0)或者设置node状态为等待唤醒失败,则继续挂起
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}


喜欢 (0)