RocketMQ
基于MQ的分布式事务方案本质上是对本地消息表的封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库中
一.流程步骤
- 服务主动方发送半消息给Broker,Broker返回ACK
- 服务主动方收到Broker返回的ACK后提交本地事务
- 根据本地事务的结果向Broker发送commit消息或者回滚消息
- Broker收到commit消息则会向消费端投递消息,如果是回滚消息则会清掉该事务消息,无事发生
过程分析:
- 步骤1失败:无事发生
- 步骤2失败:事务回滚,并向broker发送rollback消息,broker清掉消息,无事发生
- 步骤3消息发送失败或消息丢失(broker没收到消息):borker会回调checkLocalTransaction检查事务的结果(也有检测次数阈值),如果事务已经成功则Commit消息,如果失败则rollback消息,无事发生 (上图的5.6.7步骤)
- 步骤4投递消息失败或者消费消息失败:会重试,RokcetMq的重试机制,达到默认次数则进死信队列,人工处理
二、优缺点及注意事项
优点: 业务系统和消息系统解耦,无需在新建消息表以及额外的定时处理,性能比消息表高
缺点: 额外的网络请求开销,要有额外的事务检查接口
注意事项: 要保证幂等性
三、示例
1.RokcetMq事务消息状态
- TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
- TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
- TransactionStatus.Unknown:中间状态,表示需要MQ回查才能确定状态。
2.发送事务消息
使用TransactionMQProducer类创建生产者客户端,并指定唯一的producerGroup,可以设置自定义线程池来处理检查请求。本地事务执行后,需要根据执行结果回复MQ
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
3.实现TransactionListener接口
“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三个事务状态之一。 “checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三个事务状态之一
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//这里做事务处理,根据结果返回消息状态 见RokcetMq事务消息状态
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//这里是broker回调查询事务状态,根据事务状态返回对应的消息状态 见RokcetMq事务消息状态
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}