去年双十一大促,我在某电商平台负责订单系统优化时遇到了一个棘手问题:用户支付成功后,订单状态更新成功但库存扣减失败,导致超卖风险。这就是典型的分布式事务难题——如何让不同服务的数据变更保持原子性?经过多方案对比,我们最终选择用RocketMQ的事务消息机制来破局。
一、事务消息不是银弹,但能解决90%的问题
传统二阶段提交(2PC)就像强迫症患者做选择——所有参与者必须同时举手同意,否则就全部重来。而RocketMQ的事务消息机制更像是个聪明的中间人,它把事务拆分成两个阶段:
1、 预提交阶段 :发送半消息(Half Message)到Broker,此时消息对消费者不可见
2、 最终提交阶段 :本地事务执行成功后,正式提交消息
这种设计巧妙地利用了MQ的解耦特性。有次我们遇到网络闪断,本地事务执行成功但提交消息失败,这时事务回查机制就派上了大用场——就像有个贴心的助手会定期检查:"嘿,上次那个订单到底成没成?"
二、手把手实现事务消息
来看一个真实的生产代码片段(已脱敏):
// 订单服务事务生产者
public class OrderTransactionProducer {
private TransactionMQProducer producer;
// 初始化时设置事务监听器
public void init() throws MQClientException {
producer = new TransactionMQProducer("order_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
// 1. 创建本地订单(数据库操作)
orderService.create(order);
// 2. 扣减库存(RPC调用)
inventoryService.deduct(order.getSkuId(), order.getQuantity());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 记录事务日志用于人工核对
transactionLogService.saveFailed(order.getOrderNo());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查(网络抖动时的救命稻草)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderNo = msg.getKeys();
return orderService.exists(orderNo) ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
}
}
避坑指南:这里有个隐藏的坑——executeLocalTransaction方法里的arg参数是通过sendMessageInTransaction方法传入的,但Broker在回查时不会携带这个参数。所以务必在消息体中包含事务ID等关键信息!
三、当消息系统遇上网络分区
某次机房网络故障让我深刻理解了CAP理论。当时主备机房之间网络断开,形成网络分区(Network Partitioning),导致出现诡异的现象:
1、 主机房事务消息显示COMMIT
2、 备机房因为无法同步消息,持续ROLLBACK
我们的解决方案是:
1、 在消息体中增加机房标记
2、 事务回查时优先查询本机房数据库
3、 网络恢复后执行数据对账补偿
// 增强版事务回查逻辑
public LocalTransactionState enhancedCheck(MessageExt msg) {
String orderNo = msg.getKeys();
// 优先查询本地机房数据库
if(localOrderService.exists(orderNo)) {
return COMMIT_MESSAGE;
}
// 本地不存在则查询全局唯一存储
return globalTxService.check(orderNo) ?
COMMIT_MESSAGE : ROLLBACK_MESSAGE;
}
四、性能优化三板斧
经过压测我们发现三个性能瓶颈:
1、 事务日志写入 :同步写日志改为异步批量写入
2、 线程池配置 :调整TransactionListener的线程池大小(建议CPU核数*2)
3、 消息序列化 :将JSON改为Protobuf,消息体积缩小40%
// 优化后的线程池配置
ExecutorService executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
100,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue(5000),
new ThreadFactory() {
// 给线程起个有意义的名字(出问题时你一定会感谢这个决定)
public Thread newThread(Runnable r) {
return new Thread(r, "tx-check-" + counter.getAndIncrement());
}
});
producer.setExecutorService(executor);
五、思考题:如果遇到这些情况怎么办?
1、 事务消息COMMIT成功了,但消费者处理失败?
2、 业务高峰期事务回查请求暴增,如何限流?
3、 如何设计补偿机制应对极端情况?
(提示:第一个问题可以结合消息重试机制+死信队列处理,第二个问题需要实现滑动时间窗口限流,第三个问题建议采用TCC模式兜底)
终极挑战:尝试在事务消息中实现Saga模式的长事务,你会怎么设计?欢迎在评论区分享你的方案,我们一起探讨分布式事务的更多可能性。