Java阻塞队列 —— DelayQueue

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

DelayQueue是一个支持延时获取元素的无界阻塞队列。、队列元素会按照最终执行时间在队列中进行排序。当队列中的元素到达延迟时间时才会被取出。
主要作用:

  • 缓存:清掉缓存中超时的缓存数据
  • 任务超时处理

DelayQueue实现的关键主要有如下几个:

  • 可重入锁ReentrantLock
  • 用于阻塞和通知的Condition对象
  • 根据Delay时间排序的优先级队列:PriorityQueue
  • 用于优化阻塞通知的线程元素leader

Delayed

Delayed接口是用来标记那些应该在给定延迟时间之后执行的对象,它定义了一个long getDelay(TimeUnit unit)方法,该方法返回与此对象相关的的剩余时间。同时实现该接口的对象必须定义一个compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
先看下实例

    class DelayTask implements Delayed {
        //定义该元素及其属性
        public String name;
        public Long delayTime;
        public TimeUnit delayTimeUnit;
        public Long executeTime;//ms
    
        DelayTask(String name, long delayTime, TimeUnit delayTimeUnit) {
            this.name = name;
            this.delayTime = delayTime;
            this.delayTimeUnit = delayTimeUnit;
            this.executeTime = System.currentTimeMillis() + delayTimeUnit.toMillis(delayTime);
        }
        //getDelay方法的作用即是计算当前时间到执行时间之间还有多长时间
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        //compareTo方法的作用即是判断队列中元素的顺序谁前谁后。当前元素比队列元素后执行时,返回一个正数,比它先执行时返回一个负数,否则返回0
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            }
            return 0;
        }
    
    }

内部结构

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
            /** 可重入锁 */
        private final transient ReentrantLock lock = new ReentrantLock();
        /** 支持优先级的BlockingQueue */
        private final PriorityQueue<E> q = new PriorityQueue<E>();
         /** 用于优化阻塞 */
        private Thread leader = null;
        /** Condition */
        private final Condition available = lock.newCondition();
        }

offer()

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             // 向 PriorityQueue中插入元素
            q.offer(e);
            // 如果当前元素的对首元素(优先级最高),leader设置为空,唤醒所有等待线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 对首元素
                E first = q.peek();
                if (first == null)// 对首为空,阻塞,等待off()操作唤醒
                    available.await();
                else {
                     // 获取对首元素的超时时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)// <=0 表示已过期,出对,return
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader != null 证明有其他线程在操作,阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 否则将leader 设置为当前线程,独占
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 超时阻塞
                            available.awaitNanos(delay);
                        } finally {
                             // 释放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 唤醒阻塞线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

总结

DelayQueue 的入队、出对过程和其他的阻塞队列没有很大区别,无非是在出对的时候增加了一个到期时间的判断。同时通过leader来减少不必要阻塞。