RocketMQ系列4

1、Consumer消费消息两种模式比较

RocketMQ提供两种方式进行消费消息pull vs push,这也是很多涉及到Client和Server之间的交互模型

1.1 pull模式

主要是Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数。

好处:是如果Broker消息特别多的话,消费端按照自身的消费能力匀速消费消息,不至于被大量消息打死。

缺陷:消息超时时间可以配置,设置短则会轮训频率过快服务端会承担压力,甚至导致空转。设置长则导致消息接收不及时。

public static void main(String[] args) throws MQClientException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");

consumer.setNamesrvAddr("127.0.0.1:9876");

consumer.start();

try {

MessageQueue mq = new MessageQueue();

mq.setQueueId(0);

mq.setTopic("mq-test");

mq.setBrokerName("broker-a");

long offset = 26;

PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);

System.out.printf("%s%n", pullResult);

} catch (Exception e) {

e.printStackTrace();

}

consumer.shutdown();

}

1.2 push模式

Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费。

好处:可以及时收到新的消息,消费端不会产生额外的延迟。

缺陷:当有大量的推送消息会加重消费端的负载甚至将消费端打死。同时Broker会维护所有建连的客户端连接。

public static void main(String[] args) throws InterruptedException, MQClientException {

// 构造方法

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");

consumer.setNamesrvAddr("localhost:9876");

consumer.subscribe("mq-test", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List msgs,ConsumeConcurrentlyContext context) {

System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

}

在现实中更多根据实际场景进行选择,大多场景更喜欢使用Push模式进行消费消息,那么Push是真正Broker端发送给Consumer的吗?答案肯定不是的,现实场景会有成百上千的Consumer对应的消息队列,Broker不会主动发送消息请求的。所以消息队列如何进行设计消息推送的呢?答案是长轮询。

2、RocketMQ如何实现长轮询

长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据。consumer收到响应后根据状态判断是否有消息。

2.1 Consumer端处理

2.1.1 Consumer启动

首先是Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下。

MQClientInstance#start

public void start() throws MQClientException {

synchronized (this) {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

// 前后省略

this.pullMessageService.start();

break;

case START_FAILED:

throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

default:

break;

}

}

}

2.1.2 Consumer请求

可以看到启动后Consumer则不断轮询 Broker 获取消息。 Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息。

PullMessageService# 客户端发起拉取消息请求

public void run() {

while (!this.isStopped()) {

try {

PullRequest pullRequest = this.pullRequestQueue.take(); // 将返回结果添加到Queue

this.pullMessage(pullRequest);

}

}

Consumer处理的逻辑包括:

判断 Consumer 处于运行中状态、Consumer 处于暂停中。

消息处理队列持有消息最大量和消息体最大量。

根据 consumeOrderly 判断是否为顺序消息。后续在进行分析。

根据topic获取订阅组信息。

真正拉取消息,发起netty请求。请求参数包含 messageQueue(可以认为标识当前客户端该topic具有唯一性)、当前 Consumer 最大偏移量、每次拉取数量、拉取方式(同步||异步)、回调函数PullCallback。还有netty连接的超时时长 timeoutMillis = 30s 和 broker端hold时长 brokerSuspendMaxTimeMillis =15s

private void pullMessage(final PullRequest pullRequest) {

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

if (consumer != null) {

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;

impl.pullMessage(pullRequest);

}

}

DefaultMQPushConsumerImpl#pullMessage

try {

// 真正拉取消息的地方,首先获取Broker信息

this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression,subscriptionData.getExpressionType(),

subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(),

sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,

CommunicationMode.ASYNC, pullCallback);

}

2.1.3 Consumer响应处理

PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求。

FOUND:有消息则进行处理结果和统计、更新最新的偏移量(本地或者远程),完成后将请求添加到pullRequestQueue队列里继续轮训。

NO_NEW_MSG:拉取请求没有新消息但超过hold时长返回, 会进行下一轮消息拉取请求。

NO_MATCHED_MSG:有新消息但是没有匹配。

OFFSET_ILLEGAL:拉取的消息队列位置不合法,需要更新消费进度再进行下一轮消息拉取。

