欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

RabbitMQ

最编程 2024-02-23 11:19:44
...

RabbitMQ

1.什么是中间件

中间件可以理解为一个帮助不同软件、应用或系统之间交流和数据传输的工具或服务。就像一个翻译员在两个讲不同语言的人之间传递信息,让他们能够互相理解和沟通。中间件位于客户端(比如你的电脑或手机应用)和服务器(存放数据和运行服务的强大计算机)之间,确保数据顺利传输,同时还可以提供额外的功能,比如安全性、数据管理和消息服务等。
简单来说,中间件是一种软件服务,用于连接不同的应用程序和系统,帮助它们更好地工作和交流,无论它们是在同一个地方还是分布在全球不同的位置。这就像是建立在不同软件组件之间的桥梁,确保信息能够顺畅、安全地流动。

2.为什么需要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

3.中间件特性

1.互操作性和兼容性: 中间件使得不同平台、不同技术和不同语言编写的应用程序能够相互交流和协作。它提供了一种标准的方法来促进不同系统之间的通信,无论这些系统的底层技术如何不同。
2.抽象层: 中间件提供了一个抽象层,隐藏了底层网络和数据通信的复杂性,使得开发者可以专注于应用逻辑的开发,而不需要深入了解底层技术的细节。
3.可扩展性: 中间件设计时考虑到了系统的可扩展性,可以支持从几个到成千上万个并发用户和服务。这使得应用程序可以根据需要轻松扩展,以适应不断增长的用户需求。
4.安全性: 中间件还提供了安全机制,包括认证、授权、加密和数据完整性验证等,以保证数据传输的安全和防止未经授权的访问。
5.高可用性和可靠性: 通过提供故障转移、负载均衡和事务管理等功能,中间件确保了应用程序的高可用性和可靠性。即使在部分系统组件失败的情况下,也能保证服务的连续性和数据的完整性。
6.异步通信和消息队列: 中间件支持异步通信模式,允许应用程序在不直接等待响应的情况下发送和接收消息。这通过使用消息队列等技术实现,提高了应用程序的性能和响应速度。

4.什么是消息中间件

消息中间件是一种特定类型的中间件,专注于消息的传递和交换。它允许不同的应用程序、系统或软件组件之间通过消息来进行通信,而不是直接调用对方的接口或方法。这种通信方式可以是异步的,也就是说,发送消息的一方不需要等待接收方立即处理消息和回应。消息中间件通常提供了一系列功能,以支持复杂的消息处理模式,包括但不限于消息队列、发布/订阅模型、消息路由和消息持久化。

4.1基于消息中间件的分布式系统的架构

在这里插入图片描述

4.2消息中间件应用的场景

1.系统解耦:在复杂的应用架构中,各个系统或服务之间的直接依赖会导致维护和扩展变得困难。消息中间件通过提供一个中介层,允许服务之间通过消息进行通信,从而减少了它们之间的直接耦合。这样,即使某个服务发生变化,也不会直接影响到其他服务。
2.异步处理:在处理耗时操作(如发送电子邮件、生成报告等)时,可以将这些任务异步化,即发送一个消息到消息队列,然后继续处理其他任务。后台服务可以从队列中取出消息并处理这些耗时操作,这样可以提高应用的响应速度和用户体验。
3.负载均衡:通过消息中间件,可以将任务分散到多个处理单元上执行,从而实现负载均衡。这对于处理大量并发请求或大数据处理尤为重要,可以有效分配系统资源,避免某个服务因过载而崩溃。
4.数据同步:在分布式系统中,确保数据在不同的服务或数据库之间保持一致性是一项挑战。通过消息中间件,可以实现数据的变更通知和同步,当一个服务更新了数据后,可以通过消息通知其他服务进行相应的更新。
5.事件驱动架构:在事件驱动架构中,系统的行为是由事件触发的。消息中间件允许服务发布事件(消息)到一个共享的事件通道,其他服务可以订阅这些事件并作出响应。这种模式支持高度模块化和动态的系统行为,使得应用能够灵活地响应各种事件。
6.微服务架构:在微服务架构中,应用被分解成许多小型、独立的服务,它们通过网络进行通信。消息中间件是实现服务间通信的理想选择,因为它支持异步消息传递、服务解耦和弹性扩展。
7.大数据处理和实时流处理:在大数据和实时数据流处理场景中,消息中间件(如Apache Kafka)能够处理高吞吐量的数据流。它允许数据被实时捕获、存储、处理和分析,支持复杂的数据处理管道和实时分析应用。

