Skip to content

Spring WebFlux 开发指南

本小节以项目阶段模块为例,详细说明如何在本项目中使用 Spring WebFlux 架构进行功能模块开发。最终效果如下图所示: stage-list.png

stage-edit.png

TIP

  1. Spring WebFlux架构虽然并发性能出色, 但有一定的开发难度, 建议使用前进行系统性学习掌握
  2. 由于矩阵星云融合了Spring MVC架构, 因此面向Spring Webfulx架构开发必须在com.matrix.app.flux包下编码

🗺️ 开发导航

第一步:了解项目结构

src/main/java/com/matrix/app/flux/project/stage/
├── controller/          # 控制器层 - 处理HTTP请求
├── service/            # 服务层 - 业务逻辑
│   └── impl/           # 服务实现类
├── repository/         # 数据访问层 - 数据库操作
└── data/              # 数据模型
    ├── StagePo.java   # 持久化对象
    ├── StageDto.java  # 数据传输对象
    └── StageVO.java   # 视图对象

第二步:数据模型设计

2.1 持久化对象 (Po)

java
@Table("project_stage")
public class StagePo implements Serializable {
    @Id
    private Long id;
    private String name;
    private String memo;
    private String status;
    // ... getter/setter
}

关键点:

  • 使用 @Table 注解映射数据库表
  • 使用 @Id 标识主键
  • 实现 Serializable 接口

2.2 数据传输对象 (Dto)

java
public class StageDto extends Pager<StageVO> {
    private String searchKey;
    // ... getter/setter
}

关键点:

  • 继承 Pager 类获得分页能力
  • 包含查询条件字段

2.3 视图对象 (VO)

java
public class StageVO extends StagePo {
    // 根据需要扩展字段
}

第三步:数据访问层开发

3.1 创建 Repository 接口

java
@Repository
public interface StageRepository extends R2dbcRepository<StagePo, Long> {
    
    @Query("SELECT * FROM project_stage WHERE (:searchKey IS NULL OR name LIKE CONCAT('%', :searchKey, '%'))")
    Flux<StagePo> findByConditions(@Param("searchKey") String searchKey, Pageable pageable);
    
    @Query("SELECT COUNT(*) FROM project_stage WHERE (:searchKey IS NULL OR name LIKE CONCAT('%', :searchKey, '%'))")
    Mono<Long> countByConditions(@Param("searchKey") String searchKey);
}

WebFlux 关键特性:

  • 继承 R2dbcRepository 而不是 JpaRepository
  • 返回类型使用 Flux<T> (多个结果) 或 Mono<T> (单个结果)
  • 使用 @Query 注解编写自定义查询

第四步:服务层开发

4.1 定义服务接口

java
public interface StageService {
    Mono<StagePo> save(StagePo stage);
    Mono<StagePo> deleteById(Long id);
    Mono<StagePo> findById(Long id);
    Mono<StageDto> findStages(StageDto dto);
}

WebFlux 关键特性:

  • 所有方法返回 Mono<T>Flux<T>
  • 支持异步非阻塞操作

TIP

由于矩阵星云已经做好了返回值封装, 故控制层接口所有返回类型均采用Mono即可. Flux只需要应用在服务层中较为复杂的业务逻辑的实现上

4.2 实现服务类

java
@Service
public class StageServiceImpl implements StageService {
    
    private final StageRepository repository;
    
    public StageServiceImpl(StageRepository repository) {
        this.repository = repository;
    }
    
    @Override
    public Mono<StagePo> save(StagePo stage) {
        return repository.save(stage);
    }
    
    @Override
    public Mono<StagePo> deleteById(Long id) {
        return repository.findById(id)
                .flatMap(stage -> repository.deleteById(id).thenReturn(stage));
    }
    
