Skip to content

高级

任务调度

Task和Executor(过时)

  • 函数式接口

  • Executor

  • execute(Runnable command): 接收一个 Runnable 对象,并异步地执行它。

  • ExecutorService是Executor 的一个子接口,提供了更丰富的功能

  • submit(Runnable/Callable task): 提交一个 Runnable 任务用于执行,并返回一个代表该任务Future
    • 用于提交单个任务,如果无返回值Future会get到null
    • 提交后会立刻返回,尽管可能没有计算完成(即get时还要等待)
  • invokeAll(Collection<? extends Callable<T>> tasks): 执行给定的任务集合。
    • 返回值为Future对象,任务都执行完成invoke才会返回值
  • shutdown(): 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。(并不是直接关闭),之后再提交任务会抛出RejectedExecutionException异常
    • exec.isTerminated()检查是否执行器已经关闭并且所有任务执行完成
  • shutdownNow(): 尝试停止所有正在执行的活动任务,并暂停处理正在等待的任务(直接关闭)。
  • 要注意的是并不是只能使用函数接口Callable以及Runable可以使用其它lambda以及方法引用作为参数传递给submit等

  • Future*(已过时)

  • 代表一个异步计算的结果
  • cancel(boolean mayInterruptIfRunning): 尝试取消任务的执行。
  • isCancelled(): 返回任务是否被取消。
  • isDone(): 返回任务是否已完成。
  • get(): 等待计算完成,然后检索其结果。
  • get(long timeout, TimeUnit unit): 如果必要,最多等待指定的时间以获取计算结果。

执行任务

public class Nap {
  public Nap(double t) { // Seconds
    try {
      TimeUnit.MILLISECONDS.sleep((int)(1000 * t));
    } catch(InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
  public Nap(double t, String msg) {
    this(t);
    System.out.println(msg);
  }
}

public class NapTask implements Runnable {
  final int id;
  public NapTask(int id) { this.id = id; }
  @Override public void run() {
    new Nap(0.1); // Seconds
    System.out.println(this + " " +
      Thread.currentThread().getName());
  }
  @Override public String toString() {
    return "NapTask [" + id + "]";
  }
}

public static void main(String [] args) {
    //工厂方法创建单线程执行持(所有任务顺序执行)
    ExecutorService exec =
      Executors.newSingleThreadExecutor();
    IntStream.range(0, 10)
        //从数字创建挂起任务
      .mapToObj(NapTask:: new)
        //交给执行器
      .forEach(exec:: execute);
    System.out.println("All tasks submitted");
    //不再接受
    exec.shutdown();
    //每 0.1s 检查一次
    while(! exec.isTerminated()) {
      System.out.println(
        Thread.currentThread().getName() +
        " awaiting termination");
      new Nap(0.1);
    }
  }

/* Output:
All tasks submitted
main awaiting termination
main awaiting termination
NapTask [0] pool-1-thread-1
main awaiting termination
NapTask [1] pool-1-thread-1
NapTask [2] pool-1-thread-1
main awaiting termination
NapTask [3] pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask [4] pool-1-thread-1
NapTask [5] pool-1-thread-1
main awaiting termination
NapTask [6] pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask [7] pool-1-thread-1
main awaiting termination
NapTask [8] pool-1-thread-1
main awaiting termination
NapTask [9] pool-1-thread-1
*/

  • 使用更多线程
  • ExecutorService exec Executors.newCachedThreadPool();
  • 小心副作用的影响,多个进程修改用一个变量(可变共享状态)会出现竞争

  • 生成结果

  • 避免使用可变共享状态
  • 自私儿童原则:什么都不共享
  • 使用Callable而不是Runable(即尽可能使用返回值作为结果,而不是用副作用即直接在方法中进行修改)

    //无副作用的方法
    public class CountingTask implements Callable <Integer> {
      final int id;
      public CountingTask(int id) { this.id = id; }
      @Override public Integer call() {
        Integer val = 0;
        for(int i = 0; i < 100; i++)
          val++;
        System.out.println(id + " " +
          Thread.currentThread().getName() + " " + val);
        return val;
      }
    }
    
    public class CachedThreadPool3 {
      public static Integer
      extractResult(Future <Integer> f) {
        try {
          return f.get();
        } catch(Exception e) {
          throw new RuntimeException(e);
        }
      }
      public static void main(String [] args)
        throws InterruptedException {
        ExecutorService exec =
          Executors.newCachedThreadPool();
          //构建任务集合
        List <CountingTask> tasks =
          IntStream.range(0, 10)
            .mapToObj(CountingTask:: new)
            .collect(Collectors.toList());
          //执行并存储结果
        List <Future<Integer> > futures =
          exec.invokeAll(tasks);
        Integer sum = futures.stream()
            //会自动等待执行完成在获取结果并计算
          .map(CachedThreadPool3:: extractResult)
          .reduce(0, Integer:: sum);
        System.out.println("sum = " + sum);
        exec.shutdown();
      }
    }
    //使用流优雅解决
    public static void main(String [] args) {
        System.out.println(
          IntStream.range(0, 10)
            .parallel()
            .mapToObj(CountingTask:: new)
            .map(ct -> ct.call())
            .reduce(0, Integer:: sum));
      }
    

  • 终止长时间运行的任务

  • 提前终止任务
  • 最好的方案是在任务内定期检查某个标识,并自己关闭流程终止
    • 使用AtomicBoolean作为标志,避免并发问题
      public class QuittableTask implements Runnable {
        final int id;
        public QuittableTask(int id) { this.id = id; }
        private AtomicBoolean running =
          new AtomicBoolean(true);
        public void quit() { running.set(false); }
        @Override public void run() {
            //跳出循环即终止
          while(running.get())
            new Nap(0.1);
          System.out.print(id + " ");
        }
      }
      
  • 可以在外面调用定义的quiet实现告知方法进行关闭,这比强制关闭更加安全

CompletableFuture(推荐)

  • 功能
  • 执行长时间运行的任务而不阻塞主线程: 如果你有耗时的任务,比如从网络加载数据、执行复杂计算或访问数据库,你可以使用 CompletableFuture 来异步执行这些任务,避免阻塞主线程。
  • 任务结果的链式处理: 你可以对 CompletableFuture 使用一系列的方法(比如 thenApply, thenAccept, thenCompose)来处理异步任务的结果,这些方法可以顺序执行,形成一个处理流程
  • 组合多个异步任务CompletableFuture 可以组合多个异步任务的结果。例如,你可以并行执行多个任务,并在它们都完成时获得最终结果。

  • 可以直接使用CompletableFuture来管理项目,不需要再使用ExecutorService

  • 创建CompletableFuture对象

  • 特殊类型CompletableFuture<void>只用于任务完成或失败时的提示
  • CompletableFuture<X> x = CompletableFuture.completedFuture(X类型对象)获取一个已经完成的对象(传入的对象就作为结果)

    • 不传入任何参数表示创建一个未完成的新对象
  • 手动(正常)完成x.complete(Result)如果 CompletableFuture 已经被完成(无论是正常完成、异常完成还是被取消),该方法不会改变其状态或结果。

    • 只有用于未完成的才会被正确完成并返回true
    • obtrudeValue(T value)用于强行更改 CompletableFuture 的结果,无论它之前是否已经完成,甚至是异常完成或被取消。
  • 取消任务x.cancel(true)

  • 检查任务执行状况

  • isDone()检查 CompletableFuture 是否已完成,无论是正常完成、异常完成还是被取消。
  • isCancelled()检查 CompletableFuture 是否被取消。
  • isCompletedExceptionally()是否异常完成,即是否在执行过程中抛出了异常。

    • 插入异常到CompletableFuture.completeException(new Exception());
  • 获取执行结果

  • getNow(T valueIfAbsent)尝试获取 CompletableFuture 的完成值。如果 CompletableFuture 尚未完成,则返回给定的默认值。
  • join()get() 类似,但 join() 抛出的是未经检查的异常,使其更适合于lambda表达式。

  • 执行任务

  • CompletableFuture<Void> runAsync(Runnable runnable)
  • 具有不同函数接口的许多版本supplyAsyncAcceptAsync

  • 常用的函数式接口及含义

  • Runnable:没有参数和返回值的。runAsync
  • Consumer:接受单个输入参数并且不返回结果的操作。它主要用于对参数进行操作或副作用。AcceptAsync
  • Function:有一个参数,并且有返回值,且类型可以不同,用于转换异步操作的结果。ApplyAsync
  • Supplier:没有输入但返回一个结果的函数,用于提供异步操作的结果。supplyAsync

  • 添加链式执行方法

  • CompletableFuture<Void> thenRunAsync(Runnable action)根据参数和返回值不同有不同函数接口的版本thenApplyAsync等等
  • 获取依赖数目getNumberOfDependents()即注册的依赖(后续操作)的数目
  • thenCompose 方法用于将两个异步操作串联起来,将一个异步操作的结果作为另一个异步操作的输入。

    CompletableFuture <Void> future = getUserEmail(userId)
        .thenCompose(email -> sendEmail(email, message));
    

  • getUserEmail 返回一个包含用户电子邮件的 CompletableFuture。然后,thenCompose 使用这个电子邮件地址来调用 sendEmail 方法,后者也返回一个 CompletableFuture

    public class CompletableApply {
      public static void main(String [] args) {
        CompletableFuture <Machina> cf =
          CompletableFuture.completedFuture(
            new Machina(0));
        CompletableFuture <Machina> cf2 =
          cf.thenApply(Machina:: work);
        CompletableFuture <Machina> cf3 =
          cf2.thenApply(Machina:: work);
        CompletableFuture <Machina> cf4 =
          cf3.thenApply(Machina:: work);
        CompletableFuture <Machina> cf5 =
          cf4.thenApply(Machina:: work);
      }
    }
    //更简洁的写法
    CompletableFuture <Machina> cf =
          CompletableFuture.completedFuture(
            new Machina(0))
          .thenApply(Machina:: work)
          .thenApply(Machina:: work)
          .thenApply(Machina:: work)
          .thenApply(Machina:: work);
    

  • 默认情况下需要等所有转化都完成创建过程才会结束
  • 使用Async为异步方法,即立即返回对象,在后台继续异步完成创建

  • 合并

  • 接受两个CompletableFuture操作并以多种方式合并

    public static void main(String [] args) {
        /*任何一个完成之后就执行方法
        Workable [BW]
        runAfterEither
        Workable [AW]*/
        voidr(cfA.runAfterEitherAsync(cfB, () -> System.out.println("runAfterEither")));
        /*Workable [BW]
        Workable [AW]
        runAfterBoth*/
        voidr(cfA.runAfterBothAsync(cfB, () -> System.out.println("runAfterBoth")));
        /*Workable [BW]
        applyToEither: Workable [BW]
        Workable [BW](返回值)
        Workable [AW](A 也完成了)*/
        showr(cfA.applyToEitherAsync(cfB, w -> {
            System.out.println("applyToEither: " + w);
            return w;
        }));
        /*Workable [BW]
        acceptEither: Workable [BW]
        Workable [AW]*/
        voidr(cfA.acceptEitherAsync(cfB, w -> {
            System.out.println("acceptEither: " + w);
        }));
        /*Workable [BW]
        Workable [AW]
        thenAcceptBoth: Workable [AW], Workable [BW]*/
        voidr(cfA.thenAcceptBothAsync(cfB, (w1, w2) -> {
            System.out.println("thenAcceptBoth: "
                               + w1 + ", " + w2);
        }));
        /*Workable [BW]
        Workable [AW]
        thenCombine: Workable [AW], Workable [BW]
        Workable [AW]*/
        showr(cfA.thenCombineAsync(cfB, (w1, w2) -> {
            System.out.println("thenCombine: "+ w1 + ", " + w2);
            return w1;
        }));
        //任何一个完成
        CompletableFuture <Workable>
            cfC = Workable.make("C", 0.08),
        cfD = Workable.make("D", 0.09);
        CompletableFuture.anyOf(cfA, cfB, cfC, cfD)
            .thenRunAsync(() -> System.out.println("anyOf"));
        //全部完成
        cfC = Workable.make("C", 0.08);
        cfD = Workable.make("D", 0.09);
        CompletableFuture.allOf(cfA, cfB, cfC, cfD)
            .thenRunAsync(() ->
                          System.out.println("allOf"));
    }
    

  • 以做蛋糕进行模拟

public class Batter {
  // 定义面糊的基本成分作为静态内部类
  static class Eggs {}
  static class Milk {}
  static class Sugar {}
  static class Flour {}

  // 泛型方法:模拟准备任何一种成分的时间
  static <T> T prepare(T ingredient) {
    new Nap(0.1); // 模拟准备时间,Nap类用于暂停0.1秒
    return ingredient; // 返回准备好的成分
  }

  // 泛型方法:异步准备成分
  static <T> CompletableFuture<T> prep(T ingredient) {
    return CompletableFuture
      .completedFuture(ingredient) // 创建一个已经完成的CompletableFuture
      .thenApplyAsync(Batter::prepare); // 异步应用prepare方法
  }

  // 异步混合所有成分以制作面糊
  public static CompletableFuture<Batter> mix() {
    // 分别异步准备每种成分
    CompletableFuture<Eggs> eggs = prep(new Eggs());
    CompletableFuture<Milk> milk = prep(new Milk());
    CompletableFuture<Sugar> sugar = prep(new Sugar());
    CompletableFuture<Flour> flour = prep(new Flour());

    // 等待所有成分准备完成
    CompletableFuture
      .allOf(eggs, milk, sugar, flour)
      .join(); // 阻塞等待所有成分准备完成

    new Nap(0.1); // 混合成分的时间

    // 返回包含面糊的CompletableFuture
    return CompletableFuture.completedFuture(new Batter());
  }
}
public class Baked {
  // 定义一个静态内部类 Pan,代表烘焙用的锅
  static class Pan {}

  // 接收Batter对象,模拟将面糊放入锅中,并返回一个Pan对象
  static Pan pan(Batter b) {
    new Nap(0.1); // 模拟放面糊进锅的时间
    return new Pan(); // 返回新的锅
  }

  // 接收一个Pan对象,模拟烘焙过程,并返回一个Baked对象
  static Baked heat(Pan p) {
    new Nap(0.1); // 模拟烘焙时间
    return new Baked(); // 返回烘焙完成的对象
  }

  // 将制作好的面糊转换为烘焙完成的蛋糕
  static CompletableFuture<Baked> bake(CompletableFuture<Batter> cfb) {
    return cfb
      .thenApplyAsync(Baked::pan) // 异步应用pan方法
      .thenApplyAsync(Baked::heat); // 接着异步应用heat方法
  }

  // 创建一个流,模拟烘焙一批蛋糕
  public static Stream<CompletableFuture<Baked>> batch() {
    CompletableFuture<Batter> batter = Batter.mix(); // 获取制作好的面糊
    // 返回一个流,其中包含多个烘焙蛋糕的CompletableFuture对象
    return Stream.of(bake(batter), bake(batter), bake(batter), bake(batter));
  }
}
final class Frosting {
  // 私有构造函数,防止实例化
  private Frosting() {}

  // 异步制作糖霜
  static CompletableFuture<Frosting> make() {
    new Nap(0.1); // 模拟制作糖霜的时间
    // 返回包含糖霜的CompletableFuture
    return CompletableFuture.completedFuture(new Frosting());
  }
}
public class FrostedCake {
  // 构造函数,创建一个有糖霜的蛋糕对象
  public FrostedCake(Baked baked, Frosting frosting) {
    new Nap(0.1); // 模拟装饰蛋糕的时间
  }

  // toString方法,返回蛋糕的字符串表示
  @Override public String toString() {
    return "FrostedCake";
  }

  // 程序的入口点
  public static void main(String[] args) {
    // 获取烘焙好的蛋糕流,对每个烘焙好的蛋糕进行处理
    Baked.batch().forEach(baked -> baked
      .thenCombineAsync(Frosting.make(), // 异步与新制作的糖霜结合
        (cake, frosting) -> new FrostedCake(cake, frosting)) // 创建有糖霜的蛋糕
      .thenAcceptAsync(System.out::println) // 异步打印蛋糕对象
      .join()); // 等待上述过程完成
  }
}
  • 异常处理
  • 会缓存异常,并且只在尝试提取结果时体现出来,可以使用trycatch处理,但是有更高级的异常处理机制
  • exceptionally 用于处理 CompletableFuture 中发生的异常。它提供了一种恢复机制,允许你返回一个默认值或执行一些替代操作。

    • 只有在出现异常时才会被调用
      .exceptionally((ex) -> {
          if(ex == null)
              System.out.println("I don't get it yet");
          return new Breakable(ex.getMessage(), 0);
      })
      
  • handle 方法用于处理 CompletableFuture结果或异常。它接收两个参数:结果和异常。根据这两个参数,你可以决定如何处理成功或失败的情况。

    .handle((result, fail) -> {
        if(fail != null)
            return "Failure recovery object";
        else
            return result + " is good";
    })
    

  • whenComplete 方法用于在 CompletableFuture 完成时执行一些操作,无论是正常完成还是异常结束。它不改变 CompletableFuture 的结果。

    .whenComplete((result, fail) -> {
        if(fail != null)
            System.out.println("It failed");
        else
            System.out.println(result + " OK");
    })
    

  • handlewhenCpmlete方法无论发生异常与否总是会被调用,区别是handle可以进行修改并且有返回值

  • 检查型异常:不能直接在 CompletableFutureStream 的lambda表达式中抛出检查型异常,除非你捕获并处理它们(即自己抛出自己处理),或者将它们转换为非检查型异常(RuntimeException)。

    CompletableFuture.supplyAsync(() -> {
        throw new IOException(); // 编译错误
    });
    CompletableFuture.supplyAsync(() -> {
        throw new IOException(); // 编译错误
    });
    

  • 预留结合,批量创建

    @Override public void run() {
        while(! generator.isCanceled()) {
          int val = generator.next();
          if(val % 2 != 0) {
            System.out.println(val + " not even!");
            generator.cancel(); // Cancels all EvenCheckers
          }
        }
      }
      // Test any IntGenerator:
      public static void test(IntGenerator gp, int count) {
        List <CompletableFuture<Void> > checkers =
          IntStream.range(0, count)
            .mapToObj(i -> new EvenChecker(gp, i))
            .map(CompletableFuture:: runAsync)//自动创建 CF 对象
            .collect(Collectors.toList());
        checkers.forEach(CompletableFuture:: join);