声明:如果本文有错误,希望指出。
ReentrantLock 位于 java.util.concurrent.locks
包下,它实现了 Lock 接口和 Serializable 接口。
ReentrantLock 默认非公平,但可实现公平的(构造器传true),悲观,独享,互斥,可重入,重量级锁。ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。
ReetrantLock 基本用法
构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
//判断是否开启公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 提供公平锁和非公平锁的构造方法,默认构造方法是非公平锁。
NonfairSync 和 FairSync 都是 ReentrantLock 的内部类,继承 Sync 类。
关于公平锁和非公平锁的区别,主要是在多线程情况下,获取锁的机会是否相同。
几种获取锁的方法
获取 ReentrantLock 的几种方式:
- lock(): 如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
- tryLock():如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false
- tryLock(long timeout,TimeUnit unit):如果获取了锁定立即返回true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回true,如果等待超时,返回false;
- lockInterruptibly():如果获取了锁定立即返回,如果没有获取锁,当前线程处于休眠状态,直到获取锁定,或者当前线程被别的线程中断
公平锁加锁的流程(lock)
在我们使用lock的时候,由于 FairSync 继承 Sync,并重新实现了lock()方法,在源码中:
从上面的加锁流程,可以看出,不管是公平锁,还是非公平锁,最后都调用了 acquire(int arg) 方法。acquire() 方法是 AQS 中的方法,下面来看下acquire的主要流程
tryAcquire(int arg)
AQS 中的 tryAcquire 方法,具体实现交给了 FairSync 实现,这一步主要是尝试获取锁。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取当前锁的状态
int c = getState();
//如果无锁
if (c == 0) {
//判断AQS的队列中是否还有其他线程等待,并且通过CAS尝试修改state的值
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//将当前线程设置为独占线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//重入锁,获取锁的次数+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//去队列中判断是否有比当前线程等待时间更长的线程
public final boolean hasQueuedPredecessors() {
Node t = tail; // 队列尾部
Node h = head; //队列头部
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
其中的 getState() 是获取AQS 中的state值,这个值是volatile关键字修饰的,这个字段是一个同步锁的状态,框架通过 CAS 来原子操作这个值的变化。
利用hasQueuedPredecessors()方法来判断队列中是否有其他线程,如果有,则不会尝试获取锁。如果没有,利用CAS将AQS中的state修改为1,也就是获取锁,并将当前线程设置为获取锁的独占线程。
如果state>0了,表示锁已经被获取了,这时就需要判断获取锁的线程是否为当前线程,是的话,state+1。
tryAcquire()会查看同步状态是否获取成功,如果成功,返回true,结束返回,如果!tryAcquire()==false,则调用addWaiter()方法。
addWaiter(Node mode)
如果前面的tryAcquire(int acquires)
方法获取锁失败,则需要 addWaiter(Node.EXCLUSIVE)
将当前线程写入AQS队列中。
将当前线程和Node节点进行封装,AQS中节点类型有两种:SHARED
和 EXCLUSIVE
,前者是共享模式,后者是独占模式。
private Node addWaiter(Node mode) {
//将当前线程封装成EXCLUSIVE类型的Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//判断是否有尾节点,如果有尾节点,将封装好Node利用CAS写入对尾,否则执行enq()
if (pred != null) {
node.prev = pred;
//CAS操作
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//将当前节点插入到队列中
enq(node);
return node;
}
// enq 入队操作,利用自旋+CAS保证一定能写入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()
写入队列后,需要将当前线程挂起,利用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//判断前置节点是否为头结点,并尝试获取独占式锁
if (p == head && tryAcquire(arg)) {
//将队列头指针用指向当前节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//Node.SIGNAL 表示当前线程阻塞
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞当前线程
return Thread.interrupted();
}
首先会进行无限循环中,循环中每次都会判断给定当前节点的前置节点,如果没有前置节点会直接抛出空指针异常,直到返回 true。
首先判断当前节点的前置节点是否是头结点,并尝试获取独占锁,如果成功,则将头结点指向当前节点,然后释放前置节点。如果没成功,则进入下一个判断条件。
根据上一个节点的 waitStatus
状态来处理 shouldParkAfterFailedAcquire()
。waitStatus
用于记录当前节点的状态,如节点取消、节点等待等。
shouldParkAfterFailedAcquire() 循环尝试修改 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)。parkAndCheckInterrupt 该方法的关键是会调用 LookSupport.park 方法,该方法是用来阻塞当前线程。
selfInterrupt()
中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
非公平锁加锁模式
非公平锁的的加锁步骤和公平锁大致相同,只有两处不同(不同点在代码中标注),一处是在尝试获取锁前,直接通过CAS设置同步状态,如果成功,就将当前线程设置为偏向锁的线程;另外一处是在tryAcquire获取失败后,不需要去执行hasQueuedPredecessors方法,判断等待队列中是否还有等待线程。
final void lock() {
//①
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//②
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
释放锁
公平锁和非公平锁的释放流程是一样的。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒被挂起的线程
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state
减到 0 才认为完全释放锁。
释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。