高级
任务调度¶
Task和Executor(过时)¶
-
Executor
-
execute(Runnable command)
: 接收一个Runnable
对象,并异步地执行它。 -
ExecutorService是
Executor
的一个子接口,提供了更丰富的功能 submit(Runnable/Callable task)
: 提交一个Runnable
任务用于执行,并返回一个代表该任务的Future
。- 用于提交单个任务,如果无返回值Future会get到null
- 提交后会立刻返回,尽管可能没有计算完成(即get时还要等待)
invokeAll(Collection<? extends Callable<T>> tasks)
: 执行给定的任务集合。- 返回值为Future对象,任务都执行完成invoke才会返回值
shutdown()
: 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。(并不是直接关闭),之后再提交任务会抛出RejectedExecutionException异常exec.isTerminated()
检查是否执行器已经关闭并且所有任务执行完成
shutdownNow()
: 尝试停止所有正在执行的活动任务,并暂停处理正在等待的任务(直接关闭)。-
要注意的是并不是只能使用函数接口Callable以及Runable可以使用其它lambda以及方法引用作为参数传递给submit等
-
Future*(已过时)
- 代表一个异步计算的结果
cancel(boolean mayInterruptIfRunning)
: 尝试取消任务的执行。isCancelled()
: 返回任务是否被取消。isDone()
: 返回任务是否已完成。get()
: 等待计算完成,然后检索其结果。get(long timeout, TimeUnit unit)
: 如果必要,最多等待指定的时间以获取计算结果。
执行任务
public class Nap {
public Nap(double t) { // Seconds
try {
TimeUnit.MILLISECONDS.sleep((int)(1000 * t));
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public Nap(double t, String msg) {
this(t);
System.out.println(msg);
}
}
public class NapTask implements Runnable {
final int id;
public NapTask(int id) { this.id = id; }
@Override public void run() {
new Nap(0.1); // Seconds
System.out.println(this + " " +
Thread.currentThread().getName());
}
@Override public String toString() {
return "NapTask [" + id + "]";
}
}
public static void main(String [] args) {
//工厂方法创建单线程执行持(所有任务顺序执行)
ExecutorService exec =
Executors.newSingleThreadExecutor();
IntStream.range(0, 10)
//从数字创建挂起任务
.mapToObj(NapTask:: new)
//交给执行器
.forEach(exec:: execute);
System.out.println("All tasks submitted");
//不再接受
exec.shutdown();
//每 0.1s 检查一次
while(! exec.isTerminated()) {
System.out.println(
Thread.currentThread().getName() +
" awaiting termination");
new Nap(0.1);
}
}
/* Output:
All tasks submitted
main awaiting termination
main awaiting termination
NapTask [0] pool-1-thread-1
main awaiting termination
NapTask [1] pool-1-thread-1
NapTask [2] pool-1-thread-1
main awaiting termination
NapTask [3] pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask [4] pool-1-thread-1
NapTask [5] pool-1-thread-1
main awaiting termination
NapTask [6] pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask [7] pool-1-thread-1
main awaiting termination
NapTask [8] pool-1-thread-1
main awaiting termination
NapTask [9] pool-1-thread-1
*/
- 使用更多线程
ExecutorService exec Executors.newCachedThreadPool();
-
小心副作用的影响,多个进程修改用一个变量(可变共享状态)会出现竞争
-
生成结果
- 避免使用可变共享状态
- 自私儿童原则:什么都不共享
-
使用Callable而不是Runable(即尽可能使用返回值作为结果,而不是用副作用即直接在方法中进行修改)
//无副作用的方法 public class CountingTask implements Callable <Integer> { final int id; public CountingTask(int id) { this.id = id; } @Override public Integer call() { Integer val = 0; for(int i = 0; i < 100; i++) val++; System.out.println(id + " " + Thread.currentThread().getName() + " " + val); return val; } } public class CachedThreadPool3 { public static Integer extractResult(Future <Integer> f) { try { return f.get(); } catch(Exception e) { throw new RuntimeException(e); } } public static void main(String [] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); //构建任务集合 List <CountingTask> tasks = IntStream.range(0, 10) .mapToObj(CountingTask:: new) .collect(Collectors.toList()); //执行并存储结果 List <Future<Integer> > futures = exec.invokeAll(tasks); Integer sum = futures.stream() //会自动等待执行完成在获取结果并计算 .map(CachedThreadPool3:: extractResult) .reduce(0, Integer:: sum); System.out.println("sum = " + sum); exec.shutdown(); } } //使用流优雅解决 public static void main(String [] args) { System.out.println( IntStream.range(0, 10) .parallel() .mapToObj(CountingTask:: new) .map(ct -> ct.call()) .reduce(0, Integer:: sum)); }
-
终止长时间运行的任务
- 提前终止任务
- 最好的方案是在任务内定期检查某个标识,并自己关闭流程终止
- 使用AtomicBoolean作为标志,避免并发问题
public class QuittableTask implements Runnable { final int id; public QuittableTask(int id) { this.id = id; } private AtomicBoolean running = new AtomicBoolean(true); public void quit() { running.set(false); } @Override public void run() { //跳出循环即终止 while(running.get()) new Nap(0.1); System.out.print(id + " "); } }
- 使用AtomicBoolean作为标志,避免并发问题
- 可以在外面调用定义的quiet实现告知方法进行关闭,这比强制关闭更加安全
CompletableFuture(推荐)¶
- 功能
- 执行长时间运行的任务而不阻塞主线程: 如果你有耗时的任务,比如从网络加载数据、执行复杂计算或访问数据库,你可以使用
CompletableFuture
来异步执行这些任务,避免阻塞主线程。 - 任务结果的链式处理: 你可以对
CompletableFuture
使用一系列的方法(比如thenApply
,thenAccept
,thenCompose
)来处理异步任务的结果,这些方法可以顺序执行,形成一个处理流程。 -
组合多个异步任务:
CompletableFuture
可以组合多个异步任务的结果。例如,你可以并行执行多个任务,并在它们都完成时获得最终结果。 -
可以直接使用CompletableFuture来管理项目,不需要再使用ExecutorService
-
创建CompletableFuture对象
- 特殊类型
CompletableFuture<void>
只用于任务完成或失败时的提示 -
CompletableFuture<X> x = CompletableFuture.completedFuture(X类型对象)
获取一个已经完成的对象(传入的对象就作为结果)- 不传入任何参数表示创建一个未完成的新对象
-
手动(正常)完成
x.complete(Result)
如果 CompletableFuture 已经被完成(无论是正常完成、异常完成还是被取消),该方法不会改变其状态或结果。- 只有用于未完成的才会被正确完成并返回true
obtrudeValue(T value)
用于强行更改CompletableFuture
的结果,无论它之前是否已经完成,甚至是异常完成或被取消。
-
取消任务
x.cancel(true)
-
检查任务执行状况
isDone()
检查CompletableFuture
是否已完成,无论是正常完成、异常完成还是被取消。isCancelled()
检查CompletableFuture
是否被取消。-
isCompletedExceptionally()
是否异常完成,即是否在执行过程中抛出了异常。- 插入异常到CompletableFuture
.completeException(new Exception());
- 插入异常到CompletableFuture
-
获取执行结果
getNow(T valueIfAbsent)
尝试获取CompletableFuture
的完成值。如果CompletableFuture
尚未完成,则返回给定的默认值。-
join()
与get()
类似,但join()
抛出的是未经检查的异常,使其更适合于lambda表达式。 -
执行任务
CompletableFuture<Void> runAsync(Runnable runnable)
-
具有不同函数接口的许多版本
supplyAsync
、AcceptAsync
等 -
常用的函数式接口及含义
Runnable
:没有参数和返回值的。runAsync
Consumer
:接受单个输入参数并且不返回结果的操作。它主要用于对参数进行操作或副作用。AcceptAsync
Function
:有一个参数,并且有返回值,且类型可以不同,用于转换异步操作的结果。ApplyAsync
-
Supplier
:没有输入但返回一个结果的函数,用于提供异步操作的结果。supplyAsync
-
添加链式执行方法
CompletableFuture<Void> thenRunAsync(Runnable action)
根据参数和返回值不同有不同函数接口的版本thenApplyAsync
等等- 获取依赖数目
getNumberOfDependents()
即注册的依赖(后续操作)的数目 -
thenCompose
方法用于将两个异步操作串联起来,将一个异步操作的结果作为另一个异步操作的输入。CompletableFuture <Void> future = getUserEmail(userId) .thenCompose(email -> sendEmail(email, message));
-
getUserEmail
返回一个包含用户电子邮件的CompletableFuture
。然后,thenCompose
使用这个电子邮件地址来调用sendEmail
方法,后者也返回一个CompletableFuture
。public class CompletableApply { public static void main(String [] args) { CompletableFuture <Machina> cf = CompletableFuture.completedFuture( new Machina(0)); CompletableFuture <Machina> cf2 = cf.thenApply(Machina:: work); CompletableFuture <Machina> cf3 = cf2.thenApply(Machina:: work); CompletableFuture <Machina> cf4 = cf3.thenApply(Machina:: work); CompletableFuture <Machina> cf5 = cf4.thenApply(Machina:: work); } } //更简洁的写法 CompletableFuture <Machina> cf = CompletableFuture.completedFuture( new Machina(0)) .thenApply(Machina:: work) .thenApply(Machina:: work) .thenApply(Machina:: work) .thenApply(Machina:: work);
- 默认情况下需要等所有转化都完成创建过程才会结束
-
使用
Async
为异步方法,即立即返回对象,在后台继续异步完成创建 -
合并
-
接受两个CompletableFuture操作并以多种方式合并
public static void main(String [] args) { /*任何一个完成之后就执行方法 Workable [BW] runAfterEither Workable [AW]*/ voidr(cfA.runAfterEitherAsync(cfB, () -> System.out.println("runAfterEither"))); /*Workable [BW] Workable [AW] runAfterBoth*/ voidr(cfA.runAfterBothAsync(cfB, () -> System.out.println("runAfterBoth"))); /*Workable [BW] applyToEither: Workable [BW] Workable [BW](返回值) Workable [AW](A 也完成了)*/ showr(cfA.applyToEitherAsync(cfB, w -> { System.out.println("applyToEither: " + w); return w; })); /*Workable [BW] acceptEither: Workable [BW] Workable [AW]*/ voidr(cfA.acceptEitherAsync(cfB, w -> { System.out.println("acceptEither: " + w); })); /*Workable [BW] Workable [AW] thenAcceptBoth: Workable [AW], Workable [BW]*/ voidr(cfA.thenAcceptBothAsync(cfB, (w1, w2) -> { System.out.println("thenAcceptBoth: " + w1 + ", " + w2); })); /*Workable [BW] Workable [AW] thenCombine: Workable [AW], Workable [BW] Workable [AW]*/ showr(cfA.thenCombineAsync(cfB, (w1, w2) -> { System.out.println("thenCombine: "+ w1 + ", " + w2); return w1; })); //任何一个完成 CompletableFuture <Workable> cfC = Workable.make("C", 0.08), cfD = Workable.make("D", 0.09); CompletableFuture.anyOf(cfA, cfB, cfC, cfD) .thenRunAsync(() -> System.out.println("anyOf")); //全部完成 cfC = Workable.make("C", 0.08); cfD = Workable.make("D", 0.09); CompletableFuture.allOf(cfA, cfB, cfC, cfD) .thenRunAsync(() -> System.out.println("allOf")); }
-
以做蛋糕进行模拟
public class Batter {
// 定义面糊的基本成分作为静态内部类
static class Eggs {}
static class Milk {}
static class Sugar {}
static class Flour {}
// 泛型方法:模拟准备任何一种成分的时间
static <T> T prepare(T ingredient) {
new Nap(0.1); // 模拟准备时间,Nap类用于暂停0.1秒
return ingredient; // 返回准备好的成分
}
// 泛型方法:异步准备成分
static <T> CompletableFuture<T> prep(T ingredient) {
return CompletableFuture
.completedFuture(ingredient) // 创建一个已经完成的CompletableFuture
.thenApplyAsync(Batter::prepare); // 异步应用prepare方法
}
// 异步混合所有成分以制作面糊
public static CompletableFuture<Batter> mix() {
// 分别异步准备每种成分
CompletableFuture<Eggs> eggs = prep(new Eggs());
CompletableFuture<Milk> milk = prep(new Milk());
CompletableFuture<Sugar> sugar = prep(new Sugar());
CompletableFuture<Flour> flour = prep(new Flour());
// 等待所有成分准备完成
CompletableFuture
.allOf(eggs, milk, sugar, flour)
.join(); // 阻塞等待所有成分准备完成
new Nap(0.1); // 混合成分的时间
// 返回包含面糊的CompletableFuture
return CompletableFuture.completedFuture(new Batter());
}
}
public class Baked {
// 定义一个静态内部类 Pan,代表烘焙用的锅
static class Pan {}
// 接收Batter对象,模拟将面糊放入锅中,并返回一个Pan对象
static Pan pan(Batter b) {
new Nap(0.1); // 模拟放面糊进锅的时间
return new Pan(); // 返回新的锅
}
// 接收一个Pan对象,模拟烘焙过程,并返回一个Baked对象
static Baked heat(Pan p) {
new Nap(0.1); // 模拟烘焙时间
return new Baked(); // 返回烘焙完成的对象
}
// 将制作好的面糊转换为烘焙完成的蛋糕
static CompletableFuture<Baked> bake(CompletableFuture<Batter> cfb) {
return cfb
.thenApplyAsync(Baked::pan) // 异步应用pan方法
.thenApplyAsync(Baked::heat); // 接着异步应用heat方法
}
// 创建一个流,模拟烘焙一批蛋糕
public static Stream<CompletableFuture<Baked>> batch() {
CompletableFuture<Batter> batter = Batter.mix(); // 获取制作好的面糊
// 返回一个流,其中包含多个烘焙蛋糕的CompletableFuture对象
return Stream.of(bake(batter), bake(batter), bake(batter), bake(batter));
}
}
final class Frosting {
// 私有构造函数,防止实例化
private Frosting() {}
// 异步制作糖霜
static CompletableFuture<Frosting> make() {
new Nap(0.1); // 模拟制作糖霜的时间
// 返回包含糖霜的CompletableFuture
return CompletableFuture.completedFuture(new Frosting());
}
}
public class FrostedCake {
// 构造函数,创建一个有糖霜的蛋糕对象
public FrostedCake(Baked baked, Frosting frosting) {
new Nap(0.1); // 模拟装饰蛋糕的时间
}
// toString方法,返回蛋糕的字符串表示
@Override public String toString() {
return "FrostedCake";
}
// 程序的入口点
public static void main(String[] args) {
// 获取烘焙好的蛋糕流,对每个烘焙好的蛋糕进行处理
Baked.batch().forEach(baked -> baked
.thenCombineAsync(Frosting.make(), // 异步与新制作的糖霜结合
(cake, frosting) -> new FrostedCake(cake, frosting)) // 创建有糖霜的蛋糕
.thenAcceptAsync(System.out::println) // 异步打印蛋糕对象
.join()); // 等待上述过程完成
}
}
- 异常处理
- 会缓存异常,并且只在尝试提取结果时体现出来,可以使用trycatch处理,但是有更高级的异常处理机制
-
exceptionally
用于处理CompletableFuture
中发生的异常。它提供了一种恢复机制,允许你返回一个默认值或执行一些替代操作。- 只有在出现异常时才会被调用
.exceptionally((ex) -> { if(ex == null) System.out.println("I don't get it yet"); return new Breakable(ex.getMessage(), 0); })
- 只有在出现异常时才会被调用
-
handle
方法用于处理CompletableFuture
的结果或异常。它接收两个参数:结果和异常。根据这两个参数,你可以决定如何处理成功或失败的情况。.handle((result, fail) -> { if(fail != null) return "Failure recovery object"; else return result + " is good"; })
-
whenComplete
方法用于在CompletableFuture
完成时执行一些操作,无论是正常完成还是异常结束。它不改变CompletableFuture
的结果。.whenComplete((result, fail) -> { if(fail != null) System.out.println("It failed"); else System.out.println(result + " OK"); })
-
handle
和whenCpmlete
方法无论发生异常与否总是会被调用,区别是handle可以进行修改并且有返回值 -
检查型异常:不能直接在
CompletableFuture
或Stream
的lambda表达式中抛出检查型异常,除非你捕获并处理它们(即自己抛出自己处理),或者将它们转换为非检查型异常(RuntimeException)。CompletableFuture.supplyAsync(() -> { throw new IOException(); // 编译错误 }); CompletableFuture.supplyAsync(() -> { throw new IOException(); // 编译错误 });
-
预留结合,批量创建
@Override public void run() { while(! generator.isCanceled()) { int val = generator.next(); if(val % 2 != 0) { System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers } } } // Test any IntGenerator: public static void test(IntGenerator gp, int count) { List <CompletableFuture<Void> > checkers = IntStream.range(0, count) .mapToObj(i -> new EvenChecker(gp, i)) .map(CompletableFuture:: runAsync)//自动创建 CF 对象 .collect(Collectors.toList()); checkers.forEach(CompletableFuture:: join);