《Java源码分析》:Semaphore
Semaphore 是一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
说白了,Semaphore是一个计数器,在计数器不为0的时候对线程就放行,一旦达到0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,也就是说Semaphore不是可重入的。每一次请求一个许可都会导致计数器减少1,同样每次释放一个许可都会导致计数器增加1,一旦达到了0,新的许可请求线程将被挂起。
缓存池整好使用此思想来实现的,比如链接池、对象池等。下面看一个具体的实现。
public class ObjectCache<T> {
interface ObjectFactory<T>{
T makeObject();
}
private Semaphore semaphore;
private int capacity;
private ObjectFactory<T> factory;
private Lock lock;
private Node head,tail;
private class Node{
T obj;
Node next;
}
public ObjectCache(int capacity,
ObjectFactory<T> factory) {
this.capacity = capacity;
this.factory = factory;
this.lock = new ReentrantLock();
this.semaphore =new Semaphore(capacity);
head = tail = null;
}
public T getObject(){
try {
semaphore.acquire();//如果还有资源,则允许通过
} catch (InterruptedException e) {
e.printStackTrace();
}
return getNextObject();
}
private T getNextObject() {
lock.lock();
try{
if(head==null){//目前还没有任何产品,应该生产
return factory.makeObject();
}
T value = head.obj;
Node next = head.next;
if(next==null){
tail = null;
}
else{
head.next = null;//help GC
head = next;
}
return value;
}finally{
lock.unlock();
}
}
public void returnObject(T t){
returnObjectToPool(t);
semaphore.release();//表示释放资源
}
public void returnObjectToPool(T t){
lock.lock();
try{
Node node = new Node();
node.obj = t;
if(head==null){
head = tail = node;
}
else{
tail.next = node;
tail = node;
}
}finally{
lock.unlock();
}
}
}
Semaphore的构造函数
当我们使用Semaphore semaphore = new Semaphore(10)
时其内部的实例化如下:
/*
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*翻译:创建permits个许可且是非公平的信号量对象
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
NonfairSync(int permits) {
super(permits);//直接调用父类Sync的构造方法
}
Sync(int permits) {
setState(permits);//调用同步器AQS类中的setState方法设置状态位
}
/*
* Sets the value of synchronization state.
*/
protected final void setState(int newState) {
state = newState;
}
Semaphore直接new了一个NonfairSync类的对象。这里要说明下,Semaphore类是委托给实现了AQS类的Sync类的两个子类FairSync、NonFairSync来实现的。
Semaphore信号量和ReentrantLock锁一样,也存在公平和不公平。
如果Semaphore委托给FairSync类实现,就是公平信号量。
如果Semaphore委托给NonFairSync类实现,就是非公平信号量。
那么什么是公平Semaphore呢??按照FIFO队列中的顺序分配Semaphore所管理的许可就是公平的。
非公平Semaphore信号量对于任何申请许可的线程来说,都是第一时间看是否有多余的许可,如果有则给此线程,如果没有则进队列排队等待,而不是此线程直接进AQS队列排队等待按顺序来拿到许可,利用此间隙来分配许可可以提高并发量。但是会引发一个问题:越活跃的线程越能够拿到许可,造成“饥渴死”现象。
下面我们来看Semaphore的两个比较常用方法的内部实现。
void acquire()
根据API文档的介绍,我们知道
void acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
下面我们来从源码角度看下这个方法的内部实现。
/*翻译:
*从Semaphore对象中中获取一个许可,如果没有则会阻塞直至有一个许可可用或者是被中断。
*获取一个许可后并立即返回将可利用的许可数量减一
*如果没有许可可以利用则当前线程由于调度的原因会被禁用,休眠至发生如下两件事之一:
*1、其它的线程调用了这个信号量的release方法且当前线程获取许可
*2、其它线程中断了当前线程
*如果当前线程在进入这个方法时设置了中断标志位,
或者是当等待许可时发生了中断,则会抛异常且清除中断标志位
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
这个方法直接调用的是AQS的acquireSharedInterruptibly(int arg)方法。
这个方法具体详细的功能见翻译的注释,简单来说:获取一个许可,如果没有则阻塞。
既然此方法调用了AQS的acquireSharedInterruptibly(int arg)方法,那我们就业来看下。
/*
函数功能:以共享模式获取对象。中断后退出。
具体实现是第一步首先检查是否有中断标志位,然后至少一次的调用tryAcquireShared方法直至返回成功。
否则线程进行AQS队列,可能重复的阻塞和非阻塞直至调用tryAcquireShared方法成功或者是线程被中断
翻译的好烂,my god !
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
/*
调用tryAcquireShared尝试的获取锁,
返回值为剩余共享锁的个数,如果返回大于等于0则表示获取成功则立即返回
如果返回值小于0则调用doAcquireSharedInterruptibly完成将当前线程加入到同步队列,自旋检测获取锁
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
代码中的中文是对源码中的注释翻译得到的,可能翻译的比较烂,凑合的看哈。
此函数做了如下几件事:
1、检测此线程是否被中断了,如果中断,则抛中断异常。如果没有被中断,则进行2
2、调用tryAcquireShares(int arg)方法以共享模式来获取对象(锁),如果此方法值大于等于0则表明获取到锁立即返回,否则进行 3
3、由于没有获取到锁,则调用doAcquireSharedInterruptibly方法进入AQS同步队列进行自旋等待。
下面就先看下tryAcquireShares(int arg)方法是怎么样的。
由于存在非公平和公平Semaphore,因此这里有一点点不同,也只有这里不同。
非公平锁的tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))//修改AQS状态位
return remaining;
}
}
公平锁的tryAcquireShared方法
/*
公平锁与非公平锁的区别在于:始终按照AQS队列的先后顺序来。
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
比较非公平的公平的源码可以看到,公平Semaphore对象对许可的分配是按照FIFO队列来分配的,以保证公平性。因此其首先调用了hasQueuedPredecessors方法来判断当前线程是否是AQS队列中的头结点,如果不是,则不给于分配需要加入到同步队列中等待。而非公平的Semaphore对象就不是这样的,有许可我们就分配出去,不需要排队等待。
当一个线程第一次获取共享锁失败之后,就会调用doAcquireSharedInterruptibly(int
arg)方法来自旋等待获取锁。
该方法的代码如下:
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程作为内容构造成节点以共享模式加入到AQS同步队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//不停的自旋来获取锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//返回剩余的锁的个数
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法的具体思路如下:
1、首先调用addWaiter方法将当前线程构成的节点加入到同步队列AQS中
2、将一直自旋检测该线程节点的前驱节点是否为头结点,如果是则调用tryAcquireShared尝试的获取锁。如果不是,则进行 3
3、判断此节点是否需要阻塞以及异常检测。
acquire方法总结如下:
1、尝试在非公平模式下获取一个许可,或者叫做锁。如果获取到则立即返回并将许可计数器减一,如果没有获取到,则进行2
2、进入到AQS队列自旋等待,当此节点的前驱是头结点后,又开始尝试获取锁。直至成功获取或中断取消。
以上就是关于acquire()的内部实现过程。
release()方法:释放一个许可
当调用semaphore.release()方法的内部实现是怎样的呢??
下面我们来看下release方法的内部实现。
/*
翻译:
释放一个许可到Semaphore对象中,释放一个许可,将可用许可的数量加1.
当一个线程尝试这取释放一个许可,那么就选择另外一个线程(可重入)来获取这个许可
没有要求一个线程释放一个许可就必须要通过调用acquire方法来获取一个许可
*/
public void release() {
sync.releaseShared(1);
}
此方法直接调用了AQS的releaseShared(int arg)方法。
/*
翻译:以共享模式释放锁,
通过非阻塞的一个或多个线程调用tryReleaseShared方法返回true来实现
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//设置AQS的状态位,即添加可用许可数
doReleaseShared();
return true;
}
return false;
}
此函数总共干了两件事:
1、调用tryReleaseShared方法来完成AQS状态位的设置。如果设置成功,则进行2
2、调用doReleaseShared()来唤醒在AQS中等待的需要许可的后继节点来获取许可。
tryReleaseShared(int releases)方法的代码比较简单,就是CAS来设置了状态为state。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared()方法的功能也比较清晰,唤醒AQS队列中需要许可的继任节点。具体代码不再分析。
/*
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
翻译:以共享模式释放,发信号给后继的一些节点。
注意:对于独占模式,只会调用unparkSuccessor来唤醒AQS队列总的头结点(如果其需要信号)
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
以上就是关于release方法的内部实现,是不是思路还是比较简单的。应该这么说,我们看的ReentrantLock、Condition、CountDownLatch的思想实现都是如此,当我们看这些源码看的比较多的时候,发现这些类库的实现思想都是借助于AQS类来实现的。
Semaphore的应用:实现生产消费者模型
最后以一个生产消费者模型来结束Semaphore类的分析。
例子是用3个Semaphore对象来实现的。
public class SemaphoreDemo {
private Semaphore produceSem;
private Semaphore customerSem;
private Semaphore mutex;
private Object[] warehouse;
private int head,tail;
public SemaphoreDemo(int capacity){
produceSem = new Semaphore(capacity);
customerSem = new Semaphore(0);
warehouse = new Object[capacity];
head = 0;
tail = 0;
mutex = new Semaphore(1);
}
public void put(Object o){
try {
produceSem.acquire();//获取存储资格
} catch (InterruptedException e) {
e.printStackTrace();
}
putObject(o);
customerSem.release();//有消费的资源了
}
private void putObject(Object obj){
try {
//锁定
mutex.acquire();
warehouse[tail++] = obj;
if(tail==warehouse.length){
tail = 0;
}
System.out.println(Thread.currentThread().getName()+"生产产品: "+(Integer)obj);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//释放锁
mutex.release();
}
}
public Object get(){
try {
customerSem.acquire();//保证有资源可以消费
} catch (InterruptedException e) {
e.printStackTrace();
}
Object obj = getObject();
//System.out.println(Thread.currentThread().getName()+"拿到产品: "+obj);
produceSem.release();// 增加可以生产的信号量
return obj;
}
private Object getObject() {
try {
mutex.acquire();//类似于获取锁
Object obj = warehouse[head];
head++;
if(head==warehouse.length){
head = 0;
}
System.out.println(Thread.currentThread().getName()+"拿到产品: "+obj);
return obj;
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
mutex.release();
}
return null;
}
private static AtomicInteger at = new AtomicInteger(0);
public static void main(String[] args){
SemaphoreDemo sd = new SemaphoreDemo(10);
//开启3个生产者、消费者线程
for(int i=0;i<3;i++){
new Thread(new Runnable(){
@Override
public void run() {
while(true){
int val = at.incrementAndGet();
sd.put(val);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"produceThread"+i).start();
new Thread(new Runnable(){
@Override
public void run() {
while(true){
sd.get();
//System.out.println(Thread.currentThread().getName()+"拿到的产品为:"+str);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"customerThread"+i).start();
}
}
}
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。