一、SAGA模式基础
SAGA模式最初由Hector Garcia-Molina和Kenneth Salem在1987年发表的论文《SAGAS》中提出,用于解决长时间运行的事务(Long Running Transaction, LRT)问题。其核心思想是: SAGA将一个长事务拆分为多个本地短事务,由SAGA事务协调器协调,如果所有短事务都成功完成,那么整个事务完成;如果某个步骤失败,则通过补偿操作撤销之前的影响。
二、SAGA模式的两种实现方式
2.1 协同式SAGA
各服务通过事件进行通信,没有中央协调器,服务自己监听事件并决定是否执行操作。示例:
// 订单服务
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 保存订单
orderRepository.save(order);
// 发布订单创建事件
eventPublisher.publish(new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount()));
}
@EventListener
public void handlePaymentFailedEvent(PaymentFailedEvent event) {
// 补偿操作:取消订单
orderRepository.updateStatus(event.getOrderId(), OrderStatus.CANCELLED);
}
@EventListener
public void handleEnventoryReduceFailedEvent(InventoryReduceFailedEvent event){
// 取消订单
orderRepository.udpateStatus(event.getOrderId(), OrderStatus.CANCELLED);
}
}
// 库存服务
public class InventoryService {
@EventListener
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
try {
// 扣减库存
inventoryRepository.reduce(event.getOrderId(), event.getProductId(), event.getQuantity());
// 发布库存扣减成功事件
eventPublisher.publish(new InventoryReducedEvent(event.getOrderId()));
} catch (Exception e) {
// 发布库存扣减失败事件
eventPublisher.publish(new InventoryReduceFailedEvent(event.getOrderId()));
}
}
@EventListener
public void handlePaymentFailedEvent(PaymentFailedEvent event) {
// 补偿操作:恢复库存
inventoryRepository.restore(event.getOrderId());
}
}
// 支付服务
public class PaymentService {
@EventListener
public void handleInventoryReducedEvent(InventoryReducedEvent event) {
try {
// 处理支付
paymentRepository.process(event.getOrderId(), event.getAmount());
// 发布支付成功事件
eventPublisher.publish(new PaymentSucceededEvent(event.getOrderId()));
} catch (Exception e) {
// 发布支付失败事件
eventPublisher.publish(new PaymentFailedEvent(event.getOrderId()));
}
}
}
2.2 编排式SAGA
通过中央协调器(Orchestrator)来集中管理SAGA的执行流程,协调器负责调用参与者的服务,并在失败时调用补偿操作。
示例:
// SAGA协调器
public class OrderSagaOrchestrator {
private final OrderService orderService;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final ShippingService shippingService;
@Transactional
public void createOrder(Order order) {
// 1. 创建订单
orderService.createOrder(order);
try {
// 2. 扣减库存
inventoryService.reduceInventory(order.getId(), order.getProductId(), order.getQuantity());
// 3. 处理支付
paymentService.processPayment(order.getId(), order.getAmount());
// 4. 创建物流单
shippingService.createShipping(order.getId(), order.getAddress());
} catch (Exception e) {
// 执行补偿操作
compensate(order.getId(), e);
throw e;
}
}
private void compensate(Long orderId, Exception failure) {
try {
// 根据失败点决定补偿范围
if (failure instanceof PaymentException) {
// 支付失败,需要恢复库存和取消订单
inventoryService.restoreInventory(orderId);
orderService.cancelOrder(orderId);
} else if (failure instanceof InventoryException) {
// 库存失败,只需要取消订单
orderService.cancelOrder(orderId);
}
// 其他情况的补偿逻辑...
} catch (Exception e) {
// 补偿操作也失败了,需要记录并人工干预
log.error("Compensation failed for order {}", orderId, e);
}
}
}
三、SEATA框架中使用Saga模式
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
目前SEATA提供的Saga模式是基于状态机引擎来实现的,机制是:
1、 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
2、 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
3、 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
注意: 异常发生时是否进行补偿也可由用户自定义决定
可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
示例状态图:
状态机引擎
- 图中的状态图是先执行stateA, 再执行stateB,然后执行stateC
- "状态"的执行是基于事件驱动的模型,stateA执行完成后,会产生路由消息放入EventQueue,事件消费端从EventQueue取出消息,执行stateB
- 在整个状态机启动时会调用Seata Server开启分布式事务,并生产xid, 然后记录"状态机实例"启动事件到本地数据库
- 当执行到一个"状态"时会调用Seata Server注册分支事务,并生产branchId, 然后记录"状态实例"开始执行事件到本地数据库
- 当一个"状态"执行完成后会记录"状态实例"执行结束事件到本地数据库, 然后调用Seata Server上报分支事务的状态
- 当整个状态机执行完成, 会记录"状态机实例"执行完成事件到本地数据库, 然后调用Seata Server提交或回滚分布式事务
更多内容,请参考SEATA官网:
https://seata.apache.org/zh-cn/docs/user/mode/saga