2023-01-14  阅读(1)
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java源码分析》:Semaphore

Semaphore 是一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

说白了,Semaphore是一个计数器,在计数器不为0的时候对线程就放行,一旦达到0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,也就是说Semaphore不是可重入的。每一次请求一个许可都会导致计数器减少1,同样每次释放一个许可都会导致计数器增加1,一旦达到了0,新的许可请求线程将被挂起。

缓存池整好使用此思想来实现的,比如链接池、对象池等。下面看一个具体的实现。

        public class ObjectCache<T> {
            interface ObjectFactory<T>{
                T makeObject();
            }
            private Semaphore semaphore;
            private int capacity;
            private ObjectFactory<T> factory;
            private Lock lock;
    
            private Node head,tail;
            private class Node{
                T obj;
                Node next;
            }
            public ObjectCache(int capacity,
                    ObjectFactory<T> factory) {
                this.capacity = capacity;
                this.factory = factory;
                this.lock = new ReentrantLock();
                this.semaphore =new Semaphore(capacity);
                head = tail = null;
            }   
    
            public T getObject(){
                try {
                    semaphore.acquire();//如果还有资源,则允许通过
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return getNextObject();
    
            }
    
            private T getNextObject() {
                lock.lock();
    
                try{
                    if(head==null){//目前还没有任何产品,应该生产
                        return factory.makeObject();
                    }
                    T value = head.obj;
                    Node next = head.next;
                    if(next==null){
                        tail = null;
                    }
                    else{
                        head.next = null;//help GC
                        head = next;
                    }
                    return value;
                }finally{
                    lock.unlock();
                }
            }
    
            public void returnObject(T t){
                returnObjectToPool(t);
                semaphore.release();//表示释放资源
            }
    
            public void returnObjectToPool(T t){
                lock.lock();
    
                try{
                    Node node = new Node();
                    node.obj = t;
                    if(head==null){
                        head = tail = node;
                    }
                    else{
                        tail.next = node;
                        tail = node;
                    }
    
                }finally{
                    lock.unlock();
                }
            }
        }

Semaphore的构造函数

当我们使用Semaphore semaphore = new Semaphore(10)时其内部的实例化如下:

            /*
             * Creates a {@code Semaphore} with the given number of
             * permits and nonfair fairness setting.
             *翻译:创建permits个许可且是非公平的信号量对象
             */
            public Semaphore(int permits) {
                sync = new NonfairSync(permits);
            }
            NonfairSync(int permits) {
                super(permits);//直接调用父类Sync的构造方法
            }
    
            Sync(int permits) {
                setState(permits);//调用同步器AQS类中的setState方法设置状态位
            }
    
            /*
             * Sets the value of synchronization state.
             */
            protected final void setState(int newState) {
                state = newState;
            }

Semaphore直接new了一个NonfairSync类的对象。这里要说明下,Semaphore类是委托给实现了AQS类的Sync类的两个子类FairSync、NonFairSync来实现的。

Semaphore信号量和ReentrantLock锁一样,也存在公平和不公平。

如果Semaphore委托给FairSync类实现,就是公平信号量。

如果Semaphore委托给NonFairSync类实现,就是非公平信号量。

那么什么是公平Semaphore呢??按照FIFO队列中的顺序分配Semaphore所管理的许可就是公平的。

非公平Semaphore信号量对于任何申请许可的线程来说,都是第一时间看是否有多余的许可,如果有则给此线程,如果没有则进队列排队等待,而不是此线程直接进AQS队列排队等待按顺序来拿到许可,利用此间隙来分配许可可以提高并发量。但是会引发一个问题:越活跃的线程越能够拿到许可,造成“饥渴死”现象。

下面我们来看Semaphore的两个比较常用方法的内部实现。

void acquire()

根据API文档的介绍,我们知道

void acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

下面我们来从源码角度看下这个方法的内部实现。

        /*翻译:
         *从Semaphore对象中中获取一个许可,如果没有则会阻塞直至有一个许可可用或者是被中断。
         *获取一个许可后并立即返回将可利用的许可数量减一
         *如果没有许可可以利用则当前线程由于调度的原因会被禁用,休眠至发生如下两件事之一:
         *1、其它的线程调用了这个信号量的release方法且当前线程获取许可
         *2、其它线程中断了当前线程
         *如果当前线程在进入这个方法时设置了中断标志位,
         或者是当等待许可时发生了中断,则会抛异常且清除中断标志位
         */
    
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

这个方法直接调用的是AQS的acquireSharedInterruptibly(int arg)方法。

这个方法具体详细的功能见翻译的注释,简单来说:获取一个许可,如果没有则阻塞。

既然此方法调用了AQS的acquireSharedInterruptibly(int arg)方法,那我们就业来看下。

        /*
    
         函数功能:以共享模式获取对象。中断后退出。
         具体实现是第一步首先检查是否有中断标志位,然后至少一次的调用tryAcquireShared方法直至返回成功。
         否则线程进行AQS队列,可能重复的阻塞和非阻塞直至调用tryAcquireShared方法成功或者是线程被中断
    
         翻译的好烂,my god !
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            /*
                调用tryAcquireShared尝试的获取锁,
                返回值为剩余共享锁的个数,如果返回大于等于0则表示获取成功则立即返回
                如果返回值小于0则调用doAcquireSharedInterruptibly完成将当前线程加入到同步队列,自旋检测获取锁
            */
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

代码中的中文是对源码中的注释翻译得到的,可能翻译的比较烂,凑合的看哈。

此函数做了如下几件事:

1、检测此线程是否被中断了,如果中断,则抛中断异常。如果没有被中断,则进行2

2、调用tryAcquireShares(int arg)方法以共享模式来获取对象(锁),如果此方法值大于等于0则表明获取到锁立即返回,否则进行 3

3、由于没有获取到锁,则调用doAcquireSharedInterruptibly方法进入AQS同步队列进行自旋等待。

下面就先看下tryAcquireShares(int arg)方法是怎么样的。

由于存在非公平和公平Semaphore,因此这里有一点点不同,也只有这里不同。

非公平锁的tryAcquireShared方法

            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))//修改AQS状态位
                        return remaining;
                }
            }

