好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

RocketMq消息处理

RocketMq 消息处理整个流程如下:

本系列 RocketMQ4.8注释github地址 ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

消息接收:消息接收是指接收 producer 的消息,处理类是 SendMessageProcessor ,将消息写入到 commigLog 文件后,接收流程处理完毕; 消息分发: broker 处理消息分发的类是 ReputMessageService ,它会启动一个线程,不断地将 commitLong 分到到对应的 consumerQueue ,这一步操作会写两个文件: consumerQueue 与 indexFile ,写入后,消息分发流程处理 完毕; 消息投递:消息投递是指将消息发往 consumer 的流程, consumer 会发起获取消息的请求, broker 收到请求后,调用 PullMessageProcessor 类处理,从 consumerQueue 文件获取消息,返回给 consumer 后,投递流程处理完毕。

以上就是 rocketMq 处理消息的流程了,接下来我们就从源码来分析消息投递的实现。

1. 处理PULL_MESSAGE请求

与 producer 不同, consumer 从 broker 拉取消息时,发送的请求 code 为 PULL_MESSAGE , processor 为 PullMessageProcessor ,我们直接进入它的 processRequest 方法:

?
1
2
3
4
5
6
@Override
public RemotingCommand processRequest( final ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
     // 调用方法
     return this .processRequest(ctx.channel(), request, true );
}

这个方法就只是调用了一个重载方法,多出来的参数 true 表示允许 broker 挂起请求,我们继续,

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
  * 继续处理
  */
private RemotingCommand processRequest( final Channel channel, RemotingCommand request,
         boolean brokerAllowSuspend) throws RemotingCommandException {
     RemotingCommand response = RemotingCommand
         .createResponseCommand(PullMessageResponseHeader. class );
     final PullMessageResponseHeader responseHeader
         = (PullMessageResponseHeader) response.readCustomHeader();
     final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)
         request.decodeCommandCustomHeader(PullMessageRequestHeader. class );
     response.setOpaque(request.getOpaque());
     // 省略权限校验流程
     // 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接
     // 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限,
     //    可以细粒度控制客户端对topic的操作内容
     ...
     // 获取订阅组
     SubscriptionGroupConfig subscriptionGroupConfig =
         this .brokerController.getSubscriptionGroupManager()
         .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
     ...
     // 获取订阅主题
     TopicConfig topicConfig = this .brokerController.getTopicConfigManager()
         .selectTopicConfig(requestHeader.getTopic());
     ...
     // 处理filter
     // consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92
     // 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析
     ...
     // 获取消息
     // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
     // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
     final GetMessageResult getMessageResult = this .brokerController.getMessageStore().getMessage(
         requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
         requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
     if (getMessageResult != null ) {
         // 省略一大堆的校验过程
         ...
         switch (response.getCode()) {
             // 表示消息可以处理,这里会把消息内容写入到 response 中
             case ResponseCode.SUCCESS:
                 ...
                 // 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中
                 if ( this .brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                     final long beginTimeMills = this .brokerController.getMessageStore().now();
                     // 将消息内容转为byte数组
                     final byte [] r = this .readGetMessageResult(getMessageResult,
                         requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                         requestHeader.getQueueId());
                     ...
                     response.setBody(r);
                 } else {
                     try {
                         // 消息转换
                         FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
                             getMessageResult.getBufferTotalSize()), getMessageResult);
                         channel.writeAndFlush(fileRegion).addListener( new ChannelFutureListener() {
                             ...
                         });
                     } catch (Throwable e) {
                         ...
                     }
                     response = null ;
                 }
                 break ;
             // 未找到满足条件的消息
             case ResponseCode.PULL_NOT_FOUND:
                 // 如果支持挂起,就挂起当前请求
                 if (brokerAllowSuspend && hasSuspendFlag) {
                     ...
                     PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                         this .brokerController.getMessageStore().now(), offset, subscriptionData,
                         messageFilter);
                     // 没有找到相关的消息,挂起操作
                     this .brokerController.getPullRequestHoldService()
                         .suspendPullRequest(topic, queueId, pullRequest);
                     response = null ;
                     break ;
                 }
             // 省略其他类型的处理
             ...
                 break ;
             default :
                 assert false ;
         }
     } else {
         response.setCode(ResponseCode.SYSTEM_ERROR);
         response.setRemark( "store getMessage return null" );
     }
     ...
     return response;
}

