欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

SpringBoot 2.x 整合 RabbitMQ 实战教程 - 生产者部分

最编程 2024-08-02 07:02:39
...
package com.gblfy.springboot.producer;

import com.gblfy.springboot.entity.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitMQSender {

//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* MQ发送 字符串类型消息+额外的属性
*
* @param message
* @param properties
* @throws Exception
*/
//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
//构造一个添加额外属性的容器 储存额外消息
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);

//自动签收
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息确认
rabbitTemplate.setReturnCallback(returnCallback);

//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}

/**
* 发送MQ 对象类型消息
*
* @param order
* @throws Exception
*/
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("0987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.ff", order, correlationData);
}

//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if (!ack) {
System.err.println("异常处理....");
}
}
};
//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
}