回答
ThreadPoolExecutor
对核心参数提供了一些 setter
方法,根据这些 setter
方法我们可以在程序运行时动态调整线程池的核心参数:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
在实际生产场景下,我们需要定期检查线程池中的任务队列长度、活动线程数和核心线程数等指标。然后根据监控结果,调用对应的 setter
方法设置参数即可。
扩展
演示示例
下面大明哥用一个简单的示例来演示如何来动态调整线程池:
@Slf4j
public class DynamicThreadPool {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService scheduler;
public DynamicThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,int capacity) {
this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(capacity));
// 为了更好地演示,拒绝策略选择 DiscardPolicy ,任务直接丢弃,不管
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
this.scheduler = Executors.newScheduledThreadPool(1);
// 定时任务,每 10 秒检测一次
scheduler.scheduleAtFixedRate(this::adjustThreadPoolSize, 3, 3, TimeUnit.SECONDS);
}
/**
* 动态调整线程池
*/
private void adjustThreadPoolSize() {
// 队列大小
int queueSize = executor.getQueue().size();
// 活跃线程数
int activeCount = executor.getActiveCount();
// 核心线程数
int corePoolSize = executor.getCorePoolSize();
// 最大线程数
int maxPoolSize = executor.getMaximumPoolSize();
log.warn("queueSize:{};;activeCount:{};;corePoolSize:{};;maximumPoolSize:{}", queueSize, activeCount, corePoolSize, maxPoolSize);
if (queueSize > maxPoolSize && activeCount >= maxPoolSize * 0.5) {
// 如果队列长度大于最大线程数,且活跃线程数是最大线程的 0.5倍,增加线程
int newPoolSize = Math.max(maxPoolSize, activeCount * 2);
executor.setMaximumPoolSize(newPoolSize);
log.error("活跃线程数是最大线程的 1.5 倍,调整 maxPoolSize = {}", newPoolSize);
} else if (queueSize < corePoolSize && activeCount <= corePoolSize * 0.5) {
// 如果队列长度小于核心线程数且活跃线程数接近核心线程数,减少线程
int newCorePoolSize = Math.max(corePoolSize / 2, 1);
executor.setCorePoolSize(newCorePoolSize);
log.error("活跃线程数是核心线程数的 0.5 倍,调整 corePoolSize = {}", newCorePoolSize);
}
}
public static void main(String[] args) throws InterruptedException {
// 创建一个初始大小为2,最大大小为5,空闲时间为 20 秒的线程池
DynamicThreadPool pool = new DynamicThreadPool(2, 5, 20L, TimeUnit.SECONDS,20);
// 开启 100 个线程
for (int i = 0; i < 1000; i++) {
pool.executor.submit(() -> {
try {
Thread.sleep(new Random().nextInt(1500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("线程[{}]执行完成...",Thread.currentThread().getName());
});
// 间隔提交任务
Thread.sleep(new Random().nextInt(300));
}
}
}
首先我们创建创建一个 corePoolSize
为2,maximumPoolSize
为 5,任务队列容量为 20 的线程池,为了更好地演示,我们需要将拒绝策略设置为 DiscardPolicy,防止中途报错。
启动一个定时任务,每隔 3 秒执行一次用来监控线程池状态并动态调整 corePoolSize
和 maximumPoolSize
。这里有两个条件:
if (queueSize > maxPoolSize && activeCount >= maxPoolSize * 0.5) {
// 如果队列长度大于最大线程数,且活跃线程数是最大线程的 0.5倍,增加线程
int newPoolSize = Math.max(maxPoolSize, activeCount * 2);
executor.setMaximumPoolSize(newPoolSize);
log.error("活跃线程数是最大线程的 1.5 倍,调整 maxPoolSize = {}", newPoolSize);
} else if (queueSize < corePoolSize && activeCount <= corePoolSize * 0.5) {
// 如果队列长度小于核心线程数且活跃线程数接近核心线程数,减少线程
int newCorePoolSize = Math.max(corePoolSize / 2, 1);
executor.setCorePoolSize(newCorePoolSize);
log.error("活跃线程数是核心线程数的 0.5 倍,调整 corePoolSize = {}", newCorePoolSize);
}
任务积压过多就增加线程数,任务少了就减少线程。执行结果,如下:
任务不断提交,线程池中的任务会逐渐堆积,监控发现任务积压过多,则需要增加线程数,当运行到后面的时候,任务越来越少了,则会减少线程数: