SpringBoot用线程池ThreadPoolExecutor处理百万级数据
更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。
一、背景:
使用JDK线程池ThreadPoolExecutor多线程异步执行批量插入、更新等操作方法,提高百万级数据插入效率。
二、具体细节:

2.1、创建自适应机器本身线程数量的线程池
//创建自适应机器本身线程数量的线程池Integer processNum = Runtime.getRuntime().availableProcessors();int corePoolSize = (int) (processNum / (1 - 0.2));int maxPoolSize = (int) (processNum / (1 - 0.5));ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,maxPoolSize,2L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());@Overridepublic boolean batchInsert(List<Student> list) throws Exception {Future<Boolean> a = null;try {/*** submit与execute 都是向线程池提交任务。* submit提交后执行提交类实现callable方法后重写的call方法,execute提交后执行实现Runnable的run方法* Runnable任务没有返回值,而Callable任务有返回值。* 并且Callable的call()方法只能通过ExecutorService的submit(Callable <T> task) 方法来执行* 多人同时提交时的线程控制:多线程多任务*/a = executorService.submit(new BatchWay(list,studentService));return a.get();} catch (Exception e) {e.printStackTrace();try {return a.get();} catch (Exception ex) {ex.printStackTrace();return false;}}}
2.2、业务核心处理类:
@Slf4jpublic class BatchWay implements Callable<Boolean> {private int batch100 = 100; //100条为分界批量导入private List<Student> list; //list中的大量数据private StudentService studentService;//有参的构造函数,方便初始化其类public BatchWay(List<Student> list, StudentService studentService) {this.list = list;this.studentService = studentService;}/**线程池*/// private ThreadPoolExecutor threadPoolExecutor =// new ThreadPoolExecutor(// 10, //corePoolSize:线程池中核心线程数// Runtime.getRuntime().availableProcessors(), //线程池中能拥有最多线程数 取所有// 5L, //keepAliveTime:表示空闲线程的存活时间 2秒// TimeUnit.SECONDS, //表示keepAliveTime的单位:秒// new LinkedBlockingQueue<>(100), //用于缓存任务的阻塞队列Executors.defaultThreadFactory(),// new ThreadPoolExecutor.CallerRunsPolicy()// );/*** 功能描述:实现Callable的call方法* @MethodName: call* @MethodParam: []* @Return: java.lang.Boolean* @Author: yyalin* @CreateDate: 2022/5/6 15:46*/public Boolean call(){try {batchOp(list);return true;} catch (Exception e) {e.printStackTrace();}return false;}/*** 功能描述:批量保存数据* @MethodName: batchOp* @MethodParam: [list]* @Return: void* @Author: yyalin* @CreateDate: 2022/5/6 15:40*/private void batchOp(List<Student> list) {if(!list.isEmpty()){Integer size = list.size();if(size<=batch100){//小于分批的直接插入即可studentService.saveBatch(list);}else if(size>batch100){//分批后再进行保存数据batchOpSpilit(list,batch100);}}}/*** 功能描述:对list进行切割* @MethodName: batchOpSpilit* @MethodParam: [list, batch100]* @Return: void* @Author: yyalin* @CreateDate: 2022/5/6 15:43*/private void batchOpSpilit(List<Student> list, int batch100) {log.info("开始切割………………");List<List<Student>> list1 = SplitListUtils.pagingList(list, batch100);try {for (List<Student> list2 : list1) {batchOp(list2);// threadPoolExecutor.allowCoreThreadTimeOut(true);// //再调batchOp方法,这里的多线程是多个小集合往数据库插入// threadPoolExecutor.execute(() -> {log.info("我是线程开始保存数据...:" + Thread.currentThread().getName());// batchOp(list2);// });}// log.info("当前线程池剩余的数量222222:"+threadPoolExecutor.getPoolSize());} catch (Exception e) {// log.info("出现异常:"+e);} finally {//最后关闭线程 不允许提交新的任务,但是会处理完已提交的任务// threadPoolExecutor.shutdown();}}
2.3、造数据,多线程异步插入:
public String batchWay() throws Exception {log.info("开始批量操作.........");Random rand = new Random();List<Student> list = new ArrayList<>();for (int i = 0; i < 1000003; i++) {Student student=new Student();student.setStudentName("小李"+i);student.setAddr("上海"+rand.nextInt(9) * 1000);student.setAge(rand.nextInt(1000));student.setPhone("134"+rand.nextInt(9) * 1000);list.add(student);}long startTime = System.currentTimeMillis(); // 开始时间boolean a=studentService.batchInsert(list);long endTime = System.currentTimeMillis(); //结束时间return "执行完成一共耗时time: " + (endTime - startTime) / 1000 + " s";}
2.4、测试结果

汇总结果:
| 序号 | 核心线程(core_pool_size) | 插入数据(万) | 耗时(秒) |
| 1 | 10 | 100w | 38s |
| 2 | 15 | 100w | 32s |
| 3 | 50 | 100w | 31s |
个人推荐:SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据的方法。
总结:ThreadPoolTaskExecutor和ThreadPoolExecutor比Executors创建线程池更加灵活,可以设置参数,推荐ThreadPoolTaskExecutor和ThreadPoolExecutor,而ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor。
更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。
推荐文章:
1、SpringBoot使用@Async实现多线程异步;
2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据



















