学无先后,达者为师

网站首页 编程语言 正文

CompletableFuture解决多线程返回结果问题

作者:wl_Honest 更新时间: 2022-10-14 编程语言

什么是CompletableFuture?

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是 Future API的扩展。

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

从 Callbale和 Future 教程可以学习更多关于 Future 知识.

Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。

CompletableFuture 实现了 Future 和 CompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

之前项目中需要对一组集合进行处理,集合内的所有元素处理完后更新任务状态。当时经过询问得知可以用到CompletableFuture,于是经过短暂的研究写了一个很粗略的代码,如下:

public void calculateAvgSensorData() {
        //0.获取同步锁
        Boolean isLock = redisUtils.getLockWithExpire(LOCK_CAL_CABINET_AVG_ENV, LOCK_EXPIRE);
        if (!isLock) {
            log.warn("获取互斥锁: {}失败", LOCK_CAL_CABINET_AVG_ENV);
            return;
        }
        //1.查询昨日每个机柜一整天的环境数据均值,如果有报警,则用报警数据
        //1.1.获取当前日期
        Date today = DateUtil.date();
        //1.2.获取昨天日期
        Date yesterday = DateUtil.offsetDay(today, DAY_OFFSET);
        String queryDate = DateUtil.formatDate(yesterday);
        //1.3.查询机房列表
        List<BaseProps> roomList = roomService.getAllByUsed(1);
        //1.4.查询传感器列表
        List<Sensor> sensorList = sensorService.list();
        if (CollectionUtil.isEmpty(roomList) || CollectionUtil.isEmpty(sensorList)) {
            return;
        }
        //1.5.记录任务执行时间
        AutoTask autoTask = autoTaskService.getById(AutoTaskId.CAL_CABINET_AVG_ENV_TASK);
        autoTask.setTaskStatus(AutoTaskStatus.EXECUTING);
        HistoryAutoTask historyAutoTask = BeanUtil.copyProperties(autoTask, HistoryAutoTask.class);
        historyAutoTask.setExecuteTime(new Date());
        //记录任务开始时间
        historyAutoTaskService.startAutoTask(historyAutoTask);
        //1.6.组装待查询的机房队列
        roomList.forEach(room -> {
            queue.add(room);
        });

        for(int i = 0; i < MAX_THREAD; i++) {
            //创建异步执行任务
            CompletableFuture cf = CompletableFuture.runAsync(()->{
                do{
                    try {
                        queryCabinetEnv(sensorList, queryDate);
                    } catch (Exception e) {
                        log.error("插入机房环境数据均值失败,失败原因: {}", e);
                        historyAutoTask.setTaskStatus(AutoTaskStatus.FAILED);
                        historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
                    }
                }while(queue.size() > 0);
            }, taskExecutor).whenComplete((res,excption)-> {
                //3.记录定时任务执行状态
                historyAutoTask.setTaskStatus(AutoTaskStatus.EXECUTED);
                historyAutoTask.setFinishTime(new Date());
                historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
            });
        }
    }

如上代码,虽然用到了线程池,也用到了CompletableFuture,不过一眼可以看出问题所在,就是在每个遍历结束后都会更新一次任务状态,这明显是不对的。明明在所有任务执行完后再执行一次任务状态更新就可以了,这里却每个线程执行完任务后更新一次任务状态。

对于如上代码,肯定是有更加优雅的写法的,再次经过深入学习后,写出以下例子仅供改造参考:

package com.tct.ii.ucr.task;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * @author wl
 * @date 2022/10/12
 */
@Slf4j
public class TestFuture {
    public static CompletableFuture<String> printStr(String str, ExecutorService executorService) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("str:{}", str);
            return str;
        }, executorService);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<String> list = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        List<CompletableFuture<String>> futureList = list.stream()
                .map(str -> printStr(str, executorService)).collect(Collectors.toList());
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
        CompletableFuture<List<String>> resultFuture = allFuture.thenApply(v -> futureList.stream().map(future -> future.join()).collect(Collectors.toList()));
        log.info("result:{}", resultFuture.get());
        executorService.shutdown();
    }
}

在多任务组合中,allOf:等待所有任务完成,anyOf:只要有一个任务完成。

测试结果如图:

可以看到这里既用到了线程池,最后调用allOf方法等待所有任务执行完后可以一次性获取结果,非常方便和优雅。 

原文链接:https://blog.csdn.net/wl_Honest/article/details/127286938

栏目分类
最近更新