• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

Springboot使用ThreadPoolTaskExecutor线程池,多线程调用实例

互联网 diligentman 6小时前 2次浏览

项目场景:15万条数据需要做逻辑处理,然后存入数据库


硬件:windows 4核 i7 16G内存


问题描述:响应太慢,服务容易出现卡死


原因分析:

原先是JPA保存,每次与数据库交互都会创建连接,执行完关闭连接,这样是很耗时的


解决方案:

1.修改存库方式,通过mybatis批量insert,一般每批3000条,语句过长会报错,并不是每批量越大越好,需要根据实际情况分析,笔者这边不做过多解释,有兴趣的可以看下相关知识
2.通过多线程执行


多线程配置

新建一个class,把以下配置文件直接原封不动copy过去就行,了解一下参数含义

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


@Configuration
public class ApplicationConfiguration {

    /**
     * 公共线程池
     */
    @Bean
    public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor pool = new org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor();
        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        pool.setCorePoolSize(corePoolSize); // 核心池大小
        pool.setMaxPoolSize(maxPoolSize); // 最大线程数
        pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
        pool.setThreadPriority(Thread.MAX_PRIORITY);
        pool.setDaemon(false);
        pool.setKeepAliveSeconds(300);// 线程空闲时间
        return pool;
    }

}

实例调用

这里模拟一个保存方法

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor; // 注入线程池
    @Autowired
    private AggregateAccountingMapper aggregateAccountingMapper;
    @Autowired
    private AggregateAccountingService aggregateAccountingService;

    @GetMapping("/save")
    public void save() {
        List<AggregateAccounting> list = this.aggregateAccountingService.list(); // 这里模拟15万条数据源
        // 注入线程池
        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                threadPoolTaskExecutor);
        /**
         * partition()方法 大家可以看一下
         * 例如 3001条数据 他会自动帮你分成两个数组 第一个数组3000条 第二个数组1条 
         * 不需要我们再像以前一样 通过for循环处理截取
         */
        List<List<AggregateAccounting>> lists = Lists.partition(list, 3000);
        lists.forEach(item -> {
            // 这里做的事情就是 根据lists大小确认要多少个线程 给每个线程分配任务
            completionService.submit(new Callable() {
                @Override
                public Object call() throws Exception {
                    // insertList()方法 就是批量insert语句
                    return aggregateAccountingMapper.insertList(item);
                }
            });
        });
        // 这里是让多线程开始执行
        lists.forEach(item -> {
            try {
                completionService.take().get();
            } catch (InterruptedException | ExecutionException e) {
                System.out.println(e);
            }
        });
    }

}

总结

通过mybatis批量insert 15万条数据(包含业务处理时间)
1.单线程:65S
2.多线程:50S


喜欢 (0)