    @Override
    public Mono<StageDto> findStages(StageDto dto) {
        // 处理分页查询逻辑
        return repository.findByConditions(searchKey, pageable)
                .map(stage -> {
                    // Po -> VO 转换
                    StageVO vo = new StageVO();
                    // ... 属性赋值
                    return vo;
                })
                .collectList()
                .zipWith(repository.countByConditions(searchKey))
                .map(tuple -> {
                    StageDto result = new StageDto();
                    result.setItems(tuple.getT1());
                    result.setTotal(tuple.getT2());
                    return result;
                });
    }
}

WebFlux 操作符详解:

  • flatMap(): 将一个 Mono 转换为另一个 Mono
  • map(): 转换数据
  • collectList(): 将 Flux 收集为 List
  • zipWith(): 合并两个异步流
  • thenReturn(): 忽略结果并返回指定值

第五步:控制器层开发

5.1 创建控制器

java
@RestController
@RequestMapping("/project/stage")
public class StageController {
    
    private final StageService stageService;
    
    public StageController(StageService stageService) {
        this.stageService = stageService;
    }
    
    @LogCollector
    @PostMapping("/save")
    public Mono<Result<StagePo>> save(@RequestBody StagePo stage, ServerWebExchange exchange) {
        return stageService.save(stage)
                .map(saved -> Result.ok(saved)
                        .message(I18n.getMessage(MessageConstants.SAVE_SUCCESS)));
    }
    
    @GetMapping("/get")
    public Mono<Result<StagePo>> get(@RequestParam("id") Long id) {
        return stageService.findById(id)
                .map(stage -> Result.ok(stage)
                        .message(I18n.getMessage(MessageConstants.QUERY_SUCCESS)));
    }
}

WebFlux 控制器特性:

  • 方法返回 Mono<Result<T>>
  • 使用 ServerWebExchange 代替 HttpServletRequest
  • 支持响应式编程模型

第六步:全局选项支持

6.1 实现选项接口

java
@Service
public class StageServiceImpl implements StageService, IOptionService {
    
    @Override
    public Flux<Options> getOptions(Map<String, String> filter) {
        return repository.findAll()
                .map(p -> new Options(p.getName(), p.getId() + ""));
    }
}

🚀 WebFlux 最佳实践

1. 异步编程思维

java
// ❌ 错误示例 - 阻塞操作
public Mono<String> badExample() {
    String result = blockingOperation(); // 阻塞调用
    return Mono.just(result);
}

// ✅ 正确示例 - 非阻塞操作
public Mono<String> goodExample() {
    return Mono.fromCallable(() -> blockingOperation())
               .subscribeOn(Schedulers.boundedElastic());
}

2. 错误处理

java
public Mono<StagePo> findByIdWithErrorHandling(Long id) {
    return repository.findById(id)
            .switchIfEmpty(Mono.error(new EntityNotFoundException("Stage not found")))
            .onErrorResume(ex -> {
                log.error("Error finding stage: ", ex);
                return Mono.empty();
            });
}

3. 数据转换

java
public Mono<StageDto> convertToDto(Flux<StagePo> stages) {
    return stages
            .map(this::convertToVO)
            .collectList()
            .map(list -> {
                StageDto dto = new StageDto();
                dto.setItems(list);
                return dto;
            });
}

📋 开发检查清单

  • [ ] 数据模型使用正确的注解 (@Table, @Id)
  • [ ] Repository 继承 R2dbcRepository
  • [ ] 服务方法返回 MonoFlux
  • [ ] 控制器使用构造函数注入
  • [ ] 添加适当的日志注解 (@LogCollector)
  • [ ] 实现国际化消息
  • [ ] 编写单元测试
  • [ ] 添加必要的注释

⚠️ 常见陷阱

  1. 不要在响应式流中使用 .block()
  2. 避免在控制器中编写业务逻辑
  3. 记住使用适当的线程调度器
  4. 处理空值情况 (使用 switchIfEmpty)
  5. 正确处理异常 (使用 onErrorResume)

通过遵循本指南,你可以快速掌握 Spring WebFlux 在本项目中的应用模式,并开发出高质量的响应式模块。