2月8日常规学习记录:入门RabbitMQ教程之三
2.8日学习打卡
目录:
- 2.8日学习打卡
- 一.springboot整合RabbitMQ
- SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,
- SpringBoot整合RabbitMQ_编写生产者
- SpringBoot整合RabbitMQ_编写消费者
- Direct类型(默认,匹配发送)
- Topic类型(拓展匹配发送)
- Fanout 类型(广播发送)
- Headers(键值对匹配,不常用)
- Message(消息)
- 二. 消息的可靠性投递
- 确认模式(confirm)
- 退回模式(return)
- 消费者消息确认(Ack)
- 三.RabbitMQ高级特性
- 消费端限流
- 利用限流实现不公平分发
- 消息存活时间
- 优先级队列
一.springboot整合RabbitMQ
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用
SpringBoot整合RabbitMQ,简化代码编写
创建SpringBoot项目,引入RabbitMQ起步依赖
<!-- RabbitMQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starteramqp</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: jjy
password: jjy
virtual-host: /
# 日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,
写法如下:
package com.jjy.springrabbitmqdemo;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
private final String EXCHANGE_NAME = "boot_topic_exchange";
private final String QUEUE_NAME = "boot_queue";
//创建交换机
@Bean("bootExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)//交换机类型
.durable(true)//是否持久化
.build();
}
//创建队列
@Bean("bootQueue")
public Queue getMessageQueue() {
return new Queue(QUEUE_NAME); // 队列名
}
//交换机绑定队列
@Bean
public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("#.message.#")
.noargs();
}
}
SpringBoot整合RabbitMQ_编写生产者
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送
消息,编写生产者时只需要注入RabbitTemplate即可发送消息。
package com.jjy.springrabbitmqdemo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TestProducer {
//注入RabbitTemplate工具类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送的消息
*/
public void testSendMessage(){
rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
}
}
SpringBoot整合RabbitMQ_编写消费者
消费者
package com.jjy.rabbitmqcosspring.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//消费者
@Component
public class Consumer {
//监听队列
@RabbitListener(queues = "boot_queue")
public void listenMessage(String message){
System.out.println("监听的消息: "+message);
}
}
整合后的代码,就是不用自己去实例化(创建连接工厂,连接,信道);让spring容器来控制实例的创建到销毁。
代码的实现有生产者和消费者、还有配置类(创建交换机跟队列及其绑定操作),都独立为一个类(共3个类),yml文件中配置rabbitmq的一些属性。
Direct类型(默认,匹配发送)
它会把消息路由到那些binding key与routing key完全匹配的Queue中。
它是一个一对一的模型,一条消息一定会被发到指定的一个队列(完全匹配)。
配置代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitDirectConfig {
@Bean
public Queue directQueue(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("directQueue-One",false,false,false,null);
}
@Bean
public Queue directQueue2(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("directQueue-Two",false,false,false,null);
}
@Bean
public DirectExchange directExchange(){
//参数介绍
//1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new DirectExchange("MqSendService-One",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(directQueue()) //绑定队列
.to(directExchange()) //队列绑定到哪个交换器
.with("One"); //绑定路由key,必须指定
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(directQueue2()) //绑定队列
.to(directExchange()) //队列绑定到哪个交换器
.with("Two"); //绑定路由key,必须指定
}
}
Topic类型(拓展匹配发送)
它是Direct类型的一种扩展,提供灵活的匹配规则。
- routing key为一个句点号 " . " 分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如"One.Two"
- binding key与routing key一样也是句点号 " . " 分隔的字符串
- binding key中可以存在两种特殊字符 " * " 与 " # " ,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTopicConfig {
@Bean
public Queue topicQueue(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("topicQueue-One",false,false,false,null);
}
@Bean
public Queue topicQueue2(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("topicQueue-Two",false,false,false,null);
}
@Bean
public TopicExchange topicExchange(){
//参数介绍
//1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new TopicExchange("Topic-Ex",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(topicQueue()) //绑定队列
.to(topicExchange()) //队列绑定到哪个交换器
.with("*.Two.*"); //路由key,必须指定
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(topicQueue2()) //绑定队列
.to(topicExchange()) //队列绑定到哪个交换器
.with("#"); //路由key,必须指定
}
}
Fanout 类型(广播发送)
它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
它是一种一对多的类型,无法指定Binding Key,发送的一条消息会被发到绑定的所有队列。
配置代码
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitFanoutConfig {
@Bean
public Queue fanoutQueue(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("fanoutQueue-One",false,false,false,null);
}
@Bean
public Queue fanoutQueue2(){
//参数介绍
//1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("fanoutQueue-Two",false,false,false,null);
}
@Bean
public FanoutExchange fanoutExchange(){
//参数介绍
//1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new FanoutExchange("Fanout-Ex",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(fanoutQueue()) //绑定队列
.to(fanoutExchange()); //队列绑定到哪个交换器
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(fanoutQueue()) //绑定队列
.to(fanoutExchange()); //队列绑定到哪个交换器
}
}
Headers(键值对匹配,不常用)
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到ExchangeRabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型不常用,暂不提供代码。
Message(消息)
当执行诸如 basicPublish() 之类的操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传入。
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
...
}
MessageProperties 接口定义了几个常见的属性,例如“messageId”“timestamp”、“contentType”等等。 还可以通过调用 setHeader(String key, Object value) 方法扩展这些属性
二. 消息的可靠性投递
RabbitMQ消息投递的路径为:
生产者 —> 交换机 —> 队列 —> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
- 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
- 退回模式(return)可以监听消息是否从交换机成功传递到队列。
- 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
三种模式刚好监听完RabbitMQ的一整套流程。即我们能够由这三种模式得到消息的传递及处理的结果。
确认模式(confirm)
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机
生产者配置文件开启确认模式
rabbitmq:
host: 192.168.66.100
port: 5672
username: jjy
password: jjy
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
package com.jjy.rabbitproducer;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
private final String EXCHNAGE_NAME="my_topic_exchange";
private final String QUEUE_NAME="my_queue";
@Bean("bootExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(EXCHNAGE_NAME)//交换机类型
.durable(true)
.build();
}
// 2.创建队列
@Bean("bootQueue")
public Queue getMessageQueue(){
return QueueBuilder
.durable(QUEUE_NAME) // 队列持久化
.build();
}
@Bean
public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
@SpringBootTest
public class ProduceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
// 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 被调用的回调方法
* @param correlationData 相关配置信息
* @param ack 交换机是否成功收到了消息
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("confirm接受成功!");
}else{
System.out.println("confirm接受失败,原因为:"+cause);
// 做一些处理。
}
}
});
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
}
}
退回模式(return)
退回模式(return)可以监听消息是否从交换机成功传递到队列,
使用方法如下:
生产者配置文件开启退回模式
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: jjy
password: jjy
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
# 开启回退模式
publisher-returns: true
package com.jjy.rabbitproducer;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import
上一篇:
阿里邮箱app