13、百万数据分割颗粒度与异步线程实现
声明百万数据List集合:
(资料图片)
/** * 声明百万数据 * */ private static ListdataList(){ List list = new ArrayList<>(); QueryVO queryVO = null; int j = 0; for (int i = 0; i < 1000000; i++) { queryVO = QueryVO.builder().build(); queryVO.setTestNum(i); //设置今天凌晨时间 queryVO.setTime(DateUtil.getNowDate(0)); if(i%100000 == 0){ j++; //设置今天i刻时间 queryVO.setTime(DateUtil.getNowDate(j)); } list.add(queryVO); } return list; }
/** * 获取今天凌晨后i刻的时间 * @return Date * */ public static Date getNowDate(int i) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); calendar.add(Calendar.MINUTE, i * 15); Date date = calendar.getTime(); return date; }一、百万数据分割颗粒度:
1、方式一:自定义分割List集合颗粒度方法:
/** * 将集合按照splitSize分成多个子集合返回 * @param sourceList 原集合 * @param splitSize 子集合长度 */ private staticList > splitList(List
sourceList, int splitSize) { List > splitList = new ArrayList<>(); if(!CollectionUtils.isEmpty(sourceList) && splitSize > 0) { int sourceSize = sourceList.size(); int size = sourceSize / splitSize; int left = sourceSize % splitSize; for (int i = 0; i <= size; i++) { if (i == size) { if(left !=0){ splitList.add(sourceList.subList(i * splitSize, sourceSize)); } } else { splitList.add(sourceList.subList(i * splitSize, (i + 1) * splitSize)); } } } return splitList; } /** * 自定义List集合分割颗粒度 * @Method splitList() * */ private static void customDemo(List
list){ List > dataList = splitList(list, 100000); System.out.println("自定义——splitList()实现List集合分割颗粒度:"); System.out.println("总数据集合量:"+dataList.size()); System.out.println("分支量:"+dataList.get(2).size()); }
2、方式二:基于google guava工具实现List集合分割颗粒度:
(1)、maven:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>
(2)、实现方式:
/** * google guava实现List集合分割颗粒度 * @Method Lists.partition() * */ private static void guavaListsDemo(Listlist){ List > dataList = Lists.partition(list, 100000); System.out.println("google guava——Lists.partition()实现List集合分割颗粒度:"); System.out.println("总数据集合量:"+dataList.size()); System.out.println("分支量:"+dataList.get(2).size()); }
3、方式三:基于apache commons collection工具实现List集合分割颗粒度:
(1)、maven:
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>
(2)、实现方式:
/** * apache commons collection实现List集合分割颗粒度 * @Method ListUtils.partition() * */ private static void apacheListUtilsDemo(Listlist){ List > dataList = ListUtils.partition(list, 100000); System.out.println("apache commons collection——ListUtils.partition()实现List集合分割颗粒度:"); System.out.println("总数据集合量:"+dataList.size()); System.out.println("分支量:"+dataList.get(2).size()); }
4、方式四:基于stream流实现List集合分割颗粒度:
/** * stream流实现List集合分割颗粒度 * @Method grouping by * */ private static void streamGroupDemo(Listlist){ Map > groups = list.stream().collect(Collectors.groupingBy(QueryVO::getTime)); List > dataList = new ArrayList<>(groups.values()); System.out.println("stream流——grouping by()实现List集合分割颗粒度:"); System.out.println("总数据集合量:"+dataList.size()); System.out.println("分支量:"+dataList.get(2).size()); /** * 遍历方式一: * * //Iterator itLists = groups.entrySet().iterator(); * //while (itLists.hasNext()) { * // Map.Entry
> queryVOList = (Map.Entry) itLists.next(); * // //queryVOList.getValue().size(); * //} * * */ /*** * 遍历方式二: * * //Map > groups = list.stream().collect(Collectors.groupingBy(QueryVO::getTime)); * //List > dataList = new ArrayList<>(groups.values()); * //for (List
queryVOList: dataList) {//业务} * */ }
二、百万数据的多线程方式实现:
声明线程池:
import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configurationpublic class TaskPoolConfig { /** * ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。 * ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。 * * 拒绝策略: * (1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。 * 但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大 * (2)、AbortPolicy: 丢弃任务,并抛出拒绝执行 * (3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。 * (4)、DiscardPolicy: 直接丢弃,其他啥都没有 * (5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入 */ @Value("${async.thread.concurrency.coreSize:10}") private int coreSize; @Value("${async.thread.concurrency.maxSize:20}") private int maxSize; @Value("${async.thread.concurrency.queueCapacity:1000}") private int queueCapacity; @Value("${async.thread.concurrency.keepAliveSeconds:10000}") private int keepAliveSeconds; @Bean publicThreadPoolTaskExecutor threadPoolTaskExecutor() { final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(coreSize); //核心线程数 executor.setMaxPoolSize(maxSize); //最大线程数 executor.setQueueCapacity(queueCapacity); //最大等待队列数 executor.setKeepAliveSeconds(keepAliveSeconds); //除核心线程,其他线程的保留时间 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //等待队列满后的拒绝策略 executor.initialize(); //执行初始化 executor.setThreadNamePrefix("async-executor-"); //线程前缀名称 return executor; }}使用:
@Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; // injected thread pool on which the tasks below are run
1、方式一:普通多线程实现:
/** * 普通多线程实现 * */ private ListcomhandleList(List list){ List dataList = new ArrayList<>(); //百万List数据分割 List > list1 = Lists.partition(list, 100000); //获取List0数据线程 Callable
> query1List = () -> { List
dataQueryList = new ArrayList<>(); Optional.ofNullable(list1.get(0)).ifPresent(ps -> { ps.stream().forEach(p -> { QueryVO queryVO = QueryVO.builder() .testNum(p.getTestNum()) .time(p.getTime()) .avgGrade(100.0) .build(); dataQueryList.add(queryVO); }); }); return dataQueryList; }; Future > data1Future = threadPoolTaskExecutor.submit(query1List);
//获取List1数据线程 Callable> query2List = () -> { List
dataQueryList = new ArrayList<>(); Optional.ofNullable(list1.get(1)).ifPresent(ps -> { ps.stream().forEach(p -> { QueryVO queryVO = QueryVO.builder() .testNum(p.getTestNum()) .time(p.getTime()) .avgGrade(100.0) .build(); dataQueryList.add(queryVO); }); }); return dataQueryList; }; Future > data2Future = threadPoolTaskExecutor.submit(query2List);
try { dataList.addAll(data1Future.get()); dataList.addAll(data2Future.get());} catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return dataList; }
2、方式二:CompletableFuture异步多线程实现:
(1)、普通方式:
/** * 异步线程实现 * */ private ListhandleList(List list){ List dataList = new ArrayList<>(); //百万List数据分割 List > list1 = Lists.partition(list, 100000); List
cfList = new ArrayList<>(); CompletableFuture > list1Future =CompletableFuture .supplyAsync(() -> { List
dataQueryList = new ArrayList<>(); Optional.ofNullable(list1.get(0)).ifPresent(ps -> { ps.stream().forEach(p -> { QueryVO queryVO = QueryVO.builder() .testNum(p.getTestNum()) .time(p.getTime()) .avgGrade(100.0) .build(); dataQueryList.add(queryVO); }); }); dataList.addAll(dataQueryList); return dataList; }, threadPoolTaskExecutor); CompletableFuture > list2Future = CompletableFuture .supplyAsync(() -> { List
dataQueryList = new ArrayList<>(); Optional.ofNullable(list1.get(1)).ifPresent(ps -> { ps.stream().forEach(p -> { QueryVO queryVO = QueryVO.builder() .testNum(p.getTestNum()) .time(p.getTime()) .avgGrade(100.0) .build(); dataQueryList.add(queryVO); }); }); dataList.addAll(dataQueryList); return dataList; }, threadPoolTaskExecutor); cfList.add(list1Future); cfList.add(list2Future); //等待全部完成 CompletableFuture.allOf(cfList.toArray(new CompletableFuture[]{})).join(); return dataList; }
(2)、全量方式:
/** * 百万数据按数据大小分割颗粒度异步线程实现 * */ private ListsizeHandleList(List list){ List dataList = new ArrayList<>(); //百万List数据分割 List > list1 = Lists.partition(list, 100000); List
cfList = new ArrayList<>(); for (List queryVOList : list1) { cfList.add(CompletableFuture .supplyAsync(() -> { List dataQueryList = new ArrayList<>(); Optional.ofNullable(queryVOList).ifPresent(ps -> { ps.stream().forEach(p -> { QueryVO queryVO = QueryVO.builder() .testNum(p.getTestNum()) .time(p.getTime()) .avgGrade(100.0) .build(); dataQueryList.add(queryVO); }); }); return dataList.addAll(dataQueryList); }, threadPoolTaskExecutor)); } //等待全部完成 CompletableFuture.allOf(cfList.toArray(new CompletableFuture[]{})).join(); return dataList; }
三、学习参考:
CompletableFuture使用详解;
ThreadPoolTaskExecutor线程并发;
标签: