Java 并发编程之 ReentrantLock 详解

 2023-02-01
原文作者:蒋先森 原文地址:https://jlj98.top/

声明:如果本文有错误,希望指出。

ReentrantLock 位于 java.util.concurrent.locks 包下,它实现了 Lock 接口和 Serializable 接口。

ReentrantLock 默认非公平,但可实现公平的(构造器传true),悲观,独享,互斥,可重入,重量级锁。ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。

ReetrantLock 基本用法

构造方法

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    //判断是否开启公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

ReentrantLock 提供公平锁和非公平锁的构造方法,默认构造方法是非公平锁。

NonfairSync 和 FairSync 都是 ReentrantLock 的内部类,继承 Sync 类。

202212301139531361.png

关于公平锁和非公平锁的区别,主要是在多线程情况下,获取锁的机会是否相同。

几种获取锁的方法

获取 ReentrantLock 的几种方式:

  • lock(): 如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
  • tryLock():如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false
  • tryLock(long timeout,TimeUnit unit):如果获取了锁定立即返回true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回true,如果等待超时,返回false;
  • lockInterruptibly():如果获取了锁定立即返回,如果没有获取锁,当前线程处于休眠状态,直到获取锁定,或者当前线程被别的线程中断

公平锁加锁的流程(lock)

在我们使用lock的时候,由于 FairSync 继承 Sync,并重新实现了lock()方法,在源码中:

202212301139564902.png

从上面的加锁流程,可以看出,不管是公平锁,还是非公平锁,最后都调用了 acquire(int arg) 方法。acquire() 方法是 AQS 中的方法,下面来看下acquire的主要流程

tryAcquire(int arg)

AQS 中的 tryAcquire 方法,具体实现交给了 FairSync 实现,这一步主要是尝试获取锁。

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //获取当前锁的状态
        int c = getState();
        //如果无锁
        if (c == 0) {
            //判断AQS的队列中是否还有其他线程等待,并且通过CAS尝试修改state的值
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                //将当前线程设置为独占线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            //重入锁,获取锁的次数+1
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    //去队列中判断是否有比当前线程等待时间更长的线程
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // 队列尾部
        Node h = head; //队列头部
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

其中的 getState() 是获取AQS 中的state值,这个值是volatile关键字修饰的,这个字段是一个同步锁的状态,框架通过 CAS 来原子操作这个值的变化。

利用hasQueuedPredecessors()方法来判断队列中是否有其他线程,如果有,则不会尝试获取锁。如果没有,利用CAS将AQS中的state修改为1,也就是获取锁,并将当前线程设置为获取锁的独占线程。

如果state>0了,表示锁已经被获取了,这时就需要判断获取锁的线程是否为当前线程,是的话,state+1。

tryAcquire()会查看同步状态是否获取成功,如果成功,返回true,结束返回,如果!tryAcquire()==false,则调用addWaiter()方法。

addWaiter(Node mode)

如果前面的tryAcquire(int acquires)方法获取锁失败,则需要 addWaiter(Node.EXCLUSIVE)将当前线程写入AQS队列中。

将当前线程和Node节点进行封装,AQS中节点类型有两种:SHAREDEXCLUSIVE,前者是共享模式,后者是独占模式。

    private Node addWaiter(Node mode) {
        //将当前线程封装成EXCLUSIVE类型的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //判断是否有尾节点,如果有尾节点,将封装好Node利用CAS写入对尾,否则执行enq()
        if (pred != null) {
            node.prev = pred;
            //CAS操作
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //将当前节点插入到队列中
        enq(node);
        return node;
    }
    // enq 入队操作,利用自旋+CAS保证一定能写入队列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued()

写入队列后,需要将当前线程挂起,利用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前节点的前置节点
                final Node p = node.predecessor();
                //判断前置节点是否为头结点,并尝试获取独占式锁
                if (p == head && tryAcquire(arg)) {
                    //将队列头指针用指向当前节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //Node.SIGNAL 表示当前线程阻塞
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//阻塞当前线程
        return Thread.interrupted();
    }

首先会进行无限循环中,循环中每次都会判断给定当前节点的前置节点,如果没有前置节点会直接抛出空指针异常,直到返回 true。

首先判断当前节点的前置节点是否是头结点,并尝试获取独占锁,如果成功,则将头结点指向当前节点,然后释放前置节点。如果没成功,则进入下一个判断条件。

根据上一个节点的 waitStatus 状态来处理 shouldParkAfterFailedAcquire()waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire() 循环尝试修改 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)。parkAndCheckInterrupt 该方法的关键是会调用 LookSupport.park 方法,该方法是用来阻塞当前线程。

selfInterrupt()

中断当前线程

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

非公平锁加锁模式

非公平锁的的加锁步骤和公平锁大致相同,只有两处不同(不同点在代码中标注),一处是在尝试获取锁前,直接通过CAS设置同步状态,如果成功,就将当前线程设置为偏向锁的线程;另外一处是在tryAcquire获取失败后,不需要去执行hasQueuedPredecessors方法,判断等待队列中是否还有等待线程。

    final void lock() {
        //①
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //②
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

释放锁

公平锁和非公平锁的释放流程是一样的。

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	   //唤醒被挂起的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    //尝试释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。

释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。