我们先来设想一个场景。
有一个 http 的接口 A,该接口内部实际上是由另外三个接口 B、C、D 返回结果的组合,这三个接口不存在相互依赖。我们一般的写法就是 B、C、D 同步顺序执行,依次拿到结果后组装在一起。那么假如这三个接口分别耗时 2 秒,那么 A 接口就要耗时 6 秒。如果可以让 B、C、D 同时执行的话,那么 A 接口理论上只要耗时 2 秒。
当然实际情况肯定复杂的多,如果一个接口内部存在不相互依赖的耗时调用的话,那么我们可以做这样的合并,响应时间上的减少还是非常明显的。整个接口的响应时间取决于最长的那个内部接口。
那么我们来看看在 Java 中有哪些方法可以达到这样的目的。认真思考下你会发现,如果要并行处理的话,在 Java 中只能用多线程来做。实际情况中每个线程处理完的时间肯定不一样,那么如何让线程先处理完的停下来等最后那个处理完的呢。如果经常用多线程的小伙伴肯定能想到 CountDownLatch 工具类。当然也有直接简单暴力的方法,在空循环里轮询每个线程是否执行完,但是这样做肯定不优雅。
那下面就直接上代码了: 假设有个学生服务提供查询学生名字,年龄和家庭信息,每个服务之间没有相互依赖。 我们就简单模拟下来获取学生信息的一个接口。
常规方法
@RequestMapping("/getStudentInfo")
public Object getStudentInfo() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
resultMap.put("studentName", studentService.getStudentName());
resultMap.put("studentAge", studentService.getSutdentAge());
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
顺序同步执行,耗时 6 秒。
1. Future
@RequestMapping("/getStudentInfoWithFuture")
public Object testWhitCallable() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(3);
Future futureStudentName = es.submit(() -> {
Object studentName = studentService.getStudentName();
countDownLatch.countDown();
return studentName;
});
Future futureStudentAge = es.submit(() -> {
Object studentAge = studentService.getSutdentAge();
countDownLatch.countDown();
return studentAge;
});
Future futureStudentFamilyInfo = es.submit(() -> {
Object studentFamilyInfo = studentService.getSutdentFamilyInfo();
countDownLatch.countDown();
return studentFamilyInfo;
});
//同步等待所有线程执行完之后再继续
countDownLatch.await();
resultMap.put("studentName", futureStudentName.get());
resultMap.put("studentAge", futureStudentAge.get());
resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
2.RxJava
@RequestMapping("/getStudentInfoWithRxJava")
public Object testWithRxJava() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
Observable studentNameObservable = Observable.create(observableEmitter -> {
resultMap.put("studentName", studentService.getStudentName());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable studentAgeObservable = Observable.create(observableEmitter -> {
resultMap.put("studentAge", studentService.getSutdentAge());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable familyInfoObservable = Observable.create(observableEmitter -> {
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
//创建一个下游 Observer
Observer<Object> observer = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。
countDownLatch.countDown();
}
};
//建立连接,
Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);
//等待异步线程完成
countDownLatch.await();
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
对于 RxJava 我不熟,我也是临时学习的,不知道这种写法是不是最佳的。
3.CompletableFutures
@RequestMapping("/getStudentInfoWithCompletableFuture")
public Object getStudentInfoWithCompletableFuture() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CompletableFuture<Object> completableFutureStudentName = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getStudentName();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture<Object> completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentAge();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture<Object> completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentFamilyInfo();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();
resultMap.put("studentName", completableFutureStudentName.get());
resultMap.put("studentAge", completableFutureSutdentAge.get());
resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
自带最后的同步等待,不需要 CountDownLatch。CompletableFuture 还有很多其他好用的方法。
有兴趣的可以自己来实验下。 github 项目地址 reactive-programming-sample。
参考资料
Processing streaming data with Spring WebFlux 这文章虽然是说 webflux 的,但是列出了很多其他方案。
RxJava Essentials 中文翻译版
这是我在 v2ex 上提问的一个帖子,感谢网友提供的帮助。
下面是根据网友回复整理出一些可供参考的方案:
- Java 5 提供的 Futures
- Java 8 提供的 CompletableFutures
- 第三方库 RxJava
- Spring 5 提供的 Reactive Streams, Reactor 实现的。
- Spring 5 中的 Webflux。
There are no comments on this post.