RocketMQ

基于MQ的分布式事务方案本质上是对本地消息表的封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库中

一.流程步骤

  1. 服务主动方发送半消息给Broker,Broker返回ACK
  2. 服务主动方收到Broker返回的ACK后提交本地事务
  3. 根据本地事务的结果向Broker发送commit消息或者回滚消息
  4. Broker收到commit消息则会向消费端投递消息,如果是回滚消息则会清掉该事务消息,无事发生

过程分析:

  1. 步骤1失败:无事发生
  2. 步骤2失败:事务回滚,并向broker发送rollback消息,broker清掉消息,无事发生
  3. 步骤3消息发送失败或消息丢失(broker没收到消息):borker会回调checkLocalTransaction检查事务的结果(也有检测次数阈值),如果事务已经成功则Commit消息,如果失败则rollback消息,无事发生 (上图的5.6.7步骤)
  4. 步骤4投递消息失败或者消费消息失败:会重试,RokcetMq的重试机制,达到默认次数则进死信队列,人工处理

二、优缺点及注意事项

优点: 业务系统和消息系统解耦,无需在新建消息表以及额外的定时处理,性能比消息表高

缺点: 额外的网络请求开销,要有额外的事务检查接口

注意事项: 要保证幂等性

三、示例

1.RokcetMq事务消息状态

  1. TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
  3. TransactionStatus.Unknown:中间状态,表示需要MQ回查才能确定状态。

2.发送事务消息

使用TransactionMQProducer类创建生产者客户端,并指定唯一的producerGroup,可以设置自定义线程池来处理检查请求。本地事务执行后,需要根据执行结果回复MQ

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

3.实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三个事务状态之一。 “checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三个事务状态之一

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //这里做事务处理,根据结果返回消息状态 见RokcetMq事务消息状态
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        //这里是broker回调查询事务状态,根据事务状态返回对应的消息状态 见RokcetMq事务消息状态 
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Last Updated: