当前位置 : 首页 > 行业市场 >详情

13、百万数据分割颗粒度与异步线程实现 世界最新

行业市场 来源 :博客园 2023-04-03 10:35:42

声明百万数据List集合:


(资料图片)

/**     * 声明百万数据     * */    private static List dataList(){        List list = new ArrayList<>();        QueryVO queryVO = null;        int j = 0;        for (int i = 0; i < 1000000; i++) {            queryVO = QueryVO.builder().build();            queryVO.setTestNum(i);            //设置今天凌晨时间            queryVO.setTime(DateUtil.getNowDate(0));            if(i%100000 == 0){                j++;                //设置今天i刻时间                queryVO.setTime(DateUtil.getNowDate(j));            }            list.add(queryVO);        }        return list;    }
/**     * 获取今天凌晨后i刻的时间     * @return Date     * */    public static Date getNowDate(int i) {        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        Calendar calendar = Calendar.getInstance();        calendar.set(Calendar.HOUR_OF_DAY, 0);        calendar.set(Calendar.MINUTE, 0);        calendar.set(Calendar.SECOND, 0);        calendar.set(Calendar.MILLISECOND, 0);        calendar.add(Calendar.MINUTE, i * 15);        Date date = calendar.getTime();        return date;    }

一、百万数据分割颗粒度:

1、方式一:自定义分割List集合颗粒度方法:

/**     * 将集合按照splitSize分成多个子集合返回     * @param sourceList 原集合     * @param splitSize 子集合长度     */    private static List> splitList(List sourceList, int splitSize) {        List> splitList = new ArrayList<>();        if(!CollectionUtils.isEmpty(sourceList) && splitSize > 0) {            int sourceSize = sourceList.size();            int size = sourceSize / splitSize;            int left = sourceSize % splitSize;            for (int i = 0; i <= size; i++) {                if (i == size) {                    if(left !=0){                        splitList.add(sourceList.subList(i * splitSize, sourceSize));                    }                } else {                    splitList.add(sourceList.subList(i * splitSize, (i + 1) * splitSize));                }            }        }        return splitList;    }    /**     * 自定义List集合分割颗粒度     * @Method splitList()     * */    private static void customDemo(List list){        List> dataList = splitList(list, 100000);        System.out.println("自定义——splitList()实现List集合分割颗粒度:");        System.out.println("总数据集合量:"+dataList.size());        System.out.println("分支量:"+dataList.get(2).size());    }

2、方式二:基于google guava工具实现List集合分割颗粒度:

(1)、maven:

    com.google.guava    guava    21.0

(2)、实现方式:

/**     * google guava实现List集合分割颗粒度     * @Method Lists.partition()     * */    private static void guavaListsDemo(List list){        List> dataList = Lists.partition(list, 100000);        System.out.println("google guava——Lists.partition()实现List集合分割颗粒度:");        System.out.println("总数据集合量:"+dataList.size());        System.out.println("分支量:"+dataList.get(2).size());    }

3、方式三:基于apache commons collection工具实现List集合分割颗粒度:

(1)、maven:

    org.apache.commons    commons-collections4    4.4

(2)、实现方式:

/**     * apache commons collection实现List集合分割颗粒度     * @Method ListUtils.partition()     * */    private static void apacheListUtilsDemo(List list){        List> dataList = ListUtils.partition(list, 100000);        System.out.println("apache commons collection——ListUtils.partition()实现List集合分割颗粒度:");        System.out.println("总数据集合量:"+dataList.size());        System.out.println("分支量:"+dataList.get(2).size());    }

4、方式四:基于stream流实现List集合分割颗粒度:

/**     * stream流实现List集合分割颗粒度     * @Method grouping by     * */    private static void streamGroupDemo(List list){        Map> groups = list.stream().collect(Collectors.groupingBy(QueryVO::getTime));        List> dataList = new ArrayList<>(groups.values());        System.out.println("stream流——grouping by()实现List集合分割颗粒度:");        System.out.println("总数据集合量:"+dataList.size());        System.out.println("分支量:"+dataList.get(2).size());        /**         * 遍历方式一:         *         * //Iterator itLists = groups.entrySet().iterator();         * //while (itLists.hasNext()) {         * //    Map.Entry> queryVOList = (Map.Entry) itLists.next();         * //    //queryVOList.getValue().size();         * //}         *         * */        /***         * 遍历方式二:         *         * //Map> groups = list.stream().collect(Collectors.groupingBy(QueryVO::getTime));         * //List> dataList = new ArrayList<>(groups.values());         * //for (List queryVOList: dataList) {//业务}         *         */    }

二、百万数据的多线程方式实现:

声明线程池:

import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configurationpublic class TaskPoolConfig {    /**     * ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。     * ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。     *     * 拒绝策略:     * (1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。     *     但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大     * (2)、AbortPolicy: 丢弃任务,并抛出拒绝执行     * (3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。     * (4)、DiscardPolicy: 直接丢弃,其他啥都没有     * (5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入     */    @Value("${async.thread.concurrency.coreSize:10}")    private int coreSize;    @Value("${async.thread.concurrency.maxSize:20}")    private int maxSize;    @Value("${async.thread.concurrency.queueCapacity:1000}")    private int queueCapacity;    @Value("${async.thread.concurrency.keepAliveSeconds:10000}")    private int keepAliveSeconds;    @Bean    publicThreadPoolTaskExecutor threadPoolTaskExecutor() {        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(coreSize); //核心线程数        executor.setMaxPoolSize(maxSize);   //最大线程数        executor.setQueueCapacity(queueCapacity);   //最大等待队列数        executor.setKeepAliveSeconds(keepAliveSeconds); //除核心线程,其他线程的保留时间        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //等待队列满后的拒绝策略        executor.initialize();  //执行初始化        executor.setThreadNamePrefix("async-executor-");    //线程前缀名称        return executor;    }}

使用:

@Autowired    private ThreadPoolTaskExecutor threadPoolTaskExecutor;  //线程池配置声明

1、方式一:普通多线程实现:

/**     * 普通多线程实现     * */    private List comhandleList(List list){        List dataList = new ArrayList<>();        //百万List数据分割        List> list1 = Lists.partition(list, 100000);        //获取List0数据线程        Callable> query1List = () -> {            List dataQueryList = new ArrayList<>();            Optional.ofNullable(list1.get(0)).ifPresent(ps -> {                        ps.stream().forEach(p -> {                            QueryVO queryVO = QueryVO.builder()                                    .testNum(p.getTestNum())                                    .time(p.getTime())                                    .avgGrade(100.0)                                    .build();                            dataQueryList.add(queryVO);                        });            });            return dataQueryList;        };        Future> data1Future = threadPoolTaskExecutor.submit(query1List);        //获取List1数据线程        Callable> query2List = () -> {            List dataQueryList = new ArrayList<>();            Optional.ofNullable(list1.get(1)).ifPresent(ps -> {                ps.stream().forEach(p -> {                    QueryVO queryVO = QueryVO.builder()                            .testNum(p.getTestNum())                            .time(p.getTime())                            .avgGrade(100.0)                            .build();                    dataQueryList.add(queryVO);                });            });            return dataQueryList;        };        Future> data2Future = threadPoolTaskExecutor.submit(query2List);        try {            dataList.addAll(data1Future.get());            dataList.addAll(data2Future.get());} catch (InterruptedException e) {            e.printStackTrace();        } catch (ExecutionException e) {            e.printStackTrace();        }        return dataList;    }

2、方式二:CompletableFuture异步多线程实现:

(1)、普通方式:

/**     * 异步线程实现     * */    private List handleList(List list){        List dataList = new ArrayList<>();        //百万List数据分割        List> list1 = Lists.partition(list, 100000);        List cfList = new ArrayList<>();        CompletableFuture> list1Future =CompletableFuture                .supplyAsync(() ->  {                    List dataQueryList = new ArrayList<>();                    Optional.ofNullable(list1.get(0)).ifPresent(ps -> {                        ps.stream().forEach(p -> {                            QueryVO queryVO = QueryVO.builder()                                    .testNum(p.getTestNum())                                    .time(p.getTime())                                    .avgGrade(100.0)                                    .build();                            dataQueryList.add(queryVO);                        });                    });                    dataList.addAll(dataQueryList);                    return dataList;                }, threadPoolTaskExecutor);        CompletableFuture> list2Future = CompletableFuture                .supplyAsync(() ->  {                    List dataQueryList = new ArrayList<>();                    Optional.ofNullable(list1.get(1)).ifPresent(ps -> {                        ps.stream().forEach(p -> {                            QueryVO queryVO = QueryVO.builder()                                    .testNum(p.getTestNum())                                    .time(p.getTime())                                    .avgGrade(100.0)                                    .build();                            dataQueryList.add(queryVO);                        });                    });                    dataList.addAll(dataQueryList);                    return dataList;                }, threadPoolTaskExecutor);        cfList.add(list1Future);        cfList.add(list2Future);        //等待全部完成        CompletableFuture.allOf(cfList.toArray(new CompletableFuture[]{})).join();        return dataList;    }

(2)、全量方式:

/**     * 百万数据按数据大小分割颗粒度异步线程实现     * */    private List sizeHandleList(List list){        List dataList = new ArrayList<>();        //百万List数据分割        List> list1 = Lists.partition(list, 100000);        List cfList = new ArrayList<>();        for (List queryVOList : list1) {            cfList.add(CompletableFuture                    .supplyAsync(() -> {                        List dataQueryList = new ArrayList<>();                        Optional.ofNullable(queryVOList).ifPresent(ps -> {                            ps.stream().forEach(p -> {                                QueryVO queryVO = QueryVO.builder()                                        .testNum(p.getTestNum())                                        .time(p.getTime())                                        .avgGrade(100.0)                                        .build();                                dataQueryList.add(queryVO);                            });                        });                        return dataList.addAll(dataQueryList);                    }, threadPoolTaskExecutor));        }        //等待全部完成        CompletableFuture.allOf(cfList.toArray(new CompletableFuture[]{})).join();        return dataList;    }

三、学习参考:

CompletableFuture使用详解;

ThreadPoolTaskExecutor线程并发;

标签:

精彩放送

返回顶部