欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Spring Boot与RabbitMQ的完美融合:详细指南

最编程 2024-01-23 08:08:49
...

 

本文对应的代码地址:https://github.com/zhangshilin9527/rabbitmq-study

前置工作:

1.安装rabbitmq

2.登录

地址: http://localhost:15672/

账号密码: guest/guest

3.创建角色

 

 

 

 

4.1 创建virtual Hosts

 

4.2为virtual Hosts赋权

5.增加Exchanges(交换机)

 

 

 

6.增加Queue(队列)

7.Exchange绑定queue

Springboot集成RabbitMQ

Springboot集成RabbitMQ,首先创建一个Springboot项目,可以通过官网的脚手架生成一个springboot项目。Springboot集成RabbitMQ的官网地址为:https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-rabbitmq,下面是我参考文档集成的步骤:

 

生产者代码

1.加入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.加入yaml配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: xiaolinzi
    username: xiaolinzi
    password: xiaolinzi
    #超时时间
    connection-timeout: 10000s
    #开启消息确认模式
    publisher-confirms: true
    #开启消息送达提示
    publisher-returns: true
    #开启不可达消息不会被broker给删除
    template:
      mandatory: true

3.配置mq

/**
 * 交换机
 * @return
 */
@Bean
public DirectExchange xiaolinziDirectExchange() {
    //durable 表示小时是否持久化
    //autoDelete 消息是否自动删除
    DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE, true, false);
    return directExchange;
}

/**
 * 队列
 * @return
 */
@Bean
public Queue xiaolinziQueue() {
    //exclusive:是否排外的  一般等于true的话用于一个队列只能有一个消费者来消费的场景
    Queue queue = new Queue(DIRECT_QUEUE, true, false, false);
    return queue;
}

/**
 * 绑定关系
 * @return
 */
@Bean
public Binding xiaolinziBinder() {
    return BindingBuilder.bind(xiaolinziQueue()).to(xiaolinziDirectExchange()).with(DIRECT_QUEUE_KEY);
}

4.发送消息

public void sendMsg() {
    SendInfo sendInfo = bulidSendInfo();

    //构建correlationData 唯一标识,可以使用id做特殊处理
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //开启确认模式
    rabbitTemplate.setConfirmCallback(new XiaolinziConfirmCallBack());

    //开启消息投递监听
    rabbitTemplate.setReturnCallback(new XiaolinziRetrunCallBack());

    rabbitTemplate.convertAndSend("xiaolinzi_direct", "xiaolinzi_direct_queue_key", JSONObject.toJSONString(sendInfo), correlationData);
}

 

消费者代码

1.加入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.加入yaml配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: xiaolinzi
    username: xiaolinzi
    password: xiaolinzi
    #超时时间
    connection-timeout: 10000s
    #开启消息确认模式
    publisher-confirms: true
    #开启消息送达提示
    publisher-returns: true
    #开启不可达消息不会被broker给删除
    template:
      mandatory: true

3.接受消息

@RabbitListener(queues = {"xiaolinzi_direct_queue"})
@RabbitHandler
public void consumerMsg(Message message, Channel channel) throws IOException {

    logger.info("消费消息:{}", message);
    //手工签收
    Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    logger.info("接受deliveryTag:{}", deliveryTag);
    channel.basicAck(deliveryTag, false);
}

 

 

测试

发送消息:

 

看一下交换机上,现在有一条消息

 

接收消息

 

再看一下交换机上,已经没有了消息

 

 

 

 

 

 

推荐阅读