rocketmq-基本原理分析

阿里巴巴在2012年开源,2017年成为apache顶级项目

核心设计几件了kafka,所以我们在了解rockeetMq时会发现很多和kafka相同的特性。同时,它在某些功能上氪kafka又有较大的差异。

rocketMQ 特性:

  • 支持集群模型,负载均衡、水平扩展
  • 亿级别消息堆积能力
  • 采用零拷贝原理。顺序写磁盘。随机读
  • 底层通信使用netty
  • 使用nameservice代替zookeeper 实现服务寻址和服务协调
  • 消息失败重试机制,消息可查询
  • 强调集群无单点。可扩展,任意一点高可用,
  • 经过多次双十一的考验

1. 架构介绍

集群本身没有什么特殊之处,和kafka整体架构类似,使用NameService代理Zookeeper。在rocketmq的早版本(2.x)的时候,是没有namesrv组件的,用的是zookeeper做分布式协调 和服务发现,但是后期阿里数据根据实际业务需求进行改进和优化,自组研发了轻量级的 namesrv,用于注册Client服务与Broker的请求路由工作,namesrv上不做任何消息的位置存储, 频繁操作zookeeper的位置存储数据会影响整体集群性能。

Rocket由四部分组成:

  1. Name Server 可集群部署,节点之间无任何信息同步。提供轻量级的服务发现和路由
  2. Broker(消息中转角色,负责存储消息,转发消息) 部署相对复杂,Broker 分为Master 与Slave,一 个Master 可以对应多个Slave,但是一个Slave 只能对应一个Master,Master 与Slave 的对应关系通过 指定相同的BrokerName,不同的BrokerId来定 义,BrokerId为0 表示Master,非0 表示Slave。 Master 也可以部署多个。
  3. Producer,生产者,拥有相同 Producer Group 的 Producer 组成一个集群, 与Name Server 集群 中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic 服务的Master 建立长连接,且定时向Master 发送心跳。Producer 完全无状态,可集群部署。
  4. Consumer,消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的 Consumer 组成 一个集群,与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取 Topic 路由信息,并向提供Topic 服务的Master、Slave 建立长连接,且定时向Master、Slave 发送心 跳。Consumer既可以从Master 订阅消息,也可以从Slave 订阅消息,订阅规则由Broker 配置决定。

要使用rocketmq,至少需要启动两个进程,nameserver、broker,前者是各种topic注册中心,后者 是真正的broker。

2. 单机部署

  • 下载压缩包并解压:
1
unzip rocketmq-all-4.7.0-bin-release.zip
  • 启动namesrv

进入bin目录

1
nohup sh mqnamesrv &

默认情况下,nameserver监听的是9876端口。

使用下面命令查看启动日志:

1
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动broker

修改conf/broker.conf文件,插入配置:brokerIP1=10.0.12.76(公网ip) ,如此设置才能实现跨域请求

使用以下命令启动broker:

1
2
nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf &
# namesrvIp 为当前节点的公网ip
  • 内存不足问题

这是因为bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内 存太大,rocketmq比较耗内存,所以默认分配的内存比较大,而系统实际内存却太小导致启动失败, 通常像虚拟机上安装的 CentOS 服务器内存可能是没有高的,只能调小。实际中应该根据服务器内存情 况,配置一个合适的值

修改runbroker.sh和runbroker.sh

  • 停止服务

停止服务的时候需要注意,要先停止broker,其次停止nameserver。

1
2
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
  • broker.conf文件

默认情况下,启动broker会加载conf/broker.conf文件,这个文件里面就是一些常规的配置信息

1
2
3
4
5
namesrvAddr //nameserver地址
brokerCl usterName //Cluster名称,如果集群机器数比较多,可以分成多个cluster,每个cluster提供 给不同的业务场景使用
brokerName //broker名称,如果配置主从模式,master和slave需要配置相同的名称来表名关系 brokerId=0 //在主从模式中,一个master broker可以有多个slave,0表示master,大于0表示不同slave的id
brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE //同步表示slave和master消息同步完成后再返回信息给客户端
autoCreateTopicEnable = true // topic不存在的情况下自动创建

