# 线程池参数优化设置

线程池使用面临的核心的问题在于:线程池的参数并不好配置

代码地址

# 1. 业务场景

  • 快速响应用户请求
  • 快速处理批量任务

# 1.1. 快速响应用户请求

从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个页面了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveSeconds, TimeUnit.SECONDS,
    new SynchronousQueue<>(false),
    new CustomizableThreadFactory(ioThreadNamePrefix),
    new ThreadPoolExecutor.AbortPolicy());

这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高 corePoolSize 和 maxPoolSize 去尽可能创造多的线程快速执行任务,使用 SynchronousQueue 任务队列,支持公平锁和非公平锁,配合拒绝策略 AbortPolicy,到达最大线程直接抛出异常,达到最快响应

# 2.2. 快速处理批量任务

离线的大量计算任务,需要快速执行,与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的 corePoolSize 去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveSeconds, TimeUnit.SECONDS,
    new ResizableCapacityLinkedBlockingQueue<>(queueCapacity),
    new CustomizableThreadFactory(cpuThreadNamePrefix),
    new ThreadPoolExecutor.CallerRunsPolicy());

这个只能在测试中根据服务器资源平衡找到最优的配置,使用修改的 ResizableCapacityLinkedBlockingQueue,可以设定 capacity 参数,动态调整队列长度,在这里就不能使用拒绝策略的抛异常及丢弃任务等等,应该使用 CallerRunsPolicy,等待执行,当然也可以自定义拒绝策略,在策略执行失败日志记录,任务重新入库,重试执行几次等等

/**
 * 自定义拒绝策略
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/27 18:18
 */
public static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // log("r rejected")
        // save r kafka mysql redis
        // try 3 times
        if(executor.getQueue().size() < 10000) {
            // try put again();
        }
    }
}

# 2. 动态化线程池

  • 监控
  • 调整

# 2.1. 监控

查看当前线程池状态

/**
 * 线程池状态
 * 
 * @param name
 * @return java.lang.String
 * @throws 
 * @author wliduo[i@dolyw.com]
 * @date 2021/10/27 17:29
 */
@GetMapping("/state")
public String state(@RequestParam("name") String name) throws Exception {
    ThreadPoolExecutor threadPoolExecutor = null;
    if (SpringUtil.getBean(name) instanceof ThreadPoolTaskExecutor) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = SpringUtil.getBean(name);
        threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor();
    } else {
        threadPoolExecutor = SpringUtil.getBean(name);
    }
    CustomizableThreadFactory customizableThreadFactory = (CustomizableThreadFactory) threadPoolExecutor.getThreadFactory();
    StringBuffer stringBuffer = new StringBuffer("");
    stringBuffer.append("线程池名称: " + customizableThreadFactory.getThreadNamePrefix());
    stringBuffer.append("<br/>");
    stringBuffer.append("核心线程数: " + threadPoolExecutor.getCorePoolSize());
    stringBuffer.append("<br/>");
    stringBuffer.append("活动线程数:" + threadPoolExecutor.getActiveCount());
    stringBuffer.append("<br/>");
    stringBuffer.append("最大线程数:" + threadPoolExecutor.getMaximumPoolSize());
    stringBuffer.append("<br/>");
    stringBuffer.append("线程池活跃度(%):" + NumberUtil.div(threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize()) * 100);
    stringBuffer.append("<br/>");
    stringBuffer.append("任务总数:" + threadPoolExecutor.getTaskCount());
    stringBuffer.append("<br/>");
    stringBuffer.append("任务完成数:" + threadPoolExecutor.getCompletedTaskCount());
    stringBuffer.append("<br/>");
    stringBuffer.append("队列类型:" + threadPoolExecutor.getQueue().getClass().getName());
    stringBuffer.append("<br/>");
    if (threadPoolExecutor.getQueue().size() + threadPoolExecutor.getQueue().remainingCapacity() > 0) {
        stringBuffer.append("队列大小:" + (threadPoolExecutor.getQueue().size() + threadPoolExecutor.getQueue().remainingCapacity()));
        stringBuffer.append("<br/>");
        stringBuffer.append("当前排队线程数:" + threadPoolExecutor.getQueue().size());
        stringBuffer.append("<br/>");
        stringBuffer.append("队列剩余大小:" + threadPoolExecutor.getQueue().remainingCapacity());
        stringBuffer.append("<br/>");
        stringBuffer.append("队列使用度(%):" + NumberUtil.div(threadPoolExecutor.getQueue().size(), threadPoolExecutor.getQueue().size() + threadPoolExecutor.getQueue().remainingCapacity()) * 100);
        stringBuffer.append("<br/>");
    }
    stringBuffer.append("线程活跃时间(秒):" + threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
    stringBuffer.append("<br/>");
    stringBuffer.append("拒绝策略:" + threadPoolExecutor.getRejectedExecutionHandler().getClass().getName());
    return stringBuffer.toString();
}
线程池名称: jdk-thread-pool-
核心线程数: 15
活动线程数:10
最大线程数:30
线程池活跃度(%):33.333333329999995
任务总数:17
任务完成数:7
队列类型:com.example.config.ResizableCapacityLinkedBlockingQueue
队列大小:15
当前排队线程数:0
队列剩余大小:15
队列使用度(%):0.0
线程活跃时间():60
拒绝策略:java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy

# 2.2. 调整

动态更新线程池参数

@Autowired
private ThreadPoolExecutor jdkThreadPoolExecutor;

/**
 * 调整线程池
 * 
 * @param 
 * @return java.lang.String
 * @throws 
 * @author wliduo[i@dolyw.com]
 * @date 2021/10/27 17:32
 */
@GetMapping("/set")
public String set() throws Exception {
    jdkThreadPoolExecutor.setCorePoolSize(1);
    jdkThreadPoolExecutor.setMaximumPoolSize(2);
    jdkThreadPoolExecutor.setKeepAliveTime(120, TimeUnit.SECONDS);
    jdkThreadPoolExecutor.setThreadFactory(new CustomizableThreadFactory("jdk-thread-pool-new-"));
    jdkThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    ResizableCapacityLinkedBlockingQueue resizableCapacityLinkedBlockingQueue = (ResizableCapacityLinkedBlockingQueue) jdkThreadPoolExecutor.getQueue();
    // 设置队列长度
    resizableCapacityLinkedBlockingQueue.setCapacity(1);
    return "OK";
}

参考

上次更新时间: 2023-12-15 03:14:55