rabbitmq-消息收发流程

由于 RabbitMQ 实现了 AMQP 协议,所以 RabbitMQ 的工作模型也是基于 AMQP 的,理解这张图片至关重要。

rabbit-1

rabbit的概念

Broker–主机节点,中文翻译是代理/中介,因为 MQ 服务器帮助我们做的事 情就是存储、转发消息。

Connection–无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个连接,这个连接是一个 TCP 的长连接。

Channel–如果所有的生产者发送消息和消费者接收消息,都直接创建和释放 TCP 长连接的话, 对于 Broker 来说肯定会造成很大的性能损耗,因为 TCP 连接是非常宝贵的资源,创建和 释放也要消耗时间。

所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接。我们把它翻译 成通道,或者消息信道。这样我们就可以在保持的 TCP 长连接里面去创建和释放 Channel,大大了减少了资源消耗。另外一个需要注意的是,Channel 是 RabbitMQ 原 生 API 里面的最重要的编程接口,也就是说我们定义交换机、队列、绑定关系,发送消 息消费消息,调用的都是 Channel 接口上的方法。

Queue–队列是真正用来存储消息的,是一个独立运行的进程,有自己的数据库(Mnesia)。

消费者获取消息有两种模式,一种是 Push 模式,只要生产者发到服务器,就马上推 送给消费者。另一种是 Pull 模式,消息存放在服务端,只有消费者主动获取才能拿到消 息。消费者需要写一个 while 循环不断地从队列获取消息吗?不需要,我们可以基于事件机制,实现消费者对队列的监听。

由于队列有 FIFO 的特性,只有确定前一条消息被消费者接收之后,才会把这条消息 从数据库删除,继续投递下一条消息。

Exchange–交换机是一个绑定列表,用来查找匹配的绑定关系。

队列使用绑定键(Binding Key)跟交换机建立绑定关系。 生产者发送的消息需要携带路由键(Routing Key),交换机收到消息时会根据它保存的绑定列表,决定将消息路由到哪些与它绑定的队列上。

注意:交换机与队列、队列与消费者都是多对多的关系。

vhost–我们每个需要实现基于 RabbitMQ 的异步通信的系统,都需要在服务器上创建自己 要用到的交换机、队列和它们的绑定关系。如果某个业务系统不想跟别人混用一个系统, 怎么办?再采购一台硬件服务器单独安装一个 RabbitMQ 服务?这种方式成本太高了。 在同一个硬件服务器上安装多个 RabbitMQ 的服务呢?比如再运行一个 5673 的端口? 没有必要,因为 RabbitMQ 提供了虚拟主机 VHOST。

VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。它的作用类似于编程语言中的 namespace 和 package,不同的 VHOST 中可以有 同名的 Exchange 和 Queue,它们是完全透明的。

我们可以为不同的业务系统创建不同的用户(User),然后给这些用户 分配 VHOST 的权限。比如给风控系统的用户分配风控系统的 VHOST 的权限,这个用户可以访问里面的交换机和队列。给超级管理员分配所VHOST 的权限。

queue的绑定

将queue绑定到exchange上时,会指定一个BindingKey

生产者消息发送到exchange,会携带一个rountKey

exchange将根据消息的rountKey路由到已绑定的且BindingKey匹配的queue上

三种常用的exchange

1、Direct exchange:交换机路由消息,要精确匹配rountKey,即消息的rountKey与绑定queue的rountKey要完全一致

2、Topic exchange:queue的rountKey中含有通配符,交换机路由消息,只要消息的rountKey能匹配topic patten就能路由

​ “#” 表示0个或多个word

​ ”*“ 表示不多不少一个word

如果 queue指定的rountKey是 : #.panda.*,则消息的rountKey为 panda.txt / zhao.panda.name / zhao.test.panda.zzk 这些都能路由到该queue中

3、Fanout exchange:扇形交换机,queue与其绑定时不需要指定rountKey,生产者向该类exchange发送消息时也不用携带rountKey,该类交换机会将收到的消息广播给所有的与其绑定的queue。

java-api编程

创建 Maven 工程,pom.xml 引入依赖

1
2
3
4
5
<dependency> 
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyProducer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.0.12.74");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connection connection = factory.newConnection();
// 创建消息通道
Channel channel = connection.createChannel();
String msg = "Hello Rabbit Mq!";
// 向交换机投递消息,携带路由键,没有额外参数
channel.basicPublish(EXCHANGE_NAME,"panda.test",null,msg.getBytes());
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyConsumer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
private final static String QUEUE_NAME = "SIMPLE_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.0.12.74");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct" ,true); //声明交换机
channel.queueDeclare(QUEUE_NAME,false,false,false,null); //声明队列
System.out.println("Waiting for msg...");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"panda.test");//绑定
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("message is: " + msg);
System.out.println("consumer tag: " + consumerTag);
System.out.println("delivery tag: " + envelope.getDeliveryTag());
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

启动消费者,再启动生产者

参数详解

1)声明交换机的参数

  • String type:交换机的类型,direct, topic, fanout 中的一种。

  • boolean durable:是否持久化,代表交换机在服务器重启后是否还存在。

2)声明队列的参数

  • boolean durable:是否持久化,代表队列在服务器重启后是否还存在。

  • boolean exclusive:是否排他性队列。排他性队列只能在声明它的 Connection 中使用(可以在同一个 Connection 的不同的 channel 中使用),连接断开时自动删 除。

  • boolean autoDelete:是否自动删除。如果为 true,所有与这个队列连接的消费者都断开时,队列会自动删除。

  • Map<String, Object> arguments:队列的其他属性

    | 属性 | 含义 |
    | ————————- | ———————————————- |
    | x-message-ttl | 队列中消息的存活时间,单位毫秒 |
    | x-expirs | 队列在多久没有消费者访问以后会被删除 |
    | x-max-length | 队列的最大消息数 |
    | x-max-length-bytes | 队列最大容量,单位字节 |
    | x-dead-letter-exchange | 队列指定的死信交换机 |
    | x-dead-letter-routing-key | 队列指定死信交换机的路由键 |
    | x-max-priority | 队列中消息的最大优先级,消息的优先级不能超过它 |

    3)消息属性 BasicProperties

    以下列举了一些主要的参数:

    | 参数 | 含义 |
    | ————————– | ——————————– |
    | Map<String,Object> headers | 消息的其他自然参数 |
    | Integer deliveryMode | 2持久化,其他:瞬态 |
    | Integer priority | 消息的优先级 |
    | String correlationId | 关联 ID,方便 RPC 相应与请求关联 |
    | String replyTo | 回调队列 |
    | String expiration | TTL,消息过期时间,单位毫秒 |

    注意:队列和消息同时指定了消息过期时间,以时间短的为准