3. 二主二从异步集群部署

第一台机器 10.0.12.74
端口规划:
9876 NameServer1
10910 BrokerA-master
10921 BrokerB-slave

第二台机器 10.0.12.76
端口规划:
9876 NameServer2
10920 BrokerB-master
10911 BrokerA-slave

架构

3.1 下载

从官网首页最新发布版本进入下载地址
http://rocketmq.apache.org/

比如:

1
2
cd /usr/local/app
wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
3.2 解压

解压后,把文件夹改个名字

1
2
unzip rocketmq-all-4.7.1-bin-release.zip
mv rocketmq-all-4.7.1-bin-release rocketmq

在两台机器上都下载、解压好。
在rocketmq/conf目录下,有三种建议配置模式:
2m-2s-async(2主2从异步) —— 本文采用这种
2m-2s-sync (2主2从同步)
2m-noslave (2主)

现在需要修改两台机器上2m-2s-async这个目录中的文件。
配置文件修改之前先备份。

3.3 配置第一台机器

10.0.12.74的两个配置文件

(1)broker-a.properties

1
2
cd /usr/local/app/rocketmq/conf/2m-2s-async
vim broker-a.properties

修改的内容(名字自定义,保持一致,否则不能组成集群)

1
brokerClusterName=qingshan-cluster

增加的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Broker 对外服务的监听端口
listenPort=10910
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#nameServer地址,分号分割
namesrvAddr=10.0.12.74:9876;10.0.12.76:9876
#存储路径
storePathRootDir=/usr/local/app/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/app/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/app/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store/abort

(2)broker-b-s.properties

1
vim  broker-b-s.properties

修改的内容(名字自定义,保持一致,否则不能组成集群)

1
brokerClusterName=qingshan-cluster

增加的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Broker 对外服务的监听端口
listenPort=10921
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#nameServer地址,分号分割
namesrvAddr=10.0.12.74:9876;10.0.12.76:9876
#存储路径
storePathRootDir=/usr/local/app/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/usr/local/app/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/app/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store/abort
3.4 配置第二台机器

10.0.12.76的两个配置文件
修改的内容基本一致,主要是注意一下端口号、路径名。

(1)broker-b.properties

1
2
cd /usr/local/app/rocketmq/conf/2m-2s-async
vim broker-b.properties

修改的内容(名字自定义,保持一致,否则不能组成集群)

1
brokerClusterName=qingshan-cluster

增加的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Broker 对外服务的监听端口
listenPort=10920
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#nameServer地址,分号分割
namesrvAddr=10.0.12.74:9876;10.0.12.76:9876
#存储路径
storePathRootDir=/usr/local/app/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/app/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/app/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store/abort

(2)broker-a-s.properties

1
2
cd /usr/local/app/rocketmq/conf/2m-2s-async
vim broker-a-s.properties

修改的内容(名字自定义,保持一致,否则不能组成集群)

1
brokerClusterName=qingshan-cluster

增加的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Broker 对外服务的监听端口
listenPort=10911
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#nameServer地址,分号分割
namesrvAddr=10.0.12.74:9876;10.0.12.76:9876
#存储路径
storePathRootDir=/usr/local/app/rocketmq/store/broker-a-s
#commitLog 存储路径
storePathCommitLog=/usr/local/app/rocketmq/store/broker-a-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/app/rocketmq/store/broker-a-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store/broker-a-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store/abort
3.5 创建数据目录

第一台机器163执行(只需要执行一次)

1
mkdir -p /usr/local/app/rocketmq/store/broker-a /usr/local/app/rocketmq/store/broker-a/consumequeue /usr/local/app/rocketmq/store/broker-a/commitlog /usr/local/app/rocketmq/store/broker-a/index /usr/local/app/rocketmq/logs /usr/local/app/rocketmq/store/broker-b-s /usr/local/app/rocketmq/store/broker-b-s/consumequeue /usr/local/app/rocketmq/store/broker-b-s/commitlog /usr/local/app/rocketmq/store/broker-b-s/index