4.3消息中间件的本质及设计

它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。
在这里插入图片描述

4.4核心组成部分

1.消息队列:消息队列是消息中间件的基本组成部分,它暂时存储在发送者和接收者之间传递的消息。队列确保消息能够按照发送的顺序被逐一处理,即使接收者暂时无法处理消息,也能保证消息不会丢失。
2.交换器(Exchange):在一些消息中间件系统中,特别是使用发布/订阅模式的系统里,交换器负责接收生产者发送的消息,并根据路由规则决定消息应该发送到哪个队列。交换器使得消息分发更加灵活和强大。
3.消息通道:消息通道是连接应用程序和消息中间件的通信路径。它可以是点对点的,也可以是发布/订阅模式下的多对多通信。消息通道抽象了底层的网络通信细节,为消息的发送和接收提供了一个简单的接口。
4.消息代理(Broker):消息代理是消息中间件的中心组件,它管理着消息队列、处理消息的路由、传递和可能的持久化。消息代理还负责处理消息的接收、存储和转发给目标接收者。
5.客户端库:客户端库提供了一套API,使得应用程序能够与消息中间件系统交互,包括发送和接收消息。这些库通常是针对特定编程语言设计的,简化了消息中间件的集成和使用。
6.管理和监控工具:为了确保消息中间件的稳定运行和高效性能,管理和监控工具是必不可少的。这些工具可以监控消息流量、队列长度、处理延迟等指标,并提供配置管理、故障诊断和性能调优的功能。
7.持久化存储:为了保证消息不会因为系统故障而丢失,许多消息中间件支持将消息持久化到磁盘或其他存储系统。这确保了即使在发生故障的情况下,消息也能够被安全地恢复。

4.5 RabbitMQ使用的协议

RabbitMQ 主要使用的协议是 AMQP(高级消息队列协议,Advanced Message Queuing Protocol)。AMQP 是一个应用层协议,专为异步消息队列提供标准化的通信协议,旨在确保兼容性和可靠性,支持多种消息中间件产品之间的互操作性。
AMQP 定义了消息传递的各种方面,包括消息的排队、路由(包括点对点和发布/订阅模式)、安全性、事务和其他消息服务。RabbitMQ 作为一个遵循 AMQP 标准的消息队列系统,充分利用了这些特性,提供了一个高效、可靠、可扩展的消息中间件解决方案。

除了 AMQP,RabbitMQ 还支持其他的协议,例如:
MQTT(消息队列遥测传输):一个轻量级的消息协议,适用于小型设备和移动应用,常用于物联网(IoT)场景。
STOMP(简单文本导向的消息协议):一种简单的互操作协议,设计用于异步消息传递,支持多种语言和环境。
HTTP和WebSockets:通过插件支持,RabbitMQ 可以接受HTTP和WebSockets协议的消息,使其能够更容易地集成到Web应用中。
这种多协议支持使得 RabbitMQ 能够在多种不同的应用场景中使用,从企业级应用到物联网和实时Web应用,都能提供稳定可靠的消息传递服务。

4.5消息队列持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
常见的持久化方式:文件存储和数据库存储
在这里插入图片描述

4.6分发策略

  1. 轮询分发(Round-Robin Dispatching)
    描述:这是最基本的分发策略,消息按顺序轮流分发给每个消费者。当有多个消费者从同一个队列中获取消息时,RabbitMQ 会按照消费者的顺序依次将消息分发给它们,每个消费者轮流接收一个消息。
    应用场景:适用于处理相似工作量的消息时,可以保证工作负载在不同消费者之间大致平均分配。
  2. 公平分发(Fair Dispatching)
    描述:为了避免某些消费者因处理较慢的任务而积压大量消息,RabbitMQ 允许通过设置预取值(prefetch count)来控制在消费者发送 ack(确认)之前,队列最多发送给消费者的消息数量。
    应用场景:适用于处理工作量不均匀的任务,确保没有单个消费者被过度负载。
  3. 优先级队列
    描述:RabbitMQ 支持优先级队列,允许为消息设置优先级。队列会根据消息的优先级决定分发顺序,高优先级的消息会先于低优先级的消息被消费。
    应用场景:适用于那些需要紧急处理的消息,确保关键任务能够得到优先执行。
  4. 基于发布/订阅的分发
    描述:在发布/订阅模型中,消息被发送到交换器(exchange),然后根据绑定(binding)规则分发到一个或多个队列。消费者从各自订阅的队列中接收消息。
    应用场景:适用于需要将消息广播到多个消费者的场景,如日志收集、消息通知等。
  5. 路由分发
    描述:使用具有选择性路由能力的交换器(如直连交换器direct exchange,主题交换器topic exchange),可以基于消息的路由键(routing key)将消息精确地分发到一个或多个队列。
    应用场景:适用于复杂的条件下的消息分发,如基于特定规则或属性将消息路由到不同的消费者。
    这些分发策略提供了灵活的方式来控制消息如何在生产者和消费者之间流动,使得RabbitMQ能够适应各种不同的消息传递需求和应用场景。选择合适的分发策略可以显著提高应用的性能和响应能力。

4.7消息队列高可用和高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。

4.8集群模式

  1. Master-Slave主从共享数据的部署方式
    解释:在这种模式下,主节点(Master)和从节点(Slave)共享相同的数据存储。主节点处理写操作,而从节点可以处理读操作。如果主节点出现故障,可以手动或自动切换到一个从节点作为新的主节点,以此来保证服务的可用性。
    例子:传统的数据库系统,如MySQL的复制功能,可以配置成主从共享同一个物理存储,或者通过网络文件系统(NFS)共享数据。
  2. Master-Slave主从同步部署方式
    解释:在主从同步模式中,数据从主节点同步到一个或多个从节点。主节点负责处理所有写操作,同时,写操作的数据会被同步到从节点,确保数据的一致性。如果主节点失败,从节点可以被提升为新的主节点。
    例子:PostgreSQL的流复制(Streaming Replication)允许从节点实时地接收主节点的更新,实现数据的同步。
  3. 多主集群同步部署模式
    解释:这种模式允许多个主节点同时接收和处理写操作。所有的主节点之间会相互同步数据,以保持数据的一致性。这种模式可以提高系统的写入吞吐量和可用性。
    例子:Galera Cluster为MySQL提供了多主节点的集群支持,其中每个节点都可以处理写操作,并且写操作会被同步到其他节点。
  4. 多主集群转发部署模式
    解释:在这个模式下,多个主节点可以接收写操作,但写操作会被转发到一个指定的节点进行实际的数据修改。这个指定的节点随后会将数据更改同步到其他节点。这种方式可以简化数据同步的逻辑,但增加了写操作的延迟。
    例子:某些自定义的数据库集群可能采用这种模式,以便集中管理数据的更新,然后通过自定义的同步机制将更新推送到其他节点。
  5. Master-Slave与Broker-Cluster组合的方案
    解释:这种方案结合了Master-Slave模式的数据同步特性和Broker-Cluster的消息队列功能。数据的写操作首先在Master上进行,然后通过消息队列(Broker-Cluster)分发到Slave节点进行数据同步。这种方式既保证了数据的实时同步,又提高了系统的可扩展性和容错能力。
    例子:可以将RabbitMQ用作Broker-Cluster,结合数据库的Master-Slave复制,实现高效的数据同步和分发。例如,一个Web应用的数据库更新操作在主数据库(Master)上执行,然后通过RabbitMQ将更新事件异步分发给其他从数据库(Slave),实现数据的快速同步。
    举例:传统的数据库系统和某些消息队列系统经常使用主从模式来提高数据的可用性和读取性能。
  6. 对等(Peer-to-Peer)模式
    描述:在这个模式下,每个节点都具有相同的角色和责任,节点之间相互协作,没有明确的主从关系。这种模式可以提高系统的可扩展性和容错能力,因为每个节点都可以处理消息,并且节点之间可以共享状态信息。
    举例:某些分布式文件系统和P2P网络使用对等模式来提高系统的可扩展性和数据的可用性。
  7. 分片(Sharding)模式
    描述:通过将数据分散存储到多个节点上,每个节点只负责处理一部分数据或消息。这种方式可以提高系统的吞吐量和可扩展性,因为它允许并行处理大量的消息。
    举例:RabbitMQ和其他一些高性能消息队列系统支持分片插件或配置,以此来分散负载并提高处理能力。
  8. 镜像队列(Mirrored Queues)模式
    描述:在RabbitMQ中,镜像队列是一种特殊的集群模式,用于在多个节点上复制队列的完整状态,以实现高可用性。如果一个节点失败,其他节点上的镜像队列可以继续处理消息,无需手动干预。
    举例:RabbitMQ的镜像队列功能允许在不同服务器上创建队列的精确副本,这样即使某个服务器发生故障,消息也不会丢失,系统可以继续运行。
  9. 自动容错(Autonomous Failover)模式
    描述:这种模式下,集群能够自动检测节点故障,并将故障节点上的工作负载自动迁移到健康节点上,以此来保证服务的连续性和数据的完整性。
    举例:Apache Kafka通过使用Zookeeper来管理集群状态,实现了自动的故障转移和领导者选举机制,从而提高了整个系统的可靠性和可用性。

