多线程编程 —— 线程池

 2023-01-30
原文作者:蒋先森 原文地址:https://jlj98.top/

在项目中,线程是一种稀缺资源,频繁的创建和销毁,对于系统的性能有着很大的消耗。线程池是线程资源复用的典范之作,通过维护一个一定数量的线程集合,在需要运行线程任务的时候直接从这个集合中取出一个线程去运行任务,而不是重新创建一个。这点在阿里Java手册上也提出了:

202212301148310091.png

使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

在Java中提供了几种线程池创建的方法,不过这几种方法都是最后通过 ThreadPoolExecutor 来创建线程的。在开始讲Java提供的几种之前,先讲下 ThreadPoolExecutor 中几个参数的含义。

ThreadPoolExecutor

内部工作原理

  • corePoolSize :池中所保存的线程数,包括空闲线程,除非设置 allowCoreThreadTimeOut 参数,否则创建后会一直存活
  • maximumPoolSize:池中允许的最大线程数
  • keepAliveTime: 非核心线程 空闲线程等待新任务的最长时间,即线程数多余核心数的线程,会被销毁掉
  • unit:keepAliveTime 参数的时间单位
  • workQueue :执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务
  • threadFactory:执行程序创建新线程时使用的工厂
  • handler :由于超出线程范围和队列容量而使执行

线程池运行流程

  • 如果当前池大小 poolSize 小于 corePoolSize ,则创建新线程执行任务;
  • 如果当前池大小 poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列;
  • 如果当前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务;
  • 如果当前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务;
  • 线程池里的非核心线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么非核心线程就会退出。

在线程大于 corePoolSize,进入队列等待。如果队列也满了,且线程数小于 maximumPoolSize,则创建新的线程。如果线程数大于maximumPoolSize,则使用拒绝策略来处理任务。关于非核心线程的存活,详细看多线程编程 —— 线程池源码解析

拒绝策略

线程池有四种拒绝策略:

  • AbortPolicy:抛出 RejectedExecutionException 异常,默认
  • CallerRunsPolicy:使用调用者所在线程执行任务
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列中最旧的任务

对于线程池选择的拒绝策略可以通过 RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); 来设置。

构造函数

    public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? 
            null : 
            AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Java 中提供的几种方法

newFixedThreadPool

通过创建一个 corePoolSizemaximumPoolSize 相同的线程池。使用 LinkedBlockingQuene 作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

newCachedThreadPool

  • 初始化一个可以缓存线程的线程池,以前构建的线程可用时将重用它们,空闲线程默认缓存60s,线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;
  • newFixedThreadPool 创建的线程池不同,newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

newSingleThreadExecutor

初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用 LinkedBlockingQueue 作为阻塞队列。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newScheduledThreadPool

初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据,也是使用 ThreadPoolExecutor 构建线程池,最大核心线程数 Integer.MAX_VALUE,使用DelayedWorkQueue 作为阻塞队列。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

newWorkStealingPool

newWorkStealingPool,它是 jdk1.8 提供的一种线程池,用于执行并行任务。默认并行级别为当前可用最大可用cpu数量的线程。

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

使用场景:用于大耗时同时可以分段并行的任务。

注意

除了 newWorkStealingPool 之外,其他四个创建方式都存在资源耗尽的风险。在阿里Java开发手册——嵩山版上面关于线程池创建的注意:

202212301148321092.png

在最新版的Java 开发手册中,在说 Executors 创建线程池弊端中,去掉了 newScheduledThreadPool,在之前的版本中,还是提示 newScheduledThreadPool 因为创建的线程数为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

多少的线程数合适?

CPU 密集型任务

对于CPU密集型任务,比如加密解密,压缩,计算等一系列需要大量耗费CPU资源的任务,最佳的任务线程数为CPU核心数的1~2倍,如果设置过多线程数,实际并不会起到很好的效果。以内CPU密集型任务会占用大量的CPU资源,所以这时的CPU的每个核心工作基本都是满负荷的,而设置多过线程,每个线程都会去利用CPU资源来执行任务,这就会造成不必要的上下文切换。

耗时IO型任务

耗时IO型任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗CPU资源,但是IO操作很耗时,总体会占用比较多的时候。对于这种任务,最大线程数一般会大于核心数的很多倍,因为IO读写速度相对于CPU的速度而言是比较慢的,如果设置过少的线程数,就会导致CPU资源的浪费。

《Java 并发编程实战》作者推介的算法:线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。

Reference

深入分析java线程池的实现原理
Github上的源码