第二台机器164执行(只需要执行一次)

1
mkdir -p /usr/local/app/rocketmq/store/broker-a-s /usr/local/app/rocketmq/store/broker-a-s/consumequeue /usr/local/app/rocketmq/store/broker-a-s/commitlog /usr/local/app/rocketmq/store/broker-a-s/index /usr/local/app/rocketmq/logs /usr/local/app/rocketmq/store/broker-b /usr/local/app/rocketmq/store/broker-b/consumequeue /usr/local/app/rocketmq/store/broker-b/commitlog /usr/local/app/rocketmq/store/broker-b/index
3.6 启动两个NameServer

启动第一台机器163的NameServer

1
nohup sh /usr/local/app/rocketmq/bin/mqnamesrv >/usr/local/app/rocketmq/logs/mqnamesrv.log 2>&1 &

启动第二台机器164的NameServer

1
nohup sh /usr/local/app/rocketmq/bin/mqnamesrv >/usr/local/app/rocketmq/logs/mqnamesrv.log 2>&1 &
3.7 启动Broker

1、启动节点1 163的 broker-a-master
在163上面执行

1
nohup sh /usr/local/app/rocketmq/bin/mqbroker -c /usr/local/app/rocketmq/conf/2m-2s-async/broker-a.properties > /usr/local/app/rocketmq/logs/broker-a.log 2>&1 &

在虚拟机中可能由于内存不够导致无法启动,日志文件中出现如下错误:

1
2
3
nohup: ignoring input
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
vim /usr/local/app/rocketmq/bin/runbroker.sh

把8g和4g改成512m和256m

1
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

再次启动。

2、启动节点2 164的broker-a-s
在164上面执行

1
nohup sh /usr/local/app/rocketmq/bin/mqbroker -c /usr/local/app/rocketmq/conf/2m-2s-async/broker-a-s.properties > /usr/local/app/rocketmq/logs/broker-a-s.log 2>&1 &

3、启动节点2 164的 broker-b-master
在164上面执行

1
nohup sh /usr/local/app/rocketmq/bin/mqbroker -c /usr/local/app/rocketmq/conf/2m-2s-async/broker-b.properties > /usr/local/app/rocketmq/logs/broker-b.log 2>&1 &

4、启动节点1 163的broker-b-s
在163上面执行

1
nohup sh /usr/local/app/rocketmq/bin/mqbroker -c /usr/local/app/rocketmq/conf/2m-2s-async/broker-b-s.properties > /usr/local/app/rocketmq/logs/broker-b-s.log 2>&1 &

查看端口启动状态:
netstat -an|grep 109

4. 部署web-console

一、下载项目源代码

1
2
cd /usr/local/app
wget https://github.com/apache/rocketmq-externals/archive/master.zip

下载慢可以用复制链接到迅雷里面。或者从百度网盘下载:
链接:https://pan.baidu.com/s/1O4QnrltZvXgtSGwoqcjcMw
提取码:2673

解压:

1
unzip master.zip

解压出来的文件夹名字:
rocketmq-externals-master

二、修改配置文件

1
2
cd /usr/local/app/rocketmq-externals-master/rocketmq-console/src/main/resources/
vim application.properties

修改端口号:

1
server.port=7298

修改name server地址(多个地址用英文分号隔开)

1
rocketmq.config.namesrvAddr=10.0.12.76:9876

注意后面改了配置文件要重新打包

三、解压编译

1
2
cd /usr/local/app/rocketmq-externals-master/rocketmq-console/
mvn clean package -Dmaven.test.skip=true

四、启动jar包

1
2
cd target
java -jar rocketmq-console-ng-2.0.0.jar

五、访问
http://10.0.12.76:7298

六、日志
日志配置:

1
rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties

指定了logback.xml 为日志配置文件

1
<file>${user.home}/logs/consolelogs/rocketmq-console.log</file>

实际路径

1
cd ~/logs/consolelogs/rocketmq-console.log

5. 原生API调用

