RocketMQ源码分析

作者:得物技术 查看原文open in new window

概要

本文主要做了RocketMQ4.8版本的源码主流程分析。

  1. 一二三四五节介绍文件存储部分。
  2. 六七节介绍消息生产流程,包含client端和server端。
  3. 八节介绍消息消费流程。
  4. 九节介绍了传统的主从消息复制模式。
  5. 最后一节介绍了相对较新的基于raft的dleger模式消息处理流程。

一.存储设计

RocketMQ主要的存储文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。

RocketMQ将所有主题的消息存储在同一个文件中。但由于消息中间件一般是基于主题进行订阅,这样做不方便按照消息主题检索消息。

为了方便消息消费,RocketMQ引入了ConsumeQueue消息队列文件,每个Topic包含多个ConsumeQueue,每一个ConsumeQueue有一个文件。该文件可以看成是Commitlog关于消息消费的“索引”。

此外还有IndexFile索引文件,主要就是为了加速消息的检索,根据消息的属性快速从Commitlog文件中检索消息。

  1. CommitLog:消息存储文件,所有Topic的消息都存储在CommitLog文件中。
  2. ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQueue,供消费者消费。
  3. IndexFile:消息索引文件,主要存储消息Key与Offset(消息在CommitLog中的偏移)的对应关系。

二. 存储文件组织

RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumeQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就是该文件第一条消息对应的全局物理偏移量。

以下为commitlog实际文件截图:

存储目录

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。

MappedFile文件映射使用方式如下:

MappedFileQueue是MappedFile的管理容器,也是对存储目录的封装,例如CommitLog文件的存储路径${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件(MappedFile)。

逻辑关系

数据结构

三.存储文件格式

3.1 Commitlog文件格式

我们来看下按照这种格式来查找指定消息的逻辑。

根据offset查找消息。

  • 根据offset定位到文件

  • 用offset与文件长度取余得到在文件内的偏移量,从该偏移量读取size长度的内容返回即可

3.2 Consumequeue文件格式

为了加速ConsumeQueue消息条目的检索,每一个Consumequeue条目不会存储消息的全量信息,其存储格式如图:

ConsumeQueue即为Commitlog文件的索引文件,其构建机制是当消息存储到Commitlog文件后,由专门的线程转发消息,从而构建ConsumeQueue。

如下图,将consumeQueue想象成一个元素大小为20字节的大数组的话,startIndex就是这个数组的索引,而真正在文件中查找的时候则需要换算成文件内的字节偏移量(直接乘20)。

以下为根据消费者消费的逻辑偏移量查找consumeQueue条目的逻辑。

四.消费队列文件的实时更新

ConsumeQueue、IndexFile都是基于CommitLog文件构建的,当生产者提交的消息存储在Commitlog文件中时,ConsumeQueue文件需要及时更新,否则消息无法及时被消费。

RocketMQ通过开启一个线程ReputMessageServcie来实时读取CommitLog文件新增内容,使用reputFromOffset来标记已经追踪到的位置。

主要流程如下:

  • 从当前记录已追踪到的位置reputFromOffset开始读取commitLog数据
  • 将读取到的buffer解析成携带关键信息的DispatchRequest(包含commitLogOffset,msgSize,tagsCode等consumeQueue的必要信息,以用来构建consumeQueue)

  • doDispatch方法最终会调用putMessagePositionInfo方法,将读取到的信息存入consumeQueue对应文件。

五.文件刷盘机制

RocketMQ的存储与读写基于内存映射机制(MappedByteBuffer),消息存储时首先将消息追加到内存,再根据配置的刷盘策略在一定时间进行刷写磁盘。

如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法。

如果是异步刷盘,在消息追加到内存后立刻返回给发送端。

RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘

参数解析:

  1. flushIntervalCommitLog:FlushRealTimeService线程任务运行间隔(默认500ms)。
  2. flushPhysicQueueLeastPages:一次刷写任务至少包含的页数,如果脏页数量不足,小于该参数配置的值,将忽略本次刷写任务,默认4页。
  3. flushPhysicQueueThoroughInterval:两次真实刷写任务最大间隔,默认10s。

注:每间隔10S会把flushPhysicQueueLeastPages强制设置为0,意为强制刷盘,无论脏页数量是否达到。

5.1 异步刷盘主流程

  1. 单线程死循环,每间隔500ms调用一次flush方法。
  2. flush方法的入参是一次刷写任务至少包含的页数, 如果脏页页数不足,则啥都不做。
  3. flush方法最终会走到 mappedByteBuffer.force()方法。

注:force方法详解

1. jdk 源码中显示force方法最终会调用msync系统调用,并传入MS_SYNC标志位。

2. MS_SYNC标志位的含义为同步等待刷盘结果。

六.消息生产 client端流程

生产者消息发送流程主要的步骤:校验消息、查找路由、消息发送(包含异常处理机制)。

6.1 消息生产总流程

  1. 获取topic路由信息
  2. 根据策略选择一个messageQueue进行消息发送,默认使用轮询策略
  3. 调用sendKernelImpl方法发送消息,如果发送失败,默认最多重试三次
  4. sendKernelImpl 会调用到mqClient,使用netty发送请求给mq broker

注:

1. 路由信息既指定topic共有多少队列,这些队列分布在哪些broker节点上,方便发送消息时选择具体的broker节点发送rpc请求。

2. 路由信息数据结构如下:

6.2 消息发送路由选择详解

  1. 获取路由信息
  2. 如果本地缓存中包含指定topic的路由信息,则直接返回;
  3. 如本地map中不包含指定topic的路由信息,则发送rpc请求到nameserver获取。
  4. 消息队列选择
  5. 查询到的路由信息(队列分布信息)结构如下图。
  6. 选择策略默认使用轮询模式,依次来做消息发送的负载均衡。

3. 消息发送,根据上步选好的队列决定出brokerAddr,发送rpc请求到这个地址。

七.消息生产 broker端流程

消息存储代码入口:org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage。

  1. 检查消息合法性
  2. 调用CommitLog类的putMessage方法
  3. 展开CommitLog类的putMessage
  4. 获取当前可以写入的Commitlog文件
  5. put之前先上独占锁
  6. 调用MappedFile#appendMessage方法
  7. finally块中解锁

4. 展开MappedFile#appendMessage(核心就是调用MappedByteBuffer.put

八.消费者消费

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。

集群模式,Topic下的同一条消息只允许被其中一个消费者消费。

广播模式,Topic下的同一条消息将被集群内的所有消费者消费一次

集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循的原则:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

8.1 客户端消息拉取

RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

a.从pullRequestQueue中获取一个PullRequest(消息拉取任务),如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入。

b.调用pullMessage方法处理拿到的PullRequest进行消息拉取,PullRequest包含将要消费的messageQueue信息。

c. 调用pullKernelImpl,传入需要拉取的具体队列、offset、maxNums,拉取完成后需要执行的回调。

d. 根据选择的队列得到具体的broker地址,然后调用mqClient发送rpc请求到broker。

e. 收到响应,执行回调。

  • 将foundList(收到的消息列表)放入processQueue。
  • 调用submitConsumeRequest处理收到的消息体(实际上是将拉取到的消息丢入线程池,做异步化处理)。
  • 将pullRequest重新放回pullRequestQueue,启动下一次pull流程。

注:下图312行,pullRequest的nextOffseet已更新,所以下次拉取的时候会从新的offset开始拉取。

f. submitConsumeRequest处理流程最终会走到ConsumeRequest的run方法

  • 调用业务方注册的listener
  • 在processConsumeResult中处理消息消费Result、消费失败重试等逻辑

8.2 服务端处理消息拉取

入口在PullMessageProcessor#processRequest,该方法核心是DefaultMessageStore#getMessage。

  • 展开DefaultMessageStore#getMessage方法,传入topic、queueId、offset、maxMsgNum。
  • 根据topic,queueId确定consumeQueue文件。
  • 从找到的consumeQueue文件指定offset位置找到第一条‘索引’,读出对应在commitLog中的offset以及size
  • 用上述读取到的offset和size从commitLog中读到目标消息(此处有相对较多的随机读取)。
  • 依次循环,直到消息数量达到maxMsgNum。

九. 主从同步(HA)机制

为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费甚至丢失,RocketMQ引入了Broker主备机制。

即消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

9.1 RocketMQ HA的实现原理

  1. master启动,并在特定端口上监听slave的连接。
  2. slave主动连接master,master接收客户端的连接,并建立相关TCP连接。
  3. slave主动向master发送待拉取消息偏移量,master解析请求并返回消息给从服务器
  4. slave保存消息并继续发送新的消息同步请求。

具体实现(HAService)

  • master启动,并在特定端口上监听从服务器的连接。

  • slave启动主动连接master(connectMaster)
  • slave默认固定每隔5s向master会报当前拉取到的消息偏移量(reportSlaveMaxOffset)
  • 处理master发过来的消息副本(processReadEvent)
  • 继续发送将要拉取的 消息偏移量,等到master响应(reportSlaveMaxOffsetPlus)

部分步骤详解

  • slave -> master请求详解 内容很简单,只有将要拉取的offset

  • master 处理 slave请求详解
  • 读取要拉取的offset
  • 更新slaveAckOffset,用于主从同步复制消息时及时响应给slave
  • 如果是初次收到slave请求,更新slaveRequestOffset

  • master发送header给slave(调用transferData)(包含接下来将要发送的offset)
  • master从本地commitLog中查询指定offset处的消息,填充进入selectMappedBufferResult
  • master调用transferData写出刚才查到的mq消息给到slave节点

  • 由于上述流程更新了nextTransferFromWhere,之后就可以不断循环,发送消息给到slave。

十.​Dledger模式

从4.5版本开始,rocket引入了一种新的高可用机制,即rocketMq的Dleger模式。

Dleger模式遵从Raft协议http://thesecretlivesofdata.com/raft/open in new window

我们以一个集群三台机器为例(包含一主两从)。

10.1 dleger的不同之处

引入dledger后,broker端处理消息发送的流程略有不同:

commitLog存储实现类换成了DledgerCommitLog,核心方法也从mappedFile.appendMessage换成了dLedgerServer.handleAppend,也就是说核心方法不再是把消息put进mappedByteBuffer就ok了。

10.2 dLedger消息存储主流程

剖析下dLedgerServer.handleAppend:

判断从节点是不是压力太大响应太慢,如果是则直接拒绝producer发过来的请求

  • 因为理论上三节点组成的cluster至少要有一个从节点响应成功消息才算正常落地。
  • 如果两个从节点都响应慢,则master也无法接口请求,只能报错。

如果从节点正常,则存入master本地的mappedByteBuffer(dLedgerStore.appendAsLeader)。

将刚存入本地的消息 交给dLedgerEntryPusher来负责发送给slave,并等待从节点的响应(waitAck)。

详解下waitAck:

  • 核心就一条,把msg在commitLog中的索引以及对应的future放入一个map。

注:方便slave响应回来之后,根据响应的index,调用对应index的future,以发送响应给到producer,表示消息发送成功。

10.3 消息从master到 slave的复制流程

master启动时同时启动了2个dispatcher,各自对应一个slave节点。

在dispatcher的主方法中,循环调用doAppend方法,不断复制消息发送到slave。

doAppend负责根据当前已经到达的位置writeIndex,查询并发送消息到从节点(doAppendInner),并更新writeIndex,循环往复。

doAppendInner 先查询index位置的消息,再rpc push消息到从节点

在从节点ack回调中,更新对应从节点所达到的水位(updatePeerWaterMark)。

注:水位即对应从节点已经追加到的index位置。

更新水位时,peerid为从节点的id,三个节点的水位信息统一放在peerWaterMarksByTerm。

master-> slave push消息,收到响应后更新水位,就是这段的总结。

10.4 消息复制成功后及时响应producer

消息复制成功后及时响应producer,以表示消息发送成功。

master启动时同样启动了一个QuorumAckChecker线程。

QuorumAckChecker主方法中进行水位(peerWaterMarksByTerm)的处理。

  • checker计算水位map中的数据,算出当前多数节点(两个节点就够了)已经达到的水位(quorumIndex),标志着这些消息已经收到slave的ack,低于这个水位的消息全部都可以给producer响应了。

注:举例子:

master append到了index 5,slave 1到了 4, slave 2 到了 3,则quorumIndex计算为4。

  • 处理的内容很简单,从pendingResponse的map中找出低于共识水位(quorumIndex)的消息发送response给到producer(调用future.complete)。
  • 这个future正是上边消息发送流程写完master本地后放入pendingResponse这个map的。
  • 至此整个producer的请求,响应流程已经结束。

Last Updated: