博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ源码 — 四、 Consumer 接收消息过程
阅读量:7260 次
发布时间:2019-06-29

本文共 6805 字,大约阅读时间需要 22 分钟。

Consumer

consumer pull message

订阅

在Consumer启动之前先将自己放到一个本地的集合中,再以后获取消费者的时候会用到,同时会将自己订阅的信息告诉broker

接收消息

consumer启动的时候会启动两个service:

RebalanceService:主要实现consumer的负载均衡,但是并不会直接发送获取消息的请求,而是构造request之后放到PullMessageService中,等待PullMessageService的线程取出执行
PullMessageService:主要负责从broker获取message,包含一个需要获取消息的请求队列(是阻塞的),并不断依次从队列中取出请求向broker send Request

执行时序图太大,截屏截不全,所以放在git上,在

从Broker pullMessage

在PullMessageService中通过netty发送pull消息的请求之后,Broker的remoteServer会收到request,然后在PullMessageProcessor中的processRequest处理,先会解析requestHeader,request中带了读取MessageStore的参数:

  • consumerGroup
  • topic
  • queueId
  • queueOffset
  • MaxMsgNums
  • subscriptionData(ConsumerManager中获取)

processRequest处理流程

  • 判断Broker当前是否允许接收消息
  • 找到subscriptionGroupConfig,subscriptionGroupTable,如果不存在当前的group则新增一个
  • 是否subscriptionGroupConfig.consumeEnable
  • 获取从TopicConfigManager.topicConfigTable获取topicConfig
  • topicConfig是否有读权限
  • 校验queueId是否在范围内
  • ConsumerManager.getConsumerGroupInfo获取ConsumerGroupInfo
  • consumerGroupInfo.findSubscriptionData查找subscriptionData
  • MessageStore.getMessage读取消息,从文件中读取消息,属于pullMessage的核心方法
  • 判断brokerRole,master和slave工作模式的哪一种
  • 判断MessageResult.status
  • 如果responseCode是SUSCESS,判断是使用heap还是非heap方式传输数据
  • 使用netty序列化response返回netty客户端

负载均衡

// AllocateMessageQueueAveragelypublic List
allocate(String consumerGroup, String currentCID, List
mqAll, List
cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List
result = new ArrayList
(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", // consumerGroup, // currentCID,// cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result;}// RebalanceImplprivate boolean updateProcessQueueTableInRebalance(final String topic, final Set
mqSet, final boolean isOrder) { boolean changed = false; // 去掉topic对应的无用MessageQueue(不包含在processQueueTable或者pullExpired) Iterator
> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry
next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } List
pullRequestList = new ArrayList
(); for (MessageQueue mq : mqSet) { // 如果MessageQueue不在processQueueTable if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); // 如果message还有数据需要读 if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 将pullRequest放在pullRequestQueue中等待去取数据 this.dispatchPullRequest(pullRequestList); return changed;}

issue

多个consumer读取同一个consumerQueue的时候怎么记录每个读取的进度

每个consumer只要记住自己读取哪一个队列,以及offset就可以了

pull消息过程中的关键类

DefaultMQPushConsumerImpl

供DefaultMQPushConsumer调用,作为consumer的默认实现:

AllocateMessageQueueAveragely

这个类主要是consumer消费的负载均衡算法

public List
allocate(String consumerGroup, String currentCID, List
mqAll, List
cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List
result = new ArrayList
(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", // consumerGroup, // currentCID,// cidAll); return result; } // 基本原则,每个队列只能被一个consumer消费 // 当messageQueue个数小于等于consume的时候,排在前面(在list中的顺序)的consumer消费一个queue,index大于messageQueue之后的consumer消费不到queue,也就是为0 // 当messageQueue个数大于consumer的时候,分两种情况 // 当有余数(mod > 0)并且index < mod的时候,当前comsumer可以消费的队列个数是 mqAll.size() / cidAll.size() + 1 // 可以整除或者index 大于余数的时候,队列数为:mqAll.size() / cidAll.size() int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result;}

转载于:https://www.cnblogs.com/sunshine-2015/p/6295100.html

你可能感兴趣的文章
LeetCode之Squares of a Sorted Array(Kotlin)
查看>>
TiDB 在新乐视云联“月光宝盒”项目中的应用与实践
查看>>
自动化构建工具gulp的学习心得
查看>>
JS 类型
查看>>
利用UIPageViewController实现图片轮播(简单实用版本)
查看>>
Python 数据库骚操作 -- MongoDB
查看>>
Linux服务器上搭建svn服务器
查看>>
underscore 源码阅读系列 -- for...in 循环的兼容性问题
查看>>
iOS逆向之旅(进阶篇) — 重签名APP(二)
查看>>
Android进阶/面试重难点
查看>>
深入理解 RxJava2:从 observeOn 到作用域(4)
查看>>
比CRUD多一点儿(一):MySQL常用命令
查看>>
CSS 属性篇(五):box-sizing属性
查看>>
作用域之let、var
查看>>
跟我一起学docker(八)--Dockerfile
查看>>
[译]理解react之setState
查看>>
Android 8.0 的部分坑及对应解决方法
查看>>
webpack集成开发环境wci-build
查看>>
从零讲解 iOS 中 OpenGL ES 的纹理渲染
查看>>
火狐浏览器61发布:UI、性能、TLS1.3以及更多你想了解的都在这儿
查看>>