在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:

权限校验: rocketMq 可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限; 获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取 broker 中对应的记录 创建过滤组件: consumer 在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种: tag 与 sql92 获取消息:先是根据 topic 与 queueId 获取 ConsumerQueue 文件,根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容,消息的过滤操作也是发生在这一步 转换消息:如果获得了消息,就是把具体的消息内容,复制到 reponse 中 挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求

以上代码还是比较清晰的,相关流程代码中都作了注释。

以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:

获取消息 挂起请求

2. 获取消息

获取消息的方法为 DefaultMessageStore#getMessage ,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public GetMessageResult getMessage( final String group, final String topic, final int queueId,
         final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
     // 省略一些判断
     ...
     // 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置
     ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
     if (consumeQueue != null ) {
         minOffset = consumeQueue.getMinOffsetInQueue();
         maxOffset = consumeQueue.getMaxOffsetInQueue();
         if (...) {
             // 判断 offset 是否符合要求
             ...
         } else {
             // 从 consumerQueue 文件中获取消息
             SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
             if (bufferConsumeQueue != null ) {
                 ...
                 for (; i &lt; bufferConsumeQueue.getSize() &amp;&amp; i &lt; maxFilterMessageCount;
                     i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                     // 省略一大堆的消息过滤操作
                     ...
                     // 从 commitLong 获取消息
                     SelectMappedBufferResult selectResult
                             = this 测试数据mitLog.getMessage(offsetPy, sizePy);
                     if ( null == selectResult) {
                         if (getResult.getBufferTotalSize() == 0 ) {
                             status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                         }
                         nextPhyFileStartOffset = this 测试数据mitLog.rollNextFile(offsetPy);
                         continue ;
                     }
                     // 省略一大堆的消息过滤操作
                     ...
                 }
             }
     } else {
         status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
         nextBeginOffset = nextOffsetCorrection(offset, 0 );
     }
     if (GetMessageStatus.FOUND == status) {
         this .storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
     } else {
         this .storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
     }
     long elapsedTime = this .getSystemClock().now() - beginTime;
     this .storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
     getResult.setStatus(status);
     // 又是处理 offset
     getResult.setNextBeginOffset(nextBeginOffset);
     getResult.setMaxOffset(maxOffset);
     getResult.setMinOffset(minOffset);
     return getResult;
}

这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:

根据 topic 与 queueId 找到 ConsumerQueue 从 ConsumerQueue 对应的文件中获取消息信息,如 tag 的 hashCode 、消息在 commitLog 中的位置信息 根据位置信息,从 commitLog 中获取完整的消息

经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据 tag 或 sql 语法来过滤消息,关于消息过滤的一些细节,我们留到后面 消息过滤 相关章节作进一步分析。

3. 挂起请求:PullRequestHoldService#suspendPullRequest

当 broker 无新消息时, consumer 拉取消息的请求就会挂起,方法为 PullRequestHoldService#suspendPullRequest :

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PullRequestHoldService extends ServiceThread {
     private ConcurrentMap<String /* topic@queueId */ , ManyPullRequest> pullRequestTable =
         new ConcurrentHashMap<String, ManyPullRequest>( 1024 );
     public void suspendPullRequest( final String topic, final int queueId,
             final PullRequest pullRequest) {
         String key = this .buildKey(topic, queueId);
         ManyPullRequest mpr = this .pullRequestTable.get(key);
         if ( null == mpr) {
             mpr = new ManyPullRequest();
             ManyPullRequest prev = this .pullRequestTable.putIfAbsent(key, mpr);
             if (prev != null ) {
                 mpr = prev;
             }
         }
         mpr.addPullRequest(pullRequest);
     }
     ...
}

在 suspendPullRequest 方法中,所做的工作仅是把当前请求放入 pullRequestTable 中了。从代码中可以看到, pullRequestTable 是一个 ConcurrentMap , key 是 topic@queueId , value 就是挂起的请求了。

请求挂起后,何时处理呢?这就是 PullRequestHoldService 线程的工作了。

3.1 处理挂起请求的线程:PullRequestHoldService

看完 PullRequestHoldService#suspendPullRequest 方法后,我们再来看看 PullRequestHoldService 。

PullRequestHoldService 是 ServiceThread 的子类(上一次看到 ServiceThread 的子类还是 ReputMessageService ),它也会启动一个新线程来处理挂起操作。

我们先来看看它是在哪里启动 PullRequestHoldService 的线程的,在 BrokerController 的启动方法 start() 中有这么一行:

BrokerController#start

?
1
2
3
4
5
6
7
public void start() throws Exception {
     ...
     if ( this .pullRequestHoldService != null ) {
         this .pullRequestHoldService.start();
     }
     ...
}

这里就是启动 pullRequestHoldService 的线程操作了。

为了探究这个线程做了什么,我们进入 PullRequestHoldService#run 方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void run() {
     log.info( "{} service started" , this .getServiceName());
     while (! this .isStopped()) {
         try {
             // 等待中
             if ( this .brokerController.getBrokerConfig().isLongPollingEnable()) {
                 this .waitForRunning( 5 * 1000 );
             } else {
                 this .waitForRunning(
                     this .brokerController.getBrokerConfig().getShortPollingTimeMills());
             }
             long beginLockTimestamp = this .systemClock.now();
             // 检查操作
             this .checkHoldRequest();
             long costTime = this .systemClock.now() - beginLockTimestamp;
             if (costTime > 5 * 1000 ) {
                 log.info( "[NOTIFYME] check hold request cost {} ms." , costTime);
             }
         } catch (Throwable e) {
             log.warn( this .getServiceName() + " service has exception. " , e);
         }
     }
     log.info( "{} service end" , this .getServiceName());
}

从代码来看,这个线程先是进行等待,然后调用 PullRequestHoldService#checkHoldRequest 方法,看来关注就是这个方法了,它的代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void checkHoldRequest() {
     for (String key : this .pullRequestTable.keySet()) {
         String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
         if ( 2 == kArray.length) {
             String topic = kArray[ 0 ];
             int queueId = Integer.parseInt(kArray[ 1 ]);
             final long offset = this .brokerController.getMessageStore()
                 .getMaxOffsetInQueue(topic, queueId);
             try {
                 // 调用notifyMessageArriving方法操作
                 this .notifyMessageArriving(topic, queueId, offset);
             } catch (Throwable e) {
                 log.error(...);
             }
         }
     }
}

这个方法调用了 PullRequestHoldService#notifyMessageArriving(...) ,我们继续进入:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public void notifyMessageArriving( final String topic, final int queueId, final long maxOffset) {
     // 继续调用
     notifyMessageArriving(topic, queueId, maxOffset, null , 0 , null , null );
}
/**
  * 这个方法就是最终调用的了
  */
public void notifyMessageArriving( final String topic, final int queueId, final long maxOffset,
     final Long tagsCode, long msgStoreTime, byte [] filterBitMap, Map<String, String> properties) {
     String key = this .buildKey(topic, queueId);
     ManyPullRequest mpr = this .pullRequestTable.get(key);
     if (mpr != null ) {
         List<PullRequest> requestList = mpr.cloneListAndClear();
         if (requestList != null ) {
             List<PullRequest> replayList = new ArrayList<PullRequest>();
             for (PullRequest request : requestList) {
                 // 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断
                 long newestOffset = maxOffset;
                 if (newestOffset <= request.getPullFromThisOffset()) {
                     newestOffset = this .brokerController.getMessageStore()
                         .getMaxOffsetInQueue(topic, queueId);
                 }
                 if (newestOffset > request.getPullFromThisOffset()) {
                     boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                         new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                     if (match && properties != null ) {
                         match = request.getMessageFilter().isMatchedByCommitLog( null , properties);
                     }
                     if (match) {
                         try {
                             // 唤醒操作
                             this .brokerController.getPullMessageProcessor()
                                 .executeRequestWhenWakeup(request.getClientChannel(),
                                 request.getRequestCommand());
                         } catch (Throwable e) {
                             log.error( "execute request when wakeup failed." , e);
                         }
                         continue ;
                     }
                 }
                 // 超时时间到了
                 if (System.currentTimeMillis() >=
                         (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                     try {
                         // 唤醒操作
                         this .brokerController.getPullMessageProcessor()
                             .executeRequestWhenWakeup(request.getClientChannel(),
                             request.getRequestCommand());
                     } catch (Throwable e) {
                         log.error( "execute request when wakeup failed." , e);
                     }
                     continue ;
                 }
                 replayList.add(request);
             }
             if (!replayList.isEmpty()) {
                 mpr.addPullRequest(replayList);
             }
         }
     }
}

这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者 pullRquest hold 住的时间到了,就唤醒 pullRquest (即调用 PullMessageProcessor#executeRequestWhenWakeup 方法)。

在判断是否有新消息送达时,会获取 comsumerQueue 文件中的最大偏移量,与当前 pullRquest 中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒 pullRquest 前面说过,当 consumer 请求没获取到消息时, broker 会 hold 这个请求一段时间(30s),当这个时间到了,也会唤醒 pullRquest ,之后就不会再 hold 住它了

3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup

我们再来看看 PullMessageProcessor#executeRequestWhenWakeup 方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void executeRequestWhenWakeup( final Channel channel,
     final RemotingCommand request) throws RemotingCommandException {
     // 关注 Runnable#run() 方法即可
     Runnable run = new Runnable() {
         @Override
         public void run() {
             try {
                 // 再一次调用 PullMessageProcessor#processRequest(...) 方法
                 final RemotingCommand response = PullMessageProcessor. this
                     .processRequest(channel, request, false );
                 ...
             } catch (RemotingCommandException e1) {
                 log.error( "excuteRequestWhenWakeup run" , e1);
             }
         }
     };
     // 提交任务
     this .brokerController.getPullMessageExecutor()
         .submit( new RequestTask(run, channel, request));
}

这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了 PullMessageProcessor#processRequest(...) 方法,这个方法就是本节一始提到的处理 consumer 拉取消息的方法了。

3.3 消息分发中唤醒consumer请求

在分析消息分发流程时, DefaultMessageStore.ReputMessageService#doReput 方法中有这么一段:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doReput() {
     ...
     // 分发消息
     DefaultMessageStore. this .doDispatch(dispatchRequest);
     // 长轮询:如果有消息到了主节点,并且开启了长轮询
     if (BrokerRole.SLAVE != DefaultMessageStore. this
             .getMessageStoreConfig().getBrokerRole()
             &&DefaultMessageStore. this .brokerConfig.isLongPollingEnable()){
         // 调用NotifyMessageArrivingListener的arriving方法
         DefaultMessageStore. this .messageArrivingListener.arriving(
             dispatchRequest.getTopic(),
             dispatchRequest.getQueueId(),
             dispatchRequest.getConsumeQueueOffset() + 1 ,
             dispatchRequest.getTagsCode(),
             dispatchRequest.getStoreTimestamp(),
             dispatchRequest.getBitMap(),
             dispatchRequest.getPropertiesMap());
     }
     ...
}

这段就是用来主动唤醒 hold 住的 consumer 请求的,我们进入 NotifyMessageArrivingListener#arriving 方法:

?
1
2
3
4
5
6
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
     long msgStoreTime, byte [] filterBitMap, Map<String, String> properties) {
     this .pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
         msgStoreTime, filterBitMap, properties);
}

最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...) 方法。

总结

本文主要分析了 broker 处理 PULL_MESSAGE 请求的流程,总结如下:

broker 处理 PULL_MESSAGE 的 processor 为 PullMessageProcessor , PullMessageProcessor 的 processRequest(...) 就是整个消息获取流程了 broker 在获取消息时,先根据请求的 topic 与 queueId 找到 consumerQueue ,然后根据请求中的 offset 参数从 consumerQueue 文件中找到消息在 commitLog 的位置信息,最后根据位置信息从 commitLog 中获取消息内容 如果 broker 中没有当前 consumerQueue 的消息, broker 会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒

参考  

RocketMQ源码分析专栏

以上就是RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析的详细内容,更多关于RocketMQ broker 消息投递的资料请关注其它相关文章!

原文链接:https://juejin.cn/post/7215518166264856633

查看更多关于RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析的详细内容...

  阅读:23次