Spring WebFlux 开发指南
本小节以项目阶段模块为例,详细说明如何在本项目中使用 Spring WebFlux 架构进行功能模块开发。最终效果如下图所示: 

TIP
- Spring WebFlux架构虽然并发性能出色, 但有一定的开发难度, 建议使用前进行系统性学习掌握
- 由于矩阵星云融合了Spring MVC架构, 因此面向Spring Webfulx架构开发必须在com.matrix.app.flux包下编码
🗺️ 开发导航
第一步:了解项目结构
src/main/java/com/matrix/app/flux/project/stage/
├── controller/ # 控制器层 - 处理HTTP请求
├── service/ # 服务层 - 业务逻辑
│ └── impl/ # 服务实现类
├── repository/ # 数据访问层 - 数据库操作
└── data/ # 数据模型
├── StagePo.java # 持久化对象
├── StageDto.java # 数据传输对象
└── StageVO.java # 视图对象第二步:数据模型设计
2.1 持久化对象 (Po)
java
@Table("project_stage")
public class StagePo implements Serializable {
@Id
private Long id;
private String name;
private String memo;
private String status;
// ... getter/setter
}关键点:
- 使用
@Table注解映射数据库表 - 使用
@Id标识主键 - 实现
Serializable接口
2.2 数据传输对象 (Dto)
java
public class StageDto extends Pager<StageVO> {
private String searchKey;
// ... getter/setter
}关键点:
- 继承
Pager类获得分页能力 - 包含查询条件字段
2.3 视图对象 (VO)
java
public class StageVO extends StagePo {
// 根据需要扩展字段
}第三步:数据访问层开发
3.1 创建 Repository 接口
java
@Repository
public interface StageRepository extends R2dbcRepository<StagePo, Long> {
@Query("SELECT * FROM project_stage WHERE (:searchKey IS NULL OR name LIKE CONCAT('%', :searchKey, '%'))")
Flux<StagePo> findByConditions(@Param("searchKey") String searchKey, Pageable pageable);
@Query("SELECT COUNT(*) FROM project_stage WHERE (:searchKey IS NULL OR name LIKE CONCAT('%', :searchKey, '%'))")
Mono<Long> countByConditions(@Param("searchKey") String searchKey);
}WebFlux 关键特性:
- 继承
R2dbcRepository而不是JpaRepository - 返回类型使用
Flux<T>(多个结果) 或Mono<T>(单个结果) - 使用
@Query注解编写自定义查询
第四步:服务层开发
4.1 定义服务接口
java
public interface StageService {
Mono<StagePo> save(StagePo stage);
Mono<StagePo> deleteById(Long id);
Mono<StagePo> findById(Long id);
Mono<StageDto> findStages(StageDto dto);
}WebFlux 关键特性:
- 所有方法返回
Mono<T>或Flux<T> - 支持异步非阻塞操作
TIP
由于矩阵星云已经做好了返回值封装, 故控制层接口所有返回类型均采用Mono即可. Flux只需要应用在服务层中较为复杂的业务逻辑的实现上
4.2 实现服务类
java
@Service
public class StageServiceImpl implements StageService {
private final StageRepository repository;
public StageServiceImpl(StageRepository repository) {
this.repository = repository;
}
@Override
public Mono<StagePo> save(StagePo stage) {
return repository.save(stage);
}
@Override
public Mono<StagePo> deleteById(Long id) {
return repository.findById(id)
.flatMap(stage -> repository.deleteById(id).thenReturn(stage));
}
@Override
public Mono<StageDto> findStages(StageDto dto) {
// 处理分页查询逻辑
return repository.findByConditions(searchKey, pageable)
.map(stage -> {
// Po -> VO 转换
StageVO vo = new StageVO();
// ... 属性赋值
return vo;
})
.collectList()
.zipWith(repository.countByConditions(searchKey))
.map(tuple -> {
StageDto result = new StageDto();
result.setItems(tuple.getT1());
result.setTotal(tuple.getT2());
return result;
});
}
}WebFlux 操作符详解:
flatMap(): 将一个 Mono 转换为另一个 Monomap(): 转换数据collectList(): 将 Flux 收集为 ListzipWith(): 合并两个异步流thenReturn(): 忽略结果并返回指定值
第五步:控制器层开发
5.1 创建控制器
java
@RestController
@RequestMapping("/project/stage")
public class StageController {
private final StageService stageService;
public StageController(StageService stageService) {
this.stageService = stageService;
}
@LogCollector
@PostMapping("/save")
public Mono<Result<StagePo>> save(@RequestBody StagePo stage, ServerWebExchange exchange) {
return stageService.save(stage)
.map(saved -> Result.ok(saved)
.message(I18n.getMessage(MessageConstants.SAVE_SUCCESS)));
}
@GetMapping("/get")
public Mono<Result<StagePo>> get(@RequestParam("id") Long id) {
return stageService.findById(id)
.map(stage -> Result.ok(stage)
.message(I18n.getMessage(MessageConstants.QUERY_SUCCESS)));
}
}WebFlux 控制器特性:
- 方法返回
Mono<Result<T>> - 使用
ServerWebExchange代替HttpServletRequest - 支持响应式编程模型
第六步:全局选项支持
6.1 实现选项接口
java
@Service
public class StageServiceImpl implements StageService, IOptionService {
@Override
public Flux<Options> getOptions(Map<String, String> filter) {
return repository.findAll()
.map(p -> new Options(p.getName(), p.getId() + ""));
}
}🚀 WebFlux 最佳实践
1. 异步编程思维
java
// ❌ 错误示例 - 阻塞操作
public Mono<String> badExample() {
String result = blockingOperation(); // 阻塞调用
return Mono.just(result);
}
// ✅ 正确示例 - 非阻塞操作
public Mono<String> goodExample() {
return Mono.fromCallable(() -> blockingOperation())
.subscribeOn(Schedulers.boundedElastic());
}2. 错误处理
java
public Mono<StagePo> findByIdWithErrorHandling(Long id) {
return repository.findById(id)
.switchIfEmpty(Mono.error(new EntityNotFoundException("Stage not found")))
.onErrorResume(ex -> {
log.error("Error finding stage: ", ex);
return Mono.empty();
});
}3. 数据转换
java
public Mono<StageDto> convertToDto(Flux<StagePo> stages) {
return stages
.map(this::convertToVO)
.collectList()
.map(list -> {
StageDto dto = new StageDto();
dto.setItems(list);
return dto;
});
}📋 开发检查清单
- [ ] 数据模型使用正确的注解 (
@Table,@Id) - [ ] Repository 继承
R2dbcRepository - [ ] 服务方法返回
Mono或Flux - [ ] 控制器使用构造函数注入
- [ ] 添加适当的日志注解 (
@LogCollector) - [ ] 实现国际化消息
- [ ] 编写单元测试
- [ ] 添加必要的注释
⚠️ 常见陷阱
- 不要在响应式流中使用
.block() - 避免在控制器中编写业务逻辑
- 记住使用适当的线程调度器
- 处理空值情况 (使用
switchIfEmpty) - 正确处理异常 (使用
onErrorResume)
通过遵循本指南,你可以快速掌握 Spring WebFlux 在本项目中的应用模式,并开发出高质量的响应式模块。
