专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

分布式事务解决方案:2PC、TCC与Seata AT模式实战解析

一、为什么需要分布式事务

随着微服务架构和分布式系统的普及,一个业务操作往往需要调用多个服务,修改多个数据源的数据。例如:

  • 电商系统中的下单操作:需要扣减库存、创建订单、支付等多个操作
  • 银行转账操作:需要从一个账户扣款,另一个账户加款

这些操作需要作为一个整体要么全部成功,要么全部失败,这就需要分布式事务来保证。

二、两阶段提交(2PC) 原理

两阶段提交(Two-Phase Commit,简称2PC)是分布式系统中实现分布式事务的经典算法。它将事务的提交过程分为两个阶段:

1、 准备阶段(Prepare Phase):协调者询问所有参与者是否可以提交
2、 提交/回滚阶段(Commit/Rollback Phase):根据准备阶段的结果决定提交或回滚

角色划分:

  • 协调者(Coordinator):事务的发起者,负责协调所有参与者
  • 参与者(Participant/Cohort):事务的实际执行者,负责本地事务的执行

**两阶段提交流程:**1)准备阶段

1、 协调者向所有参与者发送prepare请求
2、 参与者执行本地事务但不提交,记录undo/redo日志
3、 参与者向协调者反馈响应:
4、 成功:返回"同意"

失败:返回"中止"

2)提交/回滚阶段

情况1:所有参与者都返回"同意"

1、 协调者向所有参与者发送commit请求
2、 参与者完成本地事务提交
3、 参与者向协调者发送ack响应
4、 协调者收到所有ack后完成事务

情况2:任一参与者返回"中止"或超时

1、 协调者向所有参与者发送rollback请求
2、 参与者利用undo日志回滚本地事务
3、 参与者向协调者发送ack响应
4、 协调者收到所有ack后中断事务

三、基本实现示例

首先定义协调者和参与者的接口:

public interface Coordinator {
    void startTransaction(List<Participant> participants);
    boolean prepare();
    void commit();
    void rollback();
}
public interface Participant {
    boolean prepare();
    void commit();
    void rollback();
}

参与都实现代码:

public class DatabaseParticipant implements Participant {
    private Connection connection;
    private String transactionId;
    public DatabaseParticipant(Connection connection, String transactionId) {
        this.connection = connection;
        this.transactionId = transactionId;
    }
    @Override
    public boolean prepare() {
        try {
            // 设置不自动提交
            connection.setAutoCommit(false);
            // 执行SQL但不提交
            // 这里应该有实际的业务SQL,如:
            // PreparedStatement ps = connection.prepareStatement("UPDATE account SET balance = balance - 100 WHERE user_id = 1");
            // ps.executeUpdate();
            // 记录redo/undo日志
            logRedoUndo();
            return true;
        } catch (SQLException e) {
            return false;
        }
    }
    @Override
    public void commit() {
        try {
            connection.commit();
            cleanRedoUndo();
        } catch (SQLException e) {
            // 处理异常
        }
    }
    @Override
    public void rollback() {
        try {
            connection.rollback();
            cleanRedoUndo();
        } catch (SQLException e) {
            // 处理异常
        }
    }
    private void logRedoUndo() {
        // 实现记录redo/undo日志的逻辑
    }
    private void cleanRedoUndo() {
        // 清理日志
    }
}

协调者实现代码:

public class TwoPhaseCommitCoordinator implements Coordinator {
    private List<Participant> participants;
    private String transactionId;
    public TwoPhaseCommitCoordinator(String transactionId) {
        this.transactionId = transactionId;
    }
    @Override
    public void startTransaction(List<Participant> participants) {
        this.participants = participants;
    }
    @Override
    public boolean prepare() {
        for (Participant participant : participants) {
            if (!participant.prepare()) {
                return false;
            }
        }
        return true;
    }
    @Override
    public void commit() {
        if (prepare()) {
            for (Participant participant : participants) {
                participant.commit();
            }
        } else {
            rollback();
        }
    }
    @Override
    public void rollback() {
        for (Participant participant : participants) {
            try {
                participant.rollback();
            } catch (Exception e) {
                // 记录日志,继续回滚其他参与者
            }
        }
    }
}

