并发编程进阶:volatile、内存屏障与 CPU 缓存机制详解
知识点回顾1. 什么是CQRSCQRS是Command Query Responsibility Segregation的缩写一般称作命令查询职责分离。从字面意思理解就是将命令写入和查询读取的责任划分到不同的模型中。对比一下常用的 CRUD 模式创建-读取-更新-删除通常我们会让用户界面与负责所有四种操作的数据存储交互。而 CQRS 则将这些操作分成两种模式一种用于查询又称 R另一种用于命令又称 CUD。2. CQRS的作用是什么CQRS将系统的写操作命令和读操作查询分离到不同的模型和数据存储中从而实现读写分离提高系统的性能、可扩展性和安全性并使复杂业务逻辑写端和高效查询读端各自得到优化降低系统复杂性。它允许为写操作设计严谨的领域模型为读操作设计简单、只关注查询效率的数据模型如专用视图或报表数据库并可通过事件等机制保持最终一致性。3. CQRS 的优点独立缩放。 CQRS 使读取模型和写入模型能够独立缩放。 此方法可帮助最大程度地减少锁争用并提高负载下的系统性能。优化的数据架构。 读取操作可以使用针对查询进行优化的模式。 写入操作使用针对更新优化的模式。安全性。 通过分隔读取和写入可以确保只有适当的域实体或操作有权对数据执行写入操作。关注点分离。 分离读取和写入责任会导致更简洁、更易于维护的模型。 写入端通常处理复杂的业务逻辑。 读取端可以保持简单且专注于查询效率。更简单的查询。 在读取数据库中存储具体化视图时应用程序可以在查询时避免复杂的联接。二、关于PipelinR项目地址https://github.com/sizovs/PipelinR项目开发者在Github的介绍不多关键是最后一句话Its similar to a popular MediatR .NET library. 意思就是这个项目是参考着一个叫MediatR的.net库写的。关于MediatR我之前有两篇文章专门介绍过。PipelinR包括MediatR提供了一种CQRS的实现方式基于中介者模式实现进程内消息传递用于解耦应用中的各个组件支持请求/响应一对一有返回值和发布/订阅一对多无返回值两种消息模式。它们在内部提供管道行为 (Pipeline Behaviors)用于在消息处理前后插入自定义逻辑如日志、验证、异常处理等。需要提醒的是PipelinR并不是一个完整的CQRS框架它只是一个中介者模式的具体实现方式将调用方和处理方进行了解耦而这种模式恰好可以用来在一个单体应用或者是微服务的服务内部中实现简单的CQRS。三、依赖安装和配置1. Maven安装net.sizovspipelinr0.112. Gradle安装dependencies {compile net.sizovs:pipelinr:0.11}在Spring项目中配置PipelinRConfigurationpublic class PipelinrConfiguration {BeanPipeline pipeline(ObjectProvider commandHandlers, ObjectProvider notificationHandlers, ObjectProvider middlewares) {return new Pipelinr().with(commandHandlers::stream).with(notificationHandlers::stream).with(middlewares::orderedStream);}}四、核心组件Pipeline/PipelinrPipeline是消息和处理器之间的中介者调用方向Pipeline发送消息Pipeline收到消息后通过注册到Pipeline的中间件进行层层传递并最终抵达匹配的消息处理器进行处理。Pipelinr是Pipeline的默认实现。Command用于约定请求/响应模式的消息类型泛型参数R是返回值的类型如果不需要返回值可以将R指定为Voidy。Notification用于约定发布/订阅模式的消息类型没有返回值消息可以有多个处理器。Middleware管道中间件Command和Notification都定义了各自的中间件接口。Pipeline接收到的消息在到达最终的处理器之前会经过所有注册到Pipeline的中间。可以使用Middleware实现诸如日志记录、数据验证、开启事务等一系列操作。五、请求/响应模式实现请求/响应模式需要用到Command接口。1. 定义CommandCommand代表一个请求需要实现net.sizovs.pipelinr.Command接口。泛型参数指定返回值类型。// 定义一个创建用户的命令public class CreateUserCommand implements Command {private String username;private String email;public CreateUserCommand(String username, String email) {this.username username;this.email email;}public String getUsername() {return username;}public String getEmail() {return email;}}// 返回值类型public class UserResponse {private Long userId;private String username;private String email;public UserResponse(Long userId, String username, String email) {this.userId userId;this.username username;this.email email;}// getters}2. 定义Command Handler创建该Command对应的处理器实现net.sizovs.pipelinr.Command.Handler接口。Componentpublic class CreateUserCommandHandler implements Command.Handler {Autowiredprivate UserRepository userRepository;Overridepublic UserResponse handle(CreateUserCommand command) {// 业务逻辑处理User user new User();user.setUsername(command.getUsername());user.setEmail(command.getEmail());User savedUser userRepository.save(user);return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());}}3. 在业务代码中使用通过注入Pipeline实例发送Command并获取响应。Servicepublic class UserService {Autowiredprivate Pipeline pipeline;public UserResponse createUser(String username, String email) {CreateUserCommand command new CreateUserCommand(username, email);UserResponse response pipeline.send(command);return response;}}4. 添加Command中间件中间件可以在Command处理前后执行一些操作如验证、日志、事务管理等。Componentpublic class LoggingMiddleware implements Command.Middleware {private static final Logger logger LoggerFactory.getLogger(LoggingMiddleware.class);Overridepublic R invoke(C command, Chain chain) {logger.info(Executing command: {}, command.getClass().getSimpleName());try {R result chain.proceed(command);logger.info(Command executed successfully);return result;} catch (Exception e) {logger.error(Command execution failed, e);throw e;}}}Componentpublic class ValidationMiddleware implements Command.Middleware {Autowiredprivate Validator validator;Overridepublic R invoke(C command, Chain chain) {Set violations validator.validate(command);if (!violations.isEmpty()) {throw new ConstraintViolationException(Validation failed, violations);}return chain.proceed(command);}}ComponentOrder(1) // 指定中间件执行顺序public class TransactionMiddleware implements Command.Middleware {Autowiredprivate PlatformTransactionManager transactionManager;Overridepublic R invoke(C command, Chain chain) {TransactionStatus status transactionManager.getTransaction(new DefaultTransactionDefinition());try {R result chain.proceed(command);transactionManager.commit(status);return result;} catch (Exception e) {transactionManager.rollback(status);throw e;}}}六、发布/订阅模式实现发布/订阅模式使用Notification接口用于一对多的消息分发没有返回值。1. 定义NotificationNotification代表一个事件通知需要实现net.sizovs.pipelinr.Notification接口。// 定义一个用户创建成功的事件通知public class UserCreatedNotification implements Notification {private Long userId;private String username;private String email;private LocalDateTime createdTime;public UserCreatedNotification(Long userId, String username, String email) {this.userId userId;this.username username;this.email email;this.createdTime LocalDateTime.now();}// getters}2. 定义Notification HandlerNotification可以有多个处理器每个处理器实现net.sizovs.pipelinr.Notification.Handler接口。Componentpublic class SendWelcomeEmailHandler implements Notification.Handler {private static final Logger logger LoggerFactory.getLogger(SendWelcomeEmailHandler.class);Autowiredprivate EmailService emailService;Overridepublic void handle(UserCreatedNotification notification) {logger.info(Sending welcome email to user: {}, notification.getUsername());emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());}}Componentpublic class LogUserCreationHandler implements Notification.Handler {private static final Logger logger LoggerFactory.getLogger(LogUserCreationHandler.class);Autowiredprivate UserAuditLogRepository auditLogRepository;Overridepublic void handle(UserCreatedNotification notification) {logger.info(Logging user creation: {}, notification.getUsername());UserAuditLog auditLog new UserAuditLog();auditLog.setUserId(notification.getUserId());auditLog.setOperation(CREATE);auditLog.setTimestamp(notification.getCreatedTime());auditLogRepository.save(auditLog);}}Componentpublic class UpdateUserStatisticsHandler implements Notification.Handler {private static final Logger logger LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);Autowiredprivate UserStatisticsRepository statisticsRepository;Overridepublic void handle(UserCreatedNotification notification) {logger.info(Updating statistics for new user: {}, notification.getUsername());UserStatistics stats statisticsRepository.findOrCreate();stats.incrementTotalUsers();statisticsRepository.save(stats);}}3. 发送Notification在Command处理完成后可以发送Notification通知所有相关的处理器。Componentpublic class CreateUserCommandHandler implements Command.Handler {Autowiredprivate UserRepository userRepository;Autowiredprivate Pipeline pipeline;Overridepublic UserResponse handle(CreateUserCommand command) {// 业务逻辑处理User user new User();user.setUsername(command.getUsername());user.setEmail(command.getEmail());User savedUser userRepository.save(user);// 发送事件通知UserCreatedNotification notification new UserCreatedNotification(savedUser.getId(),savedUser.getUsername(),savedUser.getEmail());pipeline.send(notification);return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());}}4. 添加Notification中间件类似CommandNotification也支持中间件。Componentpublic class NotificationLoggingMiddleware implements Notification.Middleware {private static final Logger logger LoggerFactory.getLogger(NotificationLoggingMiddleware.class);Overridepublic void invoke(N notification, Chain chain) {logger.info(Publishing notification: {}, notification.getClass().getSimpleName());try {chain.proceed(notification);logger.info(Notification published successfully);} catch (Exception e) {logger.error(Notification publishing failed, e);throw e;}}}Componentpublic class NotificationErrorHandlingMiddleware implements Notification.Middleware {private static final Logger logger LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);Overridepublic void invoke(N notification, Chain chain) {try {chain.proceed(notification);} catch (Exception e) {logger.error(Error handling notification: {}, notification.getClass().getSimpleName(), e);// 可以选择吞掉异常或重新抛出取决于业务需求// throw e;}}}七、总结核心收获通过本文的介绍我们了解了如何在Java应用中使用PipelinR框架实现CQRS模式。核心要点总结如下1. CQRS的价值读写分离通过Command处理写操作Notification处理事件响应实现职责的明确划分独立优化读端和写端可以独立优化不同的数据模型适应不同的场景需求系统解耦中介者模式解耦了调用方和处理方提高了系统的可维护性和可扩展性2. PipelinR的核心特性轻量级实现相比完整的CQRS框架PipelinR更轻便学习成本低灵活的管道机制通过中间件可以方便地植入横切关注点如日志、验证、事务等支持两种消息模式Command用于请求/响应Notification用于发布/订阅3. 最佳实践建议合理使用中间件通过Order注解控制中间件执行顺序但要避免中间件层级过多导致性能问题异常处理根据场景选择合适的异常处理策略Notification可考虑不中断其他处理器的错误隔离事件驱动设计充分利用Notification实现事件驱动架构解耦不同的业务流程代码组织按照Command、Handler、Middleware的划分方式组织代码保持结构清晰实施建议适用场景中等复杂度的业务系统需要良好的代码结构和可维护性业务逻辑相对复杂需要事件驱动的系统设计团队具备良好的DDD设计理念和架构意识注意事项学习曲线虽然PipelinR本身简单但要理解CQRS的设计理念需要一定时间适度使用CQRS不是银弹过度设计会增加系统复杂度要根据实际需求决定是否引入团队协作CQRS的有效实施对团队的整体架构意识和编码规范要求较高性能考虑虽然使用了中介者模式会引入少量额外开销但对大多数应用来说可以忽略不计滴兆仿即
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2459123.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!