rabbitmq-消息的可靠性投递

如何保证消息可靠性投递

从四个环节分析

  1. 生产者把消息发送到broker,怎么知道自己的消息有没有被broke接收?
  2. 消息从交换机路由到queue,如果消息没有办法路由到指定的queue,会发生什么?该怎么处理?
  3. 消息在queue中存储,怎么保证消息在队列中稳定的存储?
  4. 消费者消费消息,消费完queue要删除这条消息。borker怎么知道消费者已经接收消息了呢?

1. 生产者发送消息

RabbitMQ提供了两种服务端确认机制,也就是生产者投递消息给服务端之后,服务端会通过某种方式返回一个应答,只要生产者收到这个应答,就知道消息成功发送了

一种是事务模式(Transaction),一种是确认模式(Confirm)

  1. 事务模式

获取channel之后,调用channel.txSelect()方法将信道设置成事务模式

1
2
3
4
5
6
7
8
9
10
11
try{
channel.txSelect();
// 发送消息
// 参数:String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
// int i =1/0;
channel.txCommit();
}catch(Exeception e){
//回滚
channel.txRollback()
}

如果channel.txCommit()方法能正常返回,说明事务提交成功,消息一定到达了RabbitMQ

如果事务提交方法执行之前由于RabbitMQ异常崩溃或者其他原因排除异常,这时我们可以将其捕获,执行回滚操作。

在事务模式中,只有收到了服务端的Commit-OK指令才算提交成功。

事务模式的缺点:

整个过程是阻塞的,意味着一条消息没有确认提交就不能发送下一条消息,所以不建议在生产环境下使用。

事务模式在spring-boot中设置:

1
rabbitTemplate.setChannelTransacted(true);
  1. 确认模式

Confirm模式有三种,一种是普通确认模式

生产者通过调用channel.confirmSelect()方法将信道设置为 Confirm 模式,一旦消息被投递到的所有匹配的队列之后,RMQ就会发送一个确认(Basic.Ack)给生产者,也就是调用channel.waitForConfirms()返回 true,这是生产者就知道服务端已接收消息。

1
2
3
4
5
6
7
// 开启发送方确认模式
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 普通Confirm,发送一条,确认一条
if (channel.waitForConfirms()) {
System.out.println("消息发送成功" );
}

这种发送一条确认一条的的方式性能还是不高,所以有了批量确认模式

批量确认笔普通单条确认效率要高,但是也存在两个问题:

  • 批处理消息数量太小对性能提升有限
  • 太大的话如果第1000条消息被拒绝,那么前面的999条都要重发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
// 发送消息
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
}
// 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
// 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
// 直到所有信息都发布,只要有一个未被Broker确认就会IOException
channel.waitForConfirmsOrDie();
System.out.println("消息发送完毕,批量确认成功");
} catch (Exception e) {
// 发生异常,可能需要对所有消息进行重发
e.printStackTrace();
}

有没有一种方式可以一边发送一遍确认呢?

异步确认模式

异步确认需要添加一个ConfrimListener,并用一个SortedSet来维护没有被确认的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 用来维护未确认消息的deliveryTag
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
// 异步监听确认和未确认的消息
// 如果要重复运行,先停掉之前的生产者,清空队列
channel.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Broker未确认消息,标识:" + deliveryTag);
if (multiple) {
// headSet表示后面参数之前的所有元素,全部删除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 这里添加重发的方法
}
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,
// 如果为false的话表示单条确认
System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
if (multiple) {
// headSet表示后面参数之前的所有元素,全部删除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
// 只移除一个元素
confirmSet.remove(deliveryTag);
}
System.out.println("未确认的消息:"+confirmSet);
}
});

// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
long nextSeqNo = channel.getNextPublishSeqNo();
// 发送消息
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
confirmSet.add(nextSeqNo);
}

spring-boot中因为 RabbitTemplate 对 Channel 进行了封装,所以要使用RabbitTemplate封装的回调对象ConfimrCallback。

1
2
3
4
5
6
7
8
9
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { 
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.out.println("发送消息失败:" + cause);
throw new RuntimeException("发送异常:" + cause);
}
}
});

2. 交换机路由消息

再什么情况下消息会无法路由到正确的queue?

路由键错误 或者 queue不存在

两种方式出路无法路由的消息,一种是让服务端重发给生产者,一种是让交换机路由到另一个备份的交换机。

消息回发的方式:使用mandatory参数和ReturnListener(在spring amqp中叫ReturnCallback)

1
2
3
4
5
6
7
8
9
10
rabbitTemplate.setMandatory(true); 
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey{
System.out.println("回发的消息:");
System.out.println("replyCode: "+replyCode);
System.out.println("replyText: "+replyText);
System.out.println("exchange: "+exchange);
System.out.println("routingKey: "+routingKey);
}});

消息路由到备份交换机:在创建交换机的时候,指定备份交换机

1
2
3
Map<String,Object> arguments = new HashMap<String,Object>(); 
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交换机的备份交换机
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);

注意与死信交换机区别,死信交换机是由queue指定的,备份交换机是由交换机指定的

3. 消息的存储

消息没有被消费的话会一直存储在队列中。

如果RabbitMQ服务发生硬件故障:宕机重启关闭等等,会导致内存中的消息丢失,所以我们要把消息本省和元数据都保存到磁盘。

注意:元数据指的是交换机、队列、绑定

解决方案:

1、队列持久化

声明队列的时候,指定参数durable=true

2、交换机持久化

声明交换机的时候,指定参数durable=true

3、消息持久化

指定消息属性DeliveryMode

1
2
3
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
Message message = new Message("msg content".getBytes(), messageProperties);
template.send("exchange name","route key",message);

4、 集群

以上三点做了持久化,但还是存在单点故障风险,因此需要mq集群。

4. 投递消息

消费者拿到消息后没处理就发生异常或者处理过程中发生异常,会导致消费失败。服务端应该使用某种方式的值消费者对消息的消费情况,并决定是否重新投递消息。

RMQ提供了消费者的消息确认机制,消费者可以自动或者手动的发送ACK给服务端。

没有收到ACK的消息,消费者断开连接后,RMQ会把这条消息发送给其他消费者,如果没有其他消费者,消费者重启后会重新消费者条消息。

消费者在订阅是可以指定autoACK参数,当该参数为false时,RMQ会等待消费者显示回复确认信号后在从队列中移除消息。

如何设置手动确认?

1
spring.rabbitmq.listener.simple.acknowledge-mode=manual

该配置有三个值:

  • none–自动ack
  • manual–手动ack
  • auto–方法为抛出异常,发送ack

当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝, 且不重新入队。当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会发送 ACK。其他的异常,则消息会被拒绝,且 requeue

思考:服务端收到了 ACK 或者 NACK,生产者会知道吗?

即使消费者没有接收到消 息,或者消费时出现异常,生产者也是完全不知情的。

例如,我们寄出去一个快递,是怎么知道收件人有没有收到的?因为有物流跟踪和 签收反馈,所以寄件人可以知道。

在没有用上电话的年代,我们寄出去一封信,是怎么知道收信人有没有收到信件? 只有收到回信,才知道寄出的信被收到了。 所以,这个是生产者最终确定消费者有没有消费成功的两种方式:

1) 消费者收到消息,处理完毕后,调用生产者的 API(思考:是否破坏解耦?)

2) 消费者收到消息,处理完毕后,发送一条响应消息给生产者

RabbitMq没有提供上述的回调机制,若业务需求,可以自己实现。

消费者回调

1) 调用生产者 API

例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息 后调用提单系统提供的 API,来修改提单系统中数据的状态。只要 API 没有被调用, 数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

2) 发送响应消息给生产者

例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。

补偿机制

如果生产者的 API 就是没有被调用,也没有收到消费者的响应消息,怎么办?

不要着急,可能是消费者处理时间太长或者网络超时。

生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有 得到响应的消息,可以设置一个定时重发的机制,但要发送间隔和控制次数,比如每隔 2 分钟发送一次,最多重发 3 次,否则会造成消息堆积。

重发可以通过消息落库+定时任务来实现。

重发,是否发送一模一样的消息?

消息幂等性

如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用 API 的时候出了问题,会不会出现消息的重复处理?例如:存款 100 元,ATM 重发了 5 次,核心系统一共处理了 6 次,余额增加了 600 元。

为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端是没有这种控制的(同一批的消息有个递增的 DeliveryTag),它不知道你是不是就要把 一条消息发送两次,只能在消费端控制。

如何避免消息的重复消费?

消息出现重复可能会有两个原因:

1、生产者的问题,环节①重复发送消息,比如在开启了 Confirm 模式但未收到确认,生产者重复投递。

2、环节④出了问题,由于消费者未发送 ACK 或者其他原因,消息重复投递。

3、生产者代码或者网络问题。

对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。

消息的顺序性

消息的顺序性指的是消费者消费消息的顺序跟生产者生产消息的顺序是一致的。

例如:商户信息同步到其他系统,有三个业务操作:1、新增门店 2、绑定产品 3、 激活门店,这种情况下消息消费顺序不能颠倒(门店不存在时无法绑定产品和激活)。

又比如:1、发表微博;2、发表评论;3、删除微博。顺序不能颠倒。

在 RabbitMQ 中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是 不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用的队列)。

除非负载的场景,不要用多个消费者消费消息。