使用示例:

public class TwoPhaseCommitExample {
    public static void main(String[] args) {
        // 模拟两个数据库参与者
        Connection conn1 = getConnection(); // 获取第一个数据库连接
        Connection conn2 = getConnection(); // 获取第二个数据库连接
        Participant participant1 = new DatabaseParticipant(conn1, "tx123");
        Participant participant2 = new DatabaseParticipant(conn2, "tx123");
        Coordinator coordinator = new TwoPhaseCommitCoordinator("tx123");
        coordinator.startTransaction(Arrays.asList(participant1, participant2));
        try {
            coordinator.commit();
            System.out.println("事务提交成功");
        } catch (Exception e) {
            coordinator.rollback();
            System.out.println("事务回滚");
        }
    }
    private static Connection getConnection() {
        // 实际应用中应该从数据源获取连接
        return null;
    }
}

四、Seata AT模式

Seata(Simple Extensible Autonomous Transaction Architecture)是一款开源的分布式事务解决方案,提供了AT、TCC、SAGA和XA四种事务模式。其中,AT(Auto Transaction)模式是基于两阶段提交协议改进而来,通过数据源代理和全局锁机制实现了对业务代码几乎零侵入的分布式事务支持。

**包含组件:**Transaction Coordinator (TC,事务协调器)

  • 独立部署的服务,维护全局事务和分支事务的状态
  • 负责协调全局事务的提交或回滚
  • 管理全局锁的获取与释放

Transaction Manager (TM,事务管理器)

  • 嵌入在应用中,负责定义全局事务边界
  • 通过@GlobalTransactional注解标记分布式事务方法
  • 向TC发起全局事务的开始、提交或回滚指令

Resource Manager (RM,资源管理器)

  • 管理分支事务上的资源
  • 向TC注册分支事务并报告状态
  • 驱动分支事务的提交或回滚
  • 负责生成和操作undo log

**AT模式的整体流程:**1)业务执行与本地提交

1、 解析SQL:拦截业务SQL,解析SQL类型(INSERT/UPDATE/DELETE)、表、条件等信息
2、 查询前镜像:根据SQL条件查询修改前的数据快照(before image)
3、 执行业务SQL:执行用户的实际业务SQL
4、 查询后镜像:根据主键查询修改后的数据快照(after image)
5、 插入回滚日志:将前后镜像和业务SQL信息组成undo log记录,插入到undo_log
6、 注册分支事务:向TC注册分支事务并获取全局锁
7、 提交本地事务:业务SQL和undo log在同一个本地事务中提交
8、 上报执行结果:将本地事务执行结果上报给TC

2)全局提交或回滚

全局提交

1、 TC异步通知各分支事务提交
2、 RM异步删除对应的undo log记录
3、 释放全局锁

全局回滚

1、 TC通知各分支事务回滚
2、 RM根据undo log中的before image生成补偿SQL并执行
3、 校验数据一致性(对比after image与当前数据)
4、 删除undo log记录
5、 释放全局锁

Seata的详细信息,请查看官网:

https://seata.apache.org/zh-cn/docs/dev/mode/at-mode

五、两阶段提交的问题1)

协调者单点故障如果在第二阶段协调者宕机,部分参与者收到commit而部分没收到,系统将处于不一致状态。解决方法:记录事务日志,协调者恢复后能继续处理。2)网络分区网络分区可能导致部分参与者无法收到协调者的指令。为了解决2PC的网络阻塞问题,引入了3PC:

1、 CanCommit阶段:询问参与者是否可以提交
2、 PreCommit阶段:预提交,执行事务但不提交
3、 DoCommit阶段:实际提交

3PC通过引入超时机制减少了阻塞,但增加了复杂度。

未经允许不得转载:搜云库 » 分布式事务解决方案:2PC、TCC与Seata AT模式实战解析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们