本文系统讲解了本地消息表在解决分布式事务中的原理与实战方案,包含消息表设计、定时任务补偿、幂等机制、顺序处理优化以及与MQ集成的混合方案,适用于SpringBoot高并发业务中实现可靠消息最终一致性,降低系统耦合同时提升稳定性。
一、本地消息表原理
1.1 基本思想:
本地消息表的核心思想是:
1、 将分布式事务拆分成本地事务进行处理
2、 通过消息表记录事务状态
3、 定时任务补偿机制保证最终一致性
1.2 工作流程:
事务发起方:
- 在本地事务中完成业务数据操作
- 同时向消息表插入一条待发送的消息
定时任务:
- 定期扫描消息表中状态为"待发送"的消息
- 将消息发送给消息消费者
- 根据消费者响应更新消息状
消息消费者:
- 接收并处理消息
- 完成自身业务逻辑
- 返回处理结果
二、实现方案示例2.1 数据库表设计订单表
CREATE TABLE `order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) NOT NULL,
`product_id` bigint(20) NOT NULL,
`amount` decimal(10,2) NOT NULL,
`status` tinyint(4) NOT NULL COMMENT '0-待支付,1-已支付,2-已取消',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
消息表
CREATE TABLE `transaction_message` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`message_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`topic` varchar(64) NOT NULL COMMENT '消息主题',
`content` text NOT NULL COMMENT '消息内容',
`status` tinyint(4) NOT NULL COMMENT '0-待发送,1-已发送,2-发送失败,3-已消费',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_retry_time` datetime DEFAULT NULL COMMENT '下次重试时间',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`),
KEY `idx_status_next_retry_time` (`status`,`next_retry_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
定时任务:
@Component
public class MessageTask {
private static final Logger logger = LoggerFactory.getLogger(MessageTask.class);
@Autowired
private MessageService messageService;
@Autowired
private MessageSender messageSender;
// 每30秒执行一次
@Scheduled(fixedRate = 30000)
public void processPendingMessages() {
// 每次处理100条
List<TransactionMessage> messages = messageService.queryPendingMessages(100);
for (TransactionMessage message : messages) {
try {
// 发送消息
boolean result = messageSender.send(message);
if (result) {
// 发送成功,更新状态为已发送
messageService.updateMessageStatus(message.getMessageId(),
TransactionMessage.STATUS_SENT);
} else {
// 发送失败,更新重试次数
handleFailedMessage(message);
}
} catch (Exception e) {
logger.error("发送消息失败, messageId: {}", message.getMessageId(), e);
handleFailedMessage(message);
}
}
}
private void handleFailedMessage(TransactionMessage message) {
int maxRetryCount = 5;
if (message.getRetryCount() >= maxRetryCount) {
// 超过最大重试次数,标记为失败
messageService.updateMessageStatus(message.getMessageId(),
TransactionMessage.STATUS_FAILED);
} else {
// 计算下次重试时间(指数退避)
long delay = (long) Math.pow(2, message.getRetryCount()) * 1000;
Date nextRetryTime = new Date(System.currentTimeMillis() + delay);
messageService.updateMessageWithRetry(
message.getMessageId(),
TransactionMessage.STATUS_PENDING,
message.getRetryCount() + 1,
nextRetryTime
);
}
}
}
消息发送者:
@Component
public class MessageSender {
@Autowired
private RestTemplate restTemplate;
public boolean send(TransactionMessage message) {
try {
String url = determineConsumerUrl(message.getTopic());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> request = new HttpEntity<>(message.getContent(), headers);
ResponseEntity<String> response = restTemplate.postForEntity(
url, request, String.class);
return response.getStatusCode().is2xxSuccessful();
} catch (Exception e) {
return false;
}
}
private String determineConsumerUrl(String topic) {
// 根据topic确定消费者URL
// 实际项目中可以从配置中心或服务注册中心获取
return "http://inventory-service/api/message/consume";
}
}
消息接收者:
@RestController
@RequestMapping("/api/message")
public class MessageConsumer {
@Autowired
private InventoryService inventoryService;
@PostMapping("/consume")
public ResponseEntity<String> consume(@RequestBody String messageContent) {
try {
// 解析消息内容
OrderMessage orderMessage = parseMessage(messageContent);
// 处理库存扣减
inventoryService.deductInventory(
orderMessage.getProductId(),
orderMessage.getQuantity());
return ResponseEntity.ok("success");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("consume failed: " + e.getMessage());
}
}
private OrderMessage parseMessage(String content) {
// 使用JSON解析消息内容
return JSON.parseObject(content, OrderMessage.class);
}
}
创建订单并保存消息:
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private MessageService messageService;
@Transactional
public void createOrder(OrderDTO orderDTO) {
// 1. 创建订单
Order order = convertToOrder(orderDTO);
orderMapper.insert(order);
// 2. 准备消息内容
OrderMessage orderMessage = new OrderMessage();
orderMessage.setOrderId(order.getId());
orderMessage.setProductId(order.getProductId());
orderMessage.setQuantity(order.getQuantity());
// 3. 创建事务消息
TransactionMessage message = new TransactionMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setTopic("order_created");
message.setContent(JSON.toJSONString(orderMessage));
message.setStatus(TransactionMessage.STATUS_PENDING);
// 4. 保存消息(与订单在同一个事务中)
messageService.saveMessage(message);
}
// 省略其他方法
}
三、消息表方案存在的问题
3.1 重复消费问题由于网络问题,消费者可能收到重复消息解决方案:
1、 幂等设计:消费者接口需要支持幂等操作
2、 消息状态检查:消费者在处理前检查消息是否已处理过
3.2 消息顺序问题
某些业务场景下需要保证消息的顺序性
解决方案:
1、 单线程消费:同一业务ID的消息由同一个线程处理
2、 版本号机制:消息携带版本号,消费者按版本号顺序处理
3.3 定时任务性能优化消息表数据量大时,定时任务扫描性能差优化方案:
1、 分片处理:按ID范围或哈希分片处理
2、 索引优化:确保(status, next_retry_time)有索引
3、 批量处理:每次处理一批消息
四、引入消息队列
对于高并发场景,可以将本地消息表与MQ结合:
1、 定时任务将消息从数据库发送到MQ
2、 消费者从MQ消费消息
3、 兼具本地消息表的可靠性和MQ的高性能
五、总结
本地消息表是一种简单有效的分布式事务解决方案,特别适合以下场景:
- 业务对一致性要求不是实时强一致,可以接受短暂延迟
- 希望避免引入复杂的分布式事务框架
- 系统已经基于数据库构建,希望最小化改造成本