DefaultMQPushConsumerImpl#pullMessage

// 当拉取的请求有响应时

PullCallback pullCallback = new PullCallback() {

public void onSuccess(PullResult pullResult) {

if (pullResult != null) {

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);

switch (pullResult.getPullStatus()) {

case FOUND:

long prevRequestOffset = pullRequest.getNextOffset();

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

// 统计消费组下消息主题拉取耗时

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),

pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

} else {

firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

// 提交拉取到的消息到消息处理队列

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

// 提交消费请求 ConsumeRequest#run 拉取消息响应listener.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());

} else {

// 消费者拉取完消息后,立马就有开始下一个拉取任务

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

}

}

if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {

}

break;

case NO_NEW_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

//消费者没有消息,立马就有开始下一个拉取任务

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

break;

default:

break;

}

}

}

2.2 Broker收到Consumer请求

可以思考一下Broker端需要面临哪些设计?

当生产端发送消息后,如何实时将消息发给消费端?

当Consumer长轮询怎样设置hold住时长?

Broker端hold连接后如何解决占用资源问题?

2.2.1 没有收到消息?如何hold请求

如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。

hold的请求存放在 ConcurrentHashMap 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List 可以理解对应的多个相同的topic客户端。

1. Consumer发起拉取消息请求,Broker端无消息

Broker端

PullMessageProcessor#processRequest

// broker端没有拉取到消息

case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {

long pollingTimeMills = suspendTimeoutMillisLong;

if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {

pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();

}

String topic = requestHeader.getTopic();

long offset = requestHeader.getQueueOffset();

int queueId = requestHeader.getQueueId();

PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);

this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);

response = null;

break;

}

// 先将拉取请求放在this.pullRequestTable中,进行挂载起来

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {

String key = this.buildKey(topic, queueId);

ManyPullRequest mpr = this.pullRequestTable.get(key);

if (null == mpr) {

mpr = new ManyPullRequest();

ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);

if (prev != null) {

mpr = prev;

}

}

mpr.addPullRequest(pullRequest);

}

2.2.2 hold请求超时处理

Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测。

PullRequestHoldService 轮训遍历是否阻塞请求快到超时时间,进行唤醒

public void run() {

while (!this.isStopped()) {

try {

if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {

this.waitForRunning(5 * 1000);

} else {

this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());

}

long beginLockTimestamp = this.systemClock.now();

this.checkHoldRequest();

long costTime = this.systemClock.now() - beginLockTimestamp;

if (costTime > 5 * 1000) { }

}

}

}

//

private void checkHoldRequest() {

for (String key : this.pullRequestTable.keySet()) {

String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);

if (2 == kArray.length) {

String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);

final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);

try {

this.notifyMessageArriving(topic, queueId, offset);

}

}

}

}

2.2.3 服务端收到Producer消息

Producer写入消息,Broker端有消息通知Consumer端。

当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest。

DefaultMessageStore#doReput

if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()

&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {

DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),

dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,

dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),

dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());

}

NotifyMessageArrivingListener#arriving

public void arriving(String topic, int queueId, long logicOffset, long tagsCode,

long msgStoreTime, byte[] filterBitMap, Map properties) {

this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);

}

PullRequestHoldService

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,

long msgStoreTime, byte[] filterBitMap, Map properties) {

String key = this.buildKey(topic, queueId);

ManyPullRequest mpr = this.pullRequestTable.get(key);

if (mpr != null) {

List requestList = mpr.cloneListAndClear(); // 克隆挂起的请求列表

if (requestList != null) {

List replayList = new ArrayList();

for (PullRequest request : requestList) {

long newestOffset = maxOffset;

if (newestOffset <= request.getPullFromThisOffset()) {

newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);

}

if (newestOffset > request.getPullFromThisOffset()) {

boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));

// 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。

if (match) {

try {

this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());

}

continue;

}

}

// 如果列表中挂起的请求快超时了则立即唤醒返回给客户端

if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {

try {

this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());

}

continue;

}

replayList.add(request);

}

if (!replayList.isEmpty()) {

mpr.addPullRequest(replayList);

}

}

}

}

3、总结

当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。

如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。

image.jpeg