5.RabbitMQ使用

5.1一些基本配置

1.默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

安装完毕以后,重启服务即可

systemctl restart rabbitmq-server

在浏览器访问
http://localhost:15672
账号密码都是guest

新增用户

rabbitmqctl add_user admin admin

设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator

用户级别:

1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
2、monitoring 监控者 登录控制台,查看所有信息
3、policymaker 策略制定者 登录控制台,指定策略
4、managment 普通管理员 登录控制台

其他配置

rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

6.RabbitMQ角色分类

1.none:
不能访问management plugin,也就是不能显示如下界面
在这里插入图片描述
2.management:查看自己相关节点信息
列出自己可以通过AMQP登入的虚拟机
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
查看和关闭自己的channels和connections
查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3.Policymaker
包含management所有权限
查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4.Monitoring
包含management所有权限
罗列出所有的virtual hosts,包括不能登录的virtual hosts。
查看其他用户的connections和channels信息
查看节点级别的数据如clustering和memory使用情况
查看所有的virtual hosts的全局统计信息。
5.Administrator
最高权限
可以创建和删除virtual hosts
可以查看,创建和删除users
查看创建permisssions
关闭所有用户的connections

7.什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
AMQP生产者流转过程
在这里插入图片描述
AMQP消费者流转过程
在这里插入图片描述

8.RabbitMQ的核心组成部分

在这里插入图片描述
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

9.RabbitMQ的运行流程

在这里插入图片描述

11.RabbitMQ支持消息的模式

RabbitMQ 是一种消息代理,它支持不同的消息传递模式,这些模式定义了消息如何从发送者传递到接收者。下面是一些通俗易懂的解释和例子:

  1. 简单模式(Simple):

    • 描述:一对一的消息发送。一个生产者发送消息到队列,一个消费者从队列接收消息。
    • 例子:一个用户界面应用(生产者)发送用户请求到后端服务器(消费者)处理。
  2. 工作队列模式(Work Queues):

    • 描述:将任务分配给多个工作者(消费者)。一位生产者发送任务,多位消费者平分这些任务并独立工作。
    • 例子:一个应用发送大量的图像处理请求到一个队列,多个图像处理服务(消费者)从队列获取并处理这些图像。
  3. 发布/订阅模式(也叫Fanout模式)(Publish/Subscribe):

    • 描述:消息一对多分发。一个生产者发送消息到交换机,然后被转发到多个队列,每个队列有一个消费者。
    • 例子:一个新闻发布系统将新闻推送到不同的频道,订阅了相应频道的用户都能接收到新闻。
  4. 路由模式(也叫direct模式)(Routing):

    • 描述:根据一定的规则(路由键)将消息路由到一个或多个队列。生产者将消息发送到交换机,并指定一个路由键,交换机根据路由键决定消息的去向。
    • 例子:日志系统根据日志级别(如“error”、“info”)发送消息到不同的处理队列。
  5. 主题模式(Topics):

    • 描述:消息根据模式匹配(而不是完全相等,也就是模糊匹配)路由到一个或多个队列。主题交换机可以根据通配符进行模式匹配,路由到不同队列。
    • 例子:股市应用,将股票更新发送到交换机,交换机根据股票代码模式(如stock.us.*)转发到对应的队列。
  6. 参数模式(Headers):

    • 描述:而不是根据路由键,消费者从交换机接收消息是基于发送消息的头信息(参数)。
    • 例子:一个系统根据消息头信息中的“x-device-type”和“x-user-language”参数,决定将推送的广告发送到哪个用户的队列。

RabbitMQ 通过这些模式提供了灵活的消息路由选项,以适应不同的消息传递场景和需求。

10.简单代码案例

10.1simple

生产者

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1", false, false, false, null);
            // 6: 准备发送消息的内容
            String message = "你好,啊啊啊!!!";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");
            // 4: 从连接中获取通道channel
            channel=connection.createChannel();
            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到消息是" + new String(delivery.getBody(),"utf-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("消息接收失败");
                }
            });
            System.out.println("开始接收消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

10.2Fanout模式

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("localhost7");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = "你好!!!";
            String  exchangeName = "fanout-exchange";
            String routingKey = "";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者


import java.io.IOException;

public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null<