相关文章:
- 用最简单的方式理解同步和异步、阻塞与非阻塞
- CompletableFuture 实战
背景
昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。
这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。
对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:
/**
* @author dongguabai
* @date 2024-01-09 13:52
*/
public abstract class MergeRequestHandler {
private MergeRequestHandler next;
public MergeRequestHandler(MergeRequestHandler next) {
this.next = next;
}
public void handle(Task task) {
if (run(task) && next != null) {
next.handle(task);
}
}
public abstract boolean run(Task task);
}
/**
* @author dongguabai
* @date 2024-01-09 15:53
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {
public static MergeRequestHandler createHandlerChain() {
return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));
}
}
在链式工厂里面指定了执行顺序。
当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
:
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false);
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain
时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture
来实现,这也是我当前代码改造的主要方向。
关于CompletableFuture
的实现,我设计了两套方案。现在来看一下最终实现的方案。
抽象 Processor
:
@Log4j
abstract class Processor {
public CompletableFuture<Task> process(Task task, Executor executor) {
return CompletableFuture.supplyAsync(() -> doProcess(task), executor);
}
protected abstract Task doProcess(Task task);
}
实现类:
@Log4j
class Processor1 extends Processor {
@Override
protected Task doProcess(Task task) {
// 你的处理逻辑
log.info("Processor1 is processing");
ProcessorChain.sleep(1);
log.info("Processor1 end");
if (task.isCancelled()) {
throw new CancellationException("Task was cancelled");
}
return task;
}
}
@Log4j
class Processor2 extends Processor {
@Override
protected Task doProcess(Task task) {
// 你的处理逻辑
log.info("Processor2 is processing");
ProcessorChain.sleep(2);
log.info("Processor2 end");
if (task.isCancelled()) {
throw new CancellationException("Task was cancelled");
}
return task;
}
}
核心的处理链:
@Log4j
class ProcessorChain {
private final List<Processor> processors;
public ProcessorChain(List<Processor> processors) {
this.processors = processors;
}
public CompletableFuture<Void> process(Task task, Executor executor) {
CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
for (Processor processor : processors) {
future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
}
return future.thenAccept(result ->log.info("处理完成,结果是:" + result));
}
public static void sleep(Integer i){
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试代码:
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));
Task task = new Task("1");
CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {
System.out.println("处理过程中出现错误:" + ex.getMessage());
return null;
});
log.info("处理完成");
}
输出:
22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646
效果挺符合诉求的,但这里也有几个疑问。
ProcessorChain#process中的执行顺序问题
先看 ProcessorChain#proccess
:
public CompletableFuture<Void> process(Task task, Executor executor) {
CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
for (Processor processor : processors) {
future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
}
return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));
}
朋友原话是:“他期望的执行顺序是 process1
、process2
、process3
。但由于 thenComposeAsync
是异步执行的,那么在循环到 process2
时,它会异步执行。此时,循环可能已经到了 process3
,并开始执行 thenComposeAsync
。这样,process3
和process2
可能会并行执行,这跟期望结果不一致”。
其实这里是不会的,thenComposeAsync
可以翻译为 “然后异步组合”。
这里的 “组合” 指的是将当前的 CompletableFuture
与另一个 CompletableFuture
进行组合。“异步” 指的是这里的 Function
操作会在一个单独的线程中执行,不会阻塞当前的线程。
我这里的 Processor#process
函数返回的是 CompletableFuture
,所以这里 thenComposeAsync
的流程是:等待当前的 CompletableFuture
完成后,异步执行 Function
, Function
会返回一个新的 CompletableFuture
,然后 thenComposeAsync
会返回这个 CompletableFuture
。
这里要注意 thenApplyAsync
和 thenComposeAsync
不要混淆了。
ProcessorChain#process中的阻塞与非阻塞、同步和异步问题
上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。
为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess
的实现。
测试代码:
public static void main(String[] args) {
Executor executor = Executors.newSingleThreadExecutor();
ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));
Task task = new Task("1");
CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {
System.out.println("处理过程中出现错误:" + ex.getMessage());
return null;
});
log.info("处理完成");
}
ProcessorChain#proccess
:
public CompletableFuture<Void> process(Task task, Executor executor) {
CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
for (Processor processor : processors) {
future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
}
return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));
}
执行流程如下:
main
线程开始执行,并调用ProcessorChain
的process
方法。- 在
ProcessorChain
的process
方法中,main
线程创建了一个已经完成的CompletableFuture
,并开始遍历Processor
对象。 main
线程调用thenComposeAsync
方法,此时main
线程将Processor1
的process
方法提交给Executor
,并立即返回一个新的CompletableFuture
。main
线程不会等待Processor1
的process
方法完成,因此main
线程是非阻塞的。Executor
的一个线程(称之为Thread_1
)开始执行Processor1
的process
方法。由于thenComposeAsync
的链式调用,Processor2
的process
方法会等待Processor1
的process
方法完成后才开始执行,以此类推。main
线程继续遍历Processor
对象,并对每个Processor
重复步骤 3 和 4。每次调用thenComposeAsync
方法时,main
线程都会立即返回一个新的CompletableFuture
,并将Processor
的process
方法提交给Executor
。这意味着main
线程是非阻塞的,而Executor
的线程会按照Processor
的顺序执行process
方法。- 当所有的
Processor
的process
方法都完成后,其中一个Executor
的线程会执行thenAccept
方法,处理最后一个Processor
的process
方法返回的结果。
其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
只是作为一个“CompletableFuture
的启动器”(这么一说是不是就更好理解了)。
响应式编程
其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:
例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。
其实这里的 CompletableFuture
实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFuture
的 Function
完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。
可以增加日志看一下:
public CompletableFuture<Void> process(Task task, Executor executor) {
CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
for (Processor processor : processors) {
log.info(future);
future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
log.info(future);
// future.thenComposeAsync(t -> processor.process(t, executor), executor);
}
return future.thenAccept(result ->log.info("处理完成,结果是:" + result));
}
再次执行测试代码:
22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a
重新梳理执行流程:
-
最开始的启动器是
3fa77460
,这是一个已经完成的CompletableFuture
对象,用于启动异步操作链。 -
第一次循环:
3fa77460
执行thenComposeAsync
方法,返回新的CompletableFuture
对象e2d56bf
。此时,Processor1
的process
方法正在异步执行。 -
第二次循环:
e2d56bf
执行thenComposeAsync
方法,返回新的CompletableFuture
对象244038d0
。此时,Processor2
的process
方法正在等待Processor1
的process
方法完成。 -
第三次循环:
244038d0
执行thenComposeAsync
方法,返回新的CompletableFuture
对象5680a178
。此时,Processor3
的process
方法正在等待Processor2
的process
方法完成。 -
第四次循环:
5680a178
执行thenComposeAsync
方法,返回新的CompletableFuture
对象5fdef03a
。此时,Processor4
的process
方法正在等待Processor3
的process
方法完成。 -
所有循环结束后,返回
5fdef03a
。当所有的Processor
的process
方法都完成后,5fdef03a
会执行thenAccept
方法,处理最后一个Processor
的process
方法返回的结果。
这个过程中,每个 CompletableFuture
对象都会等待前一个 CompletableFuture
对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture
链,每个 CompletableFuture
对象都会按照 Processor
的顺序执行 process
方法。
CompletableFuture链
当调用thenComposeAsync
方法时,会创建一个新的CompletableFuture
对象:
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
Object r; Throwable x;
if (e == null && (r = result) != null) {
// try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
return new CompletableFuture<V>(encodeThrowable(x, r));
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
Object s = g.result;
if (s != null)
return new CompletableFuture<V>(encodeRelay(s));
CompletableFuture<V> d = new CompletableFuture<V>();
UniRelay<V> copy = new UniRelay<V>(d, g);
g.push(copy);
copy.tryFire(SYNC);
return d;
} catch (Throwable ex) {
return new CompletableFuture<V>(encodeThrowable(ex));
}
}
CompletableFuture<V> d = new CompletableFuture<V>();
//构建 UniCompose
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
return d;
}
构造的 UniCompose
对象持有前一个CompletableFuture
对象的引用,也就是 UniCompose
的 src
字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。
总结
本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture
的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。
References
- https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B
欢迎关注公众号: