2024-08-04
版权声明:本文为博主付费文章,严禁任何形式的转载和摘抄,维权必究。 本文链接:https://www.skjava.com/mianshi/baodian/detail/1962873412

回答

ThreadPoolExecutor 对核心参数提供了一些 setter 方法,根据这些 setter 方法我们可以在程序运行时动态调整线程池的核心参数:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

在实际生产场景下,我们需要定期检查线程池中的任务队列长度、活动线程数和核心线程数等指标。然后根据监控结果,调用对应的 setter 方法设置参数即可。

扩展

演示示例

下面大明哥用一个简单的示例来演示如何来动态调整线程池:

@Slf4j
public class DynamicThreadPool {
  private final ThreadPoolExecutor executor;
  private final ScheduledExecutorService scheduler;

  public DynamicThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,int capacity) {
    this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(capacity));
    // 为了更好地演示,拒绝策略选择 DiscardPolicy ,任务直接丢弃,不管
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    this.scheduler = Executors.newScheduledThreadPool(1);

    // 定时任务,每 10 秒检测一次
    scheduler.scheduleAtFixedRate(this::adjustThreadPoolSize, 3, 3, TimeUnit.SECONDS);
  }

  /**
   * 动态调整线程池
   */
  private void adjustThreadPoolSize() {
    // 队列大小
    int queueSize = executor.getQueue().size();
    // 活跃线程数
    int activeCount = executor.getActiveCount();
    // 核心线程数
    int corePoolSize = executor.getCorePoolSize();
    // 最大线程数
    int maxPoolSize = executor.getMaximumPoolSize();

    log.warn("queueSize:{};;activeCount:{};;corePoolSize:{};;maximumPoolSize:{}", queueSize, activeCount, corePoolSize, maxPoolSize);

    if (queueSize > maxPoolSize && activeCount >= maxPoolSize * 0.5) {
      // 如果队列长度大于最大线程数,且活跃线程数是最大线程的 0.5倍,增加线程
      int newPoolSize = Math.max(maxPoolSize, activeCount * 2);
      executor.setMaximumPoolSize(newPoolSize);
      log.error("活跃线程数是最大线程的 1.5 倍,调整 maxPoolSize = {}", newPoolSize);
    } else if (queueSize < corePoolSize && activeCount <= corePoolSize * 0.5) {
      // 如果队列长度小于核心线程数且活跃线程数接近核心线程数,减少线程
      int newCorePoolSize = Math.max(corePoolSize / 2, 1);
      executor.setCorePoolSize(newCorePoolSize);
      log.error("活跃线程数是核心线程数的 0.5 倍,调整 corePoolSize = {}", newCorePoolSize);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // 创建一个初始大小为2,最大大小为5,空闲时间为 20 秒的线程池
    DynamicThreadPool pool = new DynamicThreadPool(2, 5, 20L, TimeUnit.SECONDS,20);

    // 开启 100 个线程
    for (int i = 0; i < 1000; i++) {
      pool.executor.submit(() -> {
        try {
          Thread.sleep(new Random().nextInt(1500));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        log.info("线程[{}]执行完成...",Thread.currentThread().getName());
      });

      // 间隔提交任务
      Thread.sleep(new Random().nextInt(300));
    }
  }
}

首先我们创建创建一个 corePoolSize 为2,maximumPoolSize 为 5,任务队列容量为 20 的线程池,为了更好地演示,我们需要将拒绝策略设置为 DiscardPolicy,防止中途报错。

启动一个定时任务,每隔 3 秒执行一次用来监控线程池状态并动态调整 corePoolSizemaximumPoolSize。这里有两个条件:

    if (queueSize > maxPoolSize && activeCount >= maxPoolSize * 0.5) {
      // 如果队列长度大于最大线程数,且活跃线程数是最大线程的 0.5倍,增加线程
      int newPoolSize = Math.max(maxPoolSize, activeCount * 2);
      executor.setMaximumPoolSize(newPoolSize);
      log.error("活跃线程数是最大线程的 1.5 倍,调整 maxPoolSize = {}", newPoolSize);
    } else if (queueSize < corePoolSize && activeCount <= corePoolSize * 0.5) {
      // 如果队列长度小于核心线程数且活跃线程数接近核心线程数,减少线程
      int newCorePoolSize = Math.max(corePoolSize / 2, 1);
      executor.setCorePoolSize(newCorePoolSize);
      log.error("活跃线程数是核心线程数的 0.5 倍,调整 corePoolSize = {}", newCorePoolSize);
    }

任务积压过多就增加线程数,任务少了就减少线程。执行结果,如下: