RabbitMQ基础
一、MQ基础
1.1、同步调用的优缺点
看到一个简单的业务实现:用户支付后,需要查询订单信息然后调用仓储服务…等一系列服务
- 耦合度高:每次加入新的需求,都需要修改原来的代码。
- 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
- 资源浪费:调用链中的每个服务在等待响应过程中不能释放请求占用的资源,高并发场景下极度浪费资源。
- 级联失败:如果服务提供者出现问题,所有调用方都会跟着出现问题。
优点:
同步调用虽然有以上的问题,但是相比于异步调用,同步服务的响应更迅速
1.2、异步调用的优缺点
- 耦合度降低:即增加新服务的时候只需要告知Broker【中间商】添加订阅事件即可
- 吞吐量提升:同步调用时,需要等待所有服务执行完用户才能得到响应,而异步调用用户向Broker发送请求后,无需等待全部服务的完成即可得到响应结果。
- 故障隔离:服务之间没有强依赖关系,不担心级联失败问题。
- 流量削峰:高并发请求通过Broker缓存,微服务基于服务能力从Broker中获取事件,处理事件,起到对微服务的保护作用
缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂,业务没有明显的流程线,不便于追踪管理
1.3、什么是MQ
MQ(MessageQueue)消息队列,也就是异步调用中的Broker
RabbitMQ | RockerMQ | Kafka | ActiveMQ | |
---|---|---|---|---|
公司/社区 | Rabbit | 阿里 | Apache | Apache |
开发语言 | Erlang | Java | Scala&Java | Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | 自定义协议 | 自定义协议 | OpenWire、STOMP、REST、XMPP、AMQP |
可用性 | 高 | 高 | 高 | 一般 |
单击吞吐量 | 一般 | 高 | 非常高 | 差 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒以内 | 毫秒级 |
消息可靠性 | 高 | 高 | 一般 | 一般 |
二、RabbitMQ快速入门
2.1、RabbitMQ部署【下载安装运行】
当前ubuntu系统下使用docker下载镜像文件并运行MQ容器
运行docker
1
systemctl start docker
拉取MQ资源
1
docker pull rabbitmq:3-management
执行以下命令配置并运行MQ容器
1
2
3
4
5
6
7
8
9docker run \
-e RABBITMQ_DEFAULT_USER=root \ #此处设置用户名
-e RABBITMQ_DEFAULT_PASS=123456 \ #设置密码
--name mq \ #创建的容器名字
--hostname mq1 \ #设置端口名
-p 15672:15672 \ #设置管理端的端口
-p 5672:5672 \ #设置用户端的端口
-d \ #-d参数表示docker后台运行该容器
rabbitmq:3-management上述配置成功实现后,我们打开RabbitMQ管理端页面
2.2、RabbitMQ结构与概念
打开MQ管理页面可以看到如下画面
页面中包含RabbitMQ的几个概念:
- Channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
RabbitMQ的结构
2.3、RabbitMQ消息模型介绍
MQ的官方文档给出了 6 个MQ的Demo示例 ,下面给出一些常用的用法:
- 基本消息队列(BasicQueue)
- 工作消息队列(WorkQueue)
- 发布订阅(publish Subscribe),根据交换机类型不同分为三种:
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
2.4、Helloworld案例
官方的Helloworld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue中
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的信息
打开idea创建两个模块,一个模块用来的当做消息发布者【publisher】,另一个模块用来当做消息接受者【comsumer】
并在这两个模块中添加RabbitMQ的依赖
1 | <!--AMQP依赖,包含RabbitMQ--> |
然后在模块消息发布者中书写代码如下:
1 | public class PublisherTest { |
接收模块书写接收消息队列的代码
1 | public class ConsumerTest { |
根据上述的Helloworld案例,可以知道基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送信息
基本消息对立接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
三、SpringAMQP
- 根据上文Helloworld案例使用官方的API实现的简单队列模型,可以发现使用官方的API操作十分麻烦,因此学习SpringAMQP,可以简化消息发送和接收的API
AMQP:Advanced Message Queuing Protocol ,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP:是基于AMQP协议的一套API规范,提供了模板来发送和接收信息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
3.1、案例:利用Spring-AMQP实现基础消息队列
3.1.1、消息的发送
在工程中添加
spring-amqp
的依赖1
2
3
4
5<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>在发布信息的模块中使用
RabbitTemplate
发送信息到simple.queue
这个队列为了解除硬编码问题,首先编写配置文件
application.yml
添加MQ的连接信息1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.91.134 # 主机名
password: 123456 # 密码
username: root # 用户名
port: 5672 # 端口
virtual-host: / # 虚拟主机-
1
2
3
4
5
6
7
8
9
10
11
12
public class SpringAMQPTest {
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
String queueName = "simple.queue";
String messgae = "这是传递的信息";【
rabbitTemplate.convertAndSend(queueName,messgae);
}
} 启动程序并打开RabbitMQ管理端
http://192.168.91.134:15672
查看信息是否传递到队列打开管理端查看队列可以看到确实缓存了一条信息
注意事项:
操作工具类rabbitTemplate发送信息到队列的时候,需要先确保该队列已经存在,否则信息无法发送到队列,并且运行的时候也不会报错。
声明队列的两种方法:
在管理端图形界面手动创建队列
使用代码声明队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
String queueName = "sb.queue";
String messgae = "1231231";
//声明队列
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);
Queue simpleQueue = new Queue(queueName);
admin.declareQueue(simpleQueue);
//声明队列后向队列发送信息
rabbitTemplate.convertAndSend(queueName,messgae);
}此时打开管理端可以看到确实创建了一个新队列
sb.queue
其实综上所述,SpringAMQP发送信息无非以下几点
- 引入amqp的starter依赖
- 配置RabbitMQ的地址
- 利用RabbitTemplate的
convertAndSend
方法
3.1.2、消息的接收
在工程中添加
spring-amqp
的依赖【与上文发送消息步骤一致】1
2
3
4
5<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>创建
application.yml
配置RabbitMQ的地址【与上文发送信息的步骤一致】1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.91.134 # 主机名
password: 123456 # 密码
username: root # 用户名
port: 5672 # 端口
virtual-host: / # 虚拟主机在接收信息的服务中新建一个类,编写处理信息的逻辑
1
2
3
4
5
6
7//将该类声明为一个bean让spring能够发现
public class Listener {
//该注解用来声明监听队列的名称
public void workQueue(String msg) throws InterruptedException {
System.out.println("spring消费者接收到信息: " + msg + "时间: " +LocalTime.now());
}
}运行并查看成功接收到- 上文发送的消息
打开管理端查看队列,可以看到此时队列的信息被取走消息不存在了
综上所述,SpringAMQP接收消息为以下步骤:
- 引入amqp的starter依赖
- 配置RabbitMQ地址
- 定义类,添加
@Component
注解,交给Spring管理 - 类中定义方法,添加
@RabbitListener
注解,方法参数就是接收的信息类型
3.2、WorkQueue模型
基础消息队列:一个生产者对应一个消费者
工作队列:一个生产者对应多个消费者,多个消费者绑定到同一个队列,同一条消息只会被一个消费者处理
- 提高消息的处理速度,避免队列消息堆积
3.2.1、模拟workqueue
- 实现一个队列绑定多个消费者
基本思路如下:
- 在publisher服务中定义测试方法,每秒产生50条信息,发送到
simple.queue
- 在consumer服务中定义两个消息监听者,都监听
simple.queue
队列 - 消费者(1)每秒处理50条信息,消费者(2)每秒处理10条信息
案例实现:
首先定义生产者,每秒发送五十条信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SpringAMQPTest {
private RabbitTemplate rabbitTemplate;
public void sendToWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "工作队列测试信息__";
for (int i = 1; i <= 50; i++){
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
}然后定义两个消费者监听相同的队列
simple.queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Listener {
public void workQueue_1(String msg) throws InterruptedException {
System.out.println("消费者(1)接收: 【" + msg + "】时间: " +LocalTime.now());
Thread.sleep(20); //此处调节线程睡眠时间的不同,从而实现控制处理性能的高低
}
public void workQueue_2(String msg) throws InterruptedException {
System.err.println("消费者(2)接收: 【" + msg + "】时间: " +LocalTime.now());
Thread.sleep(200); //此处调节线程睡眠时间的不同,从而实现控制处理性能的高低
}
}- 启动生产者服务,向队列发送五十条信息,然后启动消费者服务可以看到消息平均分配给两个消费者
平均分配是为什么呢?
- 因为没有配置RabbitMQ的消息预取机制,因此每个消费者取信息的能力是无限的,因此上述案例中的两个消费者平均取走了消息,即使二者就算存在了处理性能的差异,因为取走信息的数量是相同的,因此最后处理的数据还是等量的。
如果消费者都是平均分配信息处理,那就没有考虑到每个消费者的性能。
- 我们希望处理性能高的消费者处理多点信息,处理性能低的消费者少处理点信息,那么总的处理时间就可以大大降低了。
因此需要配置消息预取的数量,这样便可以控制处理越快的消费者获取的消息就越多
1 | spring: |
然后我们重启生产者和消费者服务,再次查看处理信息的情况
3.3、发布订阅模型
前面介绍的案例都是只能实现一条消息一个消费者接收并使用
而发布订阅模型与它们的区别就是允许将同一条消息发送给多个消费者。实现方式是加入了Exchange(交换机)
常见的交换机类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
注意:exchange只负责消息路由,不是存储,路由失败则消息丢失
3.3.1、Fanout_Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
利用SpringAMQP演示Fanoutexchange的使用
实现思路如下:
- 在消费者服务中,利用代码声明队列、交换机、并将二者绑定
- 在消费者服务中,编写两个消费者方法,分别监听fanout.queue(1)和fanout.queue(2)
- 在生产者中编写测试方法,向itcast.fanout发送信息
步骤一:在consumer服务使用bean声明Exchange、Queue、Binding
在Consumer服务常见的一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
1 |
|
启动服务后,打开MQ管理端查看是否成功绑定了交换机与队列的关系
步骤二:队列与交换机绑定关系成功后,在consumer服务中定义监听函数,监听两个队列
步骤三:并在publisher中发送信息到交换机
启动服务可以看到两个队列都收到了消息:可知交换机将一条消息发给了绑定关系的两个队列
综上所述:交换机的作用是什么?
- 接收发布的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,信息丢失
- FanoutExchange会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
Queue
FanoutExchange
Binding
3.3.2、Direct_Exchange
Direct Exchange会将接收到的信息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
利用SpringAMQP演示DirectExchange的使用
实现思路如下:
利用注解
@RabbitListener
声明Exchange、Queue、RoutingKey【原来的使用bean来配置Exchange、Queue、RoutingKey太复杂,因此直接在注解中声明即可】-点击查看使用bean配置的例子在consumer服务中,编写两个消费者方法,分别监听
direct.queue1
和direct.queue2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void listenDirect_1(String msg){
System.out.println("接收到direct.queue_1的信息:【" + msg + "】");
}
public void listenDirect_2(String msg){
System.out.println("接收到direct.queue_2的信息:【" + msg + "】");
}启动服务查看交换机的配置详情
在publisher中编写测试方法,向
mystudy.direct
发送信息,启动服务观察结果因此可以验证:Direct_Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
Direct交换机和Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪一个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于注解@RabbitListener注解声明队列和交换机有哪些常见注解
- @Queue
- @Exchange
3.3.3、Topic_Exchange
TopicExchange和DirectExchange类似,区别在于routintKey必须是多个单词的列表,并且以.
分割
Queue与Exchange指定BindingKey时可以使用通配符:
#
:指代0个或多个单词
*
:指代一个单词
利用SpringAMQP演示TopicExchange的使用
实现思路如下:
利用
@RabbitListener
声明Exchange、Queue、RoutingKey在Consumer服务中,编写两个消费者方法,分别监听
topic.queue1
和topic.queue2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void listenTopic_1(String msg){
System.out.println("接收到topic.queue_1的信息:【" + msg + "】");
}
public void listenTopic_2(String msg){
System.out.println("接收到topic.queue_2的信息:【" + msg + "】");
}在publisher中编写测试方法,向
mystudy.topic
发送信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void SendTo_China_News(){
String exchangeName = "mystudy.topic";
String msg = "交换机topic测试信息__{China and News}__";
String routingKey = "china.news";
rabbitTemplate.convertAndSend(exchangeName,routingKey,msg);
}
public void SendTo_News(){
String exchangeName = "mystudy.topic";
String msg = "交换机topic测试信息__{China and News}__";
String routingKey = "test.news";
rabbitTemplate.convertAndSend(exchangeName,routingKey,msg);
}
3.4、消息转换器
说明:在SpringAMQP的发送方法中,接收信息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
验证:查看RabbitTempate的convertAndSend
函数接收的参数
查看原来的消息转换器接收对象之后的处理结果,然后在使用新的转换器
第一步:定义一个新的队列用来接收对象
1 |
|
第二步:编写发送信息的函数
1 |
|
第三步:打开管理端查看存储的内容可以看到存储的内容是Java序列化的对象
Spring的对消息的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOupytStream完成序列化。
如果要修改只需要定义一个MessageConverter类型的bean即可。推荐使用JSON方式序列化,步骤如下:
首先在publisher服务中引入依赖
1
2
3
4
5<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.3</version>
</dependency>在publisher服务声明MessageConverter
1
2
3
4
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}上述配置完成后重新发送对象信息,再次打开管理端查看存储的内容:
上述就已经完成了对发送消息时候的格式转换,接收消息的时候同样需要进行消息的转换,因此引入依赖,配置转换器两部的操作是一致的。
首先在consumer服务中引入依赖
1
2
3
4
5<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.3</version>
</dependency>在consumer服务声明MessageConverter
1
2
3
4
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
综上所述:SpringAMQP中消息的序列化和反序列化是如何实现的?
- 利用MessageConverter实现的,默认是
JDK的序列化
- 注意发送方与接收方必须使用相同的
MessageConverter
- 标题: RabbitMQ基础
- 作者: 忘记中二的少年
- 创建于 : 2023-10-22 20:23:00
- 更新于 : 2023-10-22 20:23:43
- 链接: https://github.com/HandsomeXianc/HandsomeXianc.github.io/2023/10/22/MQ入门/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。