公平锁的tryAcquireShared方法

        /*
            公平锁与非公平锁的区别在于:始终按照AQS队列的先后顺序来。
        */
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

比较非公平的公平的源码可以看到,公平Semaphore对象对许可的分配是按照FIFO队列来分配的,以保证公平性。因此其首先调用了hasQueuedPredecessors方法来判断当前线程是否是AQS队列中的头结点,如果不是,则不给于分配需要加入到同步队列中等待。而非公平的Semaphore对象就不是这样的,有许可我们就分配出去,不需要排队等待。

当一个线程第一次获取共享锁失败之后,就会调用doAcquireSharedInterruptibly(int
arg)方法来自旋等待获取锁。

该方法的代码如下:

        /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //将当前线程作为内容构造成节点以共享模式加入到AQS同步队列中
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {//不停的自旋来获取锁
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);//返回剩余的锁的个数
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

该方法的具体思路如下:

1、首先调用addWaiter方法将当前线程构成的节点加入到同步队列AQS中

2、将一直自旋检测该线程节点的前驱节点是否为头结点,如果是则调用tryAcquireShared尝试的获取锁。如果不是,则进行 3

3、判断此节点是否需要阻塞以及异常检测。

acquire方法总结如下:

1、尝试在非公平模式下获取一个许可,或者叫做锁。如果获取到则立即返回并将许可计数器减一,如果没有获取到,则进行2

2、进入到AQS队列自旋等待,当此节点的前驱是头结点后,又开始尝试获取锁。直至成功获取或中断取消。

以上就是关于acquire()的内部实现过程。

release()方法:释放一个许可

当调用semaphore.release()方法的内部实现是怎样的呢??

下面我们来看下release方法的内部实现。

        /*
        翻译:
         释放一个许可到Semaphore对象中,释放一个许可,将可用许可的数量加1.
         当一个线程尝试这取释放一个许可,那么就选择另外一个线程(可重入)来获取这个许可
         没有要求一个线程释放一个许可就必须要通过调用acquire方法来获取一个许可
        */
        public void release() {
            sync.releaseShared(1);
        }

