RocketMQ5 PopAck源码拆解
-
锁consumer queue -
计算pop offset -
读取消息 -
添加check point -
释放consumer queue锁
-
Pop offset -
Check point -
ReceiptHandle -
StartOffsetInfo - MsgOffsetInfo
- OrderCountInfo
三、Ack流程
- proxy提交ack请求
- 内存标记消费进度
- 持久化ack到revive topic
- 异步标记消费进度
- 可见时间过后,消息恢复消费
-
消费者卡住问题
下图是pop消费者订阅consumer queue的情况:
从上图可以看出来,每个pop client消费全部broker的全部consumer queue。
如果pop client2卡住了,其他的pop client会消费全部的consumer queue,在push消费中queue由于消费卡住或者无人消费而堆积的问题得到解决。
-
负载均衡慢的问题。 如果出现push consumer卡住,或者gc导致消费慢,此时我们一般通过重启消费者程序来临时解决。 消费者重启后reblance,消费者数量越多reblance花费的时间越长,在reblance时消费者无法消费消息。
-
有状态变无状态。
-
消费实例数上限问题。
在pop消息的时候,调用queueLockManager.tryLock(lockKey)方法,实现加锁。 锁key的格式如下:
String lockKey =
topic + // topic名字
PopAckConstants.SPLIT + // 分隔符
requestHeader.getConsumerGroup() + // 消费者组
PopAckConstants.SPLIT + // 分隔符
queueId; // consumer queue id
-
Pop offset
pop offset的值计算有3个来源:
第一个:查询已经提交的位点。每次用户消费完成,提交消费位点后,会更新到这里。
第二步:检查重置消费位点。目前5.1.4版本的重置消费位点也会单独存储。
这个是5.X中新增的逻辑, 如果broker配置
useServerSideResetOffset=true,则通过admin api可以直接重置位点, 重置的位点会临时保存,提供给pop这个时候使用。
第三步:检查ack提交的消费位点。
一次pop一般会pop一批消息, 而ack可能是一条一条的ack的,所以需要检查当前ack提交到哪条消息,已经被pop而没有被ack的不能再次pop,直到被重试或者被恢复到用户topic。
-
Check point
public class PopCheckPoint implements Comparable<PopCheckPoint> {
// 本次pop消息d的起始consumer queue offset
private long startOffset;
// 本次pop时的时间戳,单位毫秒
private long popTime;
// 本次pop消息d的不可见时间,单位毫秒
// 一般来自pop客户端请求的request header
private long invisibleTime;
// 特别重要
// 记录本次pop消息的ack情况
private int bitMap;
// 本次pop消息d的条数
private byte num;
// 本次pop的consumer queue id
private int queueId;
// 本次pop 的topic
private String topic;
// 本次pop 的消费者组
private String cid;
// 特别重要
// revieve topic的位点,后面详细讲解
private long reviveOffset;
// 特别重要
// 本次拉取消息d的每个消息的queue offset 减去 pop offset
// 的差值
private List<Integer> queueOffsetDiff;
// 本次pop 消息所在d的broker
private String brokerName;
}
– bitMap
这个字段是一个int类型,1个int是由32个bit表示,每个bit其实就是0,1,rocketmq利用bitmap标记本次pop的消息哪些被ack(标记为1),哪些未ack(标记为0)。具体过程详见后面讲解ack的过程。
– reviveOffset
revive英文翻译是恢复的含义,那些不可见消息基础信息(非消息body)会保存到revive topic中,到时间后会被revive服务恢复到用户的原始topic中让用户再次消费。
reviveOffset就是这个revive topic的consumer queue位点。
– queueOffsetDiff
-
ReceiptHandle 这个值叫一条消息的句柄,每个消息一条,ack的时候会给到broker,broker通过解析判断ack的哪次pop的哪条消息, 格式如下:
-
StartOffsetInfo
这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。
proxy中使用的代码如下:
上文不是说句柄broker已经构造了, 为什么proxy还需要再构造一次呢? 大家可以思考下。
-
MsgOffsetInfo
这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。
proxy中使用的代码如下:
-
OrderCountInfo
AckMessageProcessor.processRequest()方法,其中虚线是异步的流程,实线是同步流程。笔者将其分为以下5步。
第一步:proxy提交ack请求
用户提交ack请求,ack请求被Broker的AckMessageProcessor.processRequest(Channel, RemotingCommand, boolean)方法处理,并解析AckMessageRequestHeader。
AckMessageRequestHeader中包含pop ck信息, 这里逻辑上区分单个消息ack还是批量消息ack:
标记1: 单个消息ack
标记2: 批量消息ack
标记3: appendAck()方法是ack核心逻辑,后面的全部逻辑都在这个方法中实现。
标记4: 批量执行appendAck()方法。
可以看到标记4处理非原子操作是一种风险,批量提交结果未知,以最终结果一致为准。
第二步:内存标记消费进度
经过第一步后,我们知道核心逻辑在appendAck()中: rocketmq将ack request header解析为AckMsg,并且调用PopBufferMergeService.addAk()将ack msg写入PopBufferMergeService的缓存中。
PopBufferMergeService顾名思义,是一个在内存中提供合并的服务。
合并什么呢, 合并ack和ck消息,也就是用ack 的consumer queue offset去标记ck中的bitmap。
其实就是标记一个ck中的哪些消息被ack了,也就是标记了消费进度。
下面讲解一些关键变量:
point:是当前ack对应的pop check point对象,里面有一个bitmap用来标记每个消息是否被ack,
具体如何标记呢:
– 假设拉取了4个消息,组成一个数组,每个“消息的下标”分别为:0,1,2,3
– 4个消息是否消费的标记由4个“二进制标记”组成一个数组
– 二进制标记数组,可以转化为“1个10进制数int”保存ck对象中
图示如下:
我们从pop check point对象初始化的时候可以知道, bitmap是一个int,并且初始化的值为0。将0转化为二进制,可以知道每一个bit都是0。
我们用这个bitmap的前4个bit来举例说明是如何标记每条消息是否ack的。
将int转化为bitmap,是一个bit数组,每个数组元素的下标表示pop的消息的下标。
比如pop了4条消息,按照consumer queue offset从小到大排序就会有4个consumer queue offset的下标。
假如在时间t1pop了4条消息,consumer queue offset为[100, 101, 102, 103]。
如果第一次ack了100,则bitmap中下标=0的bit设置为1。
bit数组的结果就是上图第一列。
如果第一次ack了101,则bitmap中下标=1的bit设置为1。
bit数组的结果就是上图第列二列。
如果分别ack了第一个、第三个消息,则bitmap的结果如上图最右一列。
每次ack后,bitmap都可以转化为int,并且将这个int保存到pop check中。
这里会有3个问题
– 全部的消息都ack
– 用户在允许的时间内没有ack完成全部消息
– 用户ack的时候, check point消息已经不存在了
这些问题在下一步会被处理。
第三步:持久化ack到revive topic
在上一步中, 如果消息全部被ack了, 这个是正常情况, 将最终的消费位点提交到consumer offset manager中,consumer offset manager会定时自动持久化消费位点。
如果用户在允许的时间内,没有ack完成全部的消息, 此时pop check point会被删除,这些消息用户可以继续pop。
下面介绍了这个超时时间是如何计算的:来自pop时间和不可见时间。这里可以解释不可见时间超过后, 为什么可以再次pop到消息了。
如果用户在ack的时候, pop check point消息不存在了怎么办?
首先是为什么pop check point会不存在?
– 内存不能保存全部的ck。pop check point信息会保存到内存中, 这里不可能保存全部的pop check point, broker提供配置popCkMaxBufferSize内存最大可以保存的pop check point数,默认20w。
超过后, pop check point消息会直接持久化到revieve topic。
– 允许时间内没有ack的的ck需要丢弃,这个ck对应的全部消息全部对用户再次可见。
如果check point不存在了, 则将ack消息保存到revieve topic中,方便与持久化的pop check point再次匹配标记哪些消息被ack了。
第四步:异步标记消费进度
经过上一步,我们知道有一些check point信息和ack信息会被持久化到revieve topic。
PopBufferMergeService服务是一个后台服务, 会消费revieve topic中的ack、ck信息,然后做异步匹配, 来标记ck信息中的用户消息哪些被ack了。
这里细节特别多, 建议大家debug查看,这里如果需要细讲大家留言我们再出一期。
经过scan后,可以知道哪些ck中的用户消息被全部ack了, 就会提交消费位点到 consumer queue offset manager:
如果经过这一步,还是有ck没有完全被ack呢?请看下一步。
第五步:可见时间过后,消息恢复消费
如果经过上一步还有ck没有被ack完全匹配,此时这些ck对应的用户消息将被重新可见,用户可以重新pop。
这个过程是在 PopReviveService服务中实现的, 这也是一个后台服务, 会定时检查哪些ck没有被完全ack, 然后根据ck将这个ck包含的全部消息重新恢复到重试topic中。
PopBufferMergeService还有大量的细节, 建议大家通过在每个关键点打日志,然后生产消费模拟ack的几种情况再查看日志输出,再结合代码很快就会了解更多的细节。
结尾也留2个问题,欢迎大家讨论
– 1. 同一个pop ck,多次重复ack会出现什么情况, Broker是如何处理的?
– 2. 如果pop没有读取到消息需要写ck信息吗, 为什么?
– 3. 下期准备讲proxy或者基于时间轮的任意定时消息,想看什么请留言。
微信赞赏支付宝扫码领红包