添加 jar包依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RocketMqProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("pd_producer_group");
producer.setNamesrvAddr("10.0.12.76:9876");
producer.setSendMsgTimeout(60000);
producer.start();
for(int i = 0; i < 20; i++){
try {
Message message = new Message("test-topic","panda",("Hello Rocketmq " + i)
.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println(producer.send(message));

} catch (UnsupportedEncodingException | RemotingException | InterruptedException | MQBrokerException e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RocketMqConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pd_consumer_group");
consumer.setNamesrvAddr("10.0.12.076:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("test-topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt messageExt : msgs) {
if(messageExt.getReconsumeTimes() == 5){
// TODO 可以将对应的数据保存到数据库,以便人工干预
}
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.%n");
}
}
5.1 RocketMQ消息支持的模式

NormalProducer

  • 消息同步发送

消息发送出去之后。procuder会等到broker回应后才能继续发送下一个消息

  • 消息异步发送

异步发送是指发送方在发出数据之后,不等接收方响应,接着发送下一个数据包的通信方式。MQ的异步发送需要用户实现异步调用接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,发送发通过毁掉接口接收服务器响应,并对相应结果进行处理

  • OneWay

单向发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发。只发送不管响应,效率最高

OrderProducer

前面我们学习kafka的时候有说到,消息可以通过自定义分区策略来实现消息的顺序发送,实现原理就是把同一类消息都发送到相同的分区上。

在RocketMQ中,是基于多个Message Queue来实现类似于kafka的分区效果。如果一个Topic 要发送 和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic 可以根据 需求设置一个或多个Message Queue。Topic 有了多个Message Queue 后,消息可以并行地向各个 Message Queue 发送,消费者也可以并行地从多个Message Queue 读取消息并消费。

要了解RocketMQ消息的顺序消费,还得对RocketMQ的整体架构有一定的了解

5.2 RocketMQ消息发送及消费的基本原理

这是一个比较宏观的部署架构图,rocketmq天然支持高可用,它可以支持多主多从的部署架构,这也 是和kafka最大的区别之一

RocketMQ中并没有master选举功能,所以通过配置多个master节点来保证rocketMQ的高可 用。和所有的集群角色定位一样,master节点负责接受事务请求、slave节点只负责接收读请求,并且接收master同步过来的数据和slave保持一致。当master挂了以后,如果当前rocketmq是一主多从, 就意味着无法接受发送端的消息,但是消费者仍然能够继续消费。所以配置多个主节点后,可以保证当其中一个master节点挂了,另外一个master节点仍然能够对外提 供消息发送服务。

当存在多个主节点时,一条消息只会发送到其中一个主节点,rocketmq对于多个master节点的消息发 送,会做负载均衡,使得消息可以平衡的发送到多个master节点上。一个消费者可以同时消费多个master节点上的消息,在下面这个架构图中,两个master节点恰好可以 平均分发到两个消费者上,如果此时只有一个消费者,那么这个消费者会消费两个master节点的数据。

由于每个master可以配置多个slave,所以如果其中一个master挂了,消息仍然可以被消费者从slave节 点消费到。可以完美的实现rocketmq消息的高可用

接下来,站在topic的角度来看看消息是如何分发和处理的,假设有两个master节点的集群,创建了一 个TestTopic,并且对这个topic创建了两个队列,也就是分区。

消费者定义了两个分组,分组的概念也是和kafka一样,通过分组可以实现消息的广播。

5.3 集群支持

RocketMQ天生对集群的支持非常友好

  1. 单Master

优点:除了配置简单没什么优点 缺点:不可靠,该机器重启或宕机,将导致整个服务不可用

  1. 多Master

优点:配置简单,性能最高

缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性

  1. 多Master多Slave,每个Master配一个Slave,有多对Master-Slave

集群采用异步复制方式,主备 有短暂消息延迟,毫秒级

优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预 缺点:Master宕机或磁盘损坏时会有少量消息丢失

  1. 多Master多Slave,每个Master配一个Slave,有多对Master-Slave

集群采用同步双写方式,主备 都写成功,向应用返回成功

优点:服务可用性与数据可用性非常高

缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主

需要注意的是,在RocketMQ里面,1台机器只能要么是Master,要么是Slave。这个在初始的机器配置 里面,就定死了。不会像kafka那样存在master动态选举的功能。其中Master的broker id = 0,Slave 的broker id > 0。

有点类似于mysql的主从概念,master挂了以后,slave仍然可以提供读服务,但是由于有多主的存 在,当一个master挂了以后,可以写到其他的master上。

5.4 消息的顺序消费

首先,需要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个 消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证 是有序的。

我们要做到生产者 - messagequeue - 消费者之间是一对一对一的关系。

通过自定义发送策略来实现消息只发送到同一个队列,保证消息的顺序存储

因为一个Topic 会有多个Message Queue ,如果使用Producer 的默认配置,这个Producer 会轮流向 各个Message Queue 发送消息。Consumer 在消费消息的时候,会根据负载均衡策略,消费被分配到 的Message Queue,如果不经过特定的设置,某条消息被发往哪个Message Queue ,被哪个Consumer 消费是未知的

如果业务需要我们把消息发送到指定的Message Queue 里,比如把同一类型的消息都发往相同的Message Queue。那是不是可以实现顺序消息的功能呢?

和kafka一样,rocketMQ也提供了消息路由的功能,我们可以自定义消息分发策略,可以实现 MessageQueueSelector,来实现自己的消息分发策略

1
2
3
4
5
6
7
8
Message message = new Message("test-topic","panda",("Hello Rocketmq " + i)
.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println(producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0); //选择第1个队列
}
},"panda")); // "panda" 可以作为选择队列的参数,上面select方法的形参arg将接受这个值

如何保证消息消费顺序呢?

通过分区规则可以实现同类消息在rocketmq上的顺序存储。但是对于消费端来说,如何保证消费的顺序?

我们前面写的消息消费代码使用的是MessageListenerConcurrently并发监听,也就是基于多个线程并 行来消费消息。这个无法保证消息消费的顺序。

RocketMQ中提供了MessageListenerOrderly 类来实现顺序消费

1
2
3
4
5
6
7
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
msgs.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeOrderlyStatus.SUCCESS;
}
});

顺序消费会带来一些问题:

  1. 遇到消息失败的消息,无法跳过,当前队列消费暂停

  2. 降低了消息处理的性能

6. 消费端的负载均衡

和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区 的消息。

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载

  • 消费端遍历自己的所有topic,依次调rebalanceByTopic
  • 根据topic获取此topic下的所有queue
  • 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)
  • 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法

什么时候触发负载均衡

  • 消费者启动之后

  • 消费者数量发生变更

  • 每10秒会触发检查一次rebalance

分配算法

RocketMQ提供了6中分区的分配算法:

  • (AllocateMessageQueueAveragely)平均分配算法(默认)
  • (AllocateMessageQueueAveragelyByCircle)环状分配消息队列
  • (AllocateMessageQueueByConfig)按照配置来分配队列: 根据用户指定的配置来进行负载
  • (AllocateMessageQueueByMachineRoom)按照指定机房来配置队列
  • (AllocateMachineRoomNearby)按照就近机房来配置队列:
  • (AllocateMessageQueueConsistentHash)一致性hash,根据消费者的cid进行

7. 消息的的可靠性原则

在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功, 可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?

7.1 消息消费端的确认机制

RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使 用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功

1
2
3
4
5
6
7
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

所有消费者在设置监听的时候会提供一个回调,业务实现消费回调的时候,当回调方法中返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是 消费完成的。如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批 消息消费失败了

7.2 消息的衰减重试

为了保证消息肯定至少被消费一次,RocketMQ会把这批消息重新发回到broker,在延迟的某个时间点 (默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续 失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预

可以修改broker-a.conf文件

1
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
7.3 重试消息的处理机制

一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数 达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试

1
2
3
4
5
6
7
8
9
10
11
12
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt messageExt : msgs) {
if(messageExt.getReconsumeTimes() == 5){
// TODO 可以将对应的数据保存到数据库,以便人工干预
}
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});