Java 并发编程 —— AQS

 2023-02-06
原文作者:蒋先森 原文地址:https://jlj98.top/

欢迎指正。

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态, 通用过内置的 FIFO 队列来,可以用于构建锁或者其他同步装置,完成资源获取线程的排队工作。

同步器的主要使用方法是继承、子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态镜像更改,这时就需要使用同步器提供的方法(getState()、setState(in newSate) 和 compareAndSetState(int expect, int update))来进行操作。

AQS 是一个抽象类,内置 自旋锁 实现的同步队列,封装入队和出队的操作,提供独占、共享、中断等特性。

基于AQS构建的同步器:

  • ReentrantLock
  • Semaphore
  • CountDownLatch
  • ReentrantReadWriteLock
  • SynchronusQueue
  • FutureTask

AQS 核心知识

队列同步器的接口与实例

重写同步器指定的方法是,需要使用同步器提供的3个方法来访问或者修稿同步状态:

  • getState():获取当前同步状态
  • setState(in newSate):设置当前同步状态
  • ompareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。

CLH 同步队列

CLH 同步队列(Craig、Landin、Hagersten)是一个 FIFO双向对象 ,AQS 依赖它来完成同步状态的管理,当线程如果获取同步状态失败时,AQS 则会将当前线程等待状态等信息构造成一个节点(Node)并将其加入到 CLH 同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试同步状态。

    static final class Node {
        /** 共享*/
        static final Node SHARED = new Node();
        /** 独占*/
        static final Node EXCLUSIVE = null;
        /**因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态 */
        static final int CANCELLED =  1;
        /**  后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
        static final int SIGNAL    = -1;
        /** 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中*/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         * 表示下一次共享式同步状态获取将会无条件地传播下去
         */
        static final int PROPAGATE = -3;
        /** 等待状态 */
        volatile int waitStatus;
        /** 前驱节点*/
        volatile Node prev;
        /**后续节点*/
        volatile Node next;
        /** 获取同步状态的线程*/
        volatile Thread thread;
    
        Node nextWaiter;
    
        /**Returns true if node is waiting in shared mode.*/
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // Used to establish initial head or SHARED marker
        }
    
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS 实现细节

线程首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到FIFO队列中。 接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

Reference