此方法直接调用了AQS的releaseShared(int arg)方法。

        /*
        翻译:以共享模式释放锁,
        通过非阻塞的一个或多个线程调用tryReleaseShared方法返回true来实现
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {//设置AQS的状态位,即添加可用许可数
                doReleaseShared();
                return true;
            }
            return false;
        }

此函数总共干了两件事:

1、调用tryReleaseShared方法来完成AQS状态位的设置。如果设置成功,则进行2

2、调用doReleaseShared()来唤醒在AQS中等待的需要许可的后继节点来获取许可。

tryReleaseShared(int releases)方法的代码比较简单,就是CAS来设置了状态为state。

            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }

doReleaseShared()方法的功能也比较清晰,唤醒AQS队列中需要许可的继任节点。具体代码不再分析。

        /*
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         翻译:以共享模式释放,发信号给后继的一些节点。
         注意:对于独占模式,只会调用unparkSuccessor来唤醒AQS队列总的头结点(如果其需要信号)
         */
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

以上就是关于release方法的内部实现,是不是思路还是比较简单的。应该这么说,我们看的ReentrantLock、Condition、CountDownLatch的思想实现都是如此,当我们看这些源码看的比较多的时候,发现这些类库的实现思想都是借助于AQS类来实现的。

Semaphore的应用:实现生产消费者模型

最后以一个生产消费者模型来结束Semaphore类的分析。

例子是用3个Semaphore对象来实现的。

        public class SemaphoreDemo {
            private Semaphore produceSem;
            private Semaphore customerSem;
    
            private Semaphore mutex;
            private Object[] warehouse;
            private int head,tail;
            public SemaphoreDemo(int capacity){
                produceSem = new Semaphore(capacity);
                customerSem = new Semaphore(0);
                warehouse = new Object[capacity];
                head = 0;
                tail = 0;
                mutex = new Semaphore(1);
            }
    
            public void put(Object o){
                try {
                    produceSem.acquire();//获取存储资格
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                putObject(o);
    
                customerSem.release();//有消费的资源了
            }
    
            private void putObject(Object obj){
                try {
                    //锁定
                    mutex.acquire();
                    warehouse[tail++] = obj;
                    if(tail==warehouse.length){
                        tail = 0;
                    }   
                    System.out.println(Thread.currentThread().getName()+"生产产品:   "+(Integer)obj);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    //释放锁
                    mutex.release();
                }
    
            }
    
            public Object get(){
                try {
                    customerSem.acquire();//保证有资源可以消费
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Object obj = getObject();
                //System.out.println(Thread.currentThread().getName()+"拿到产品:  "+obj);
                produceSem.release();// 增加可以生产的信号量
                return obj;
            }
    
            private Object getObject() {
                try {
                    mutex.acquire();//类似于获取锁
                    Object obj = warehouse[head];
    
                    head++;
                    if(head==warehouse.length){
                        head = 0;
                    }
                    System.out.println(Thread.currentThread().getName()+"拿到产品:  "+obj);
                    return obj;
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally{
                    mutex.release();
                }
    
                return null;
            }
            private static AtomicInteger at = new AtomicInteger(0);
            public static void main(String[] args){
                SemaphoreDemo sd = new SemaphoreDemo(10);
                //开启3个生产者、消费者线程
                for(int i=0;i<3;i++){
                    new Thread(new Runnable(){
    
                        @Override
                        public void run() {
                            while(true){
                                int val = at.incrementAndGet();
                                sd.put(val);
    
                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
    
                        }
    
                    },"produceThread"+i).start();
                    new Thread(new Runnable(){
                        @Override
                        public void run() {
                            while(true){
                                sd.get();
                                //System.out.println(Thread.currentThread().getName()+"拿到的产品为:"+str);
                                try {
                                    Thread.sleep(5000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
    
                        }
    
                    },"customerThread"+i).start();
                }
            